#!/usr/bin/env node
var fs = require('fs')
var path = require('path')
var URL = require('url')
var http = require('http')
var https = require('https')
var crypto = require('crypto')
var readline = require('readline')
var os = require('os')
var ssbClient = require('ssb-client')
var pull = require('pull-stream')
var pkg = require('./package')

var userAgentBase = pkg.name + '/' + pkg.version
var userAgentContact
var userAgentBot = false

// Estimate the serialized size of a message carrying the given content,
// using placeholder values for the fields the server will fill in.
function estimateMessageSize(content) {
  var draftMsg = {
    key: '%0000000000000000000000000000000000000000000=.sha256',
    value: {
      previous: '%0000000000000000000000000000000000000000000=.sha256',
      author: '@0000000000000000000000000000000000000000000=.ed25519',
      sequence: 100000,
      timestamp: 1000000000000.0001,
      hash: 'sha256',
      content: content,
      signature: '00000000000000000000000000000000000000000000000000000000000000000000000000000000000000==.sig.ed25519'
    }
  }
  return JSON.stringify(draftMsg, null, 2).length
}

// Pull-stream transform: accumulate items in a queue and keep reading
// while fn(queue) returns truthy.
function mapCollect(fn) {
  var aborted
  return function (read) {
    var queue = []
    return function (abort, cb) {
      if (aborted = abort) return read(abort, cb)
      read(null, function next(end, data) {
        if (end) return cb(end)
        queue.push(data)
        var result = fn(queue)
        if (result) read(null, next)
      })
    }
  }
}

// GET a URL and parse the response body as JSON.
function getJson(url, cb) {
  var opts = URL.parse(url)
  opts.headers = {
    'User-Agent': userAgentBase
      + (userAgentContact ? ' (' + userAgentContact + ')' : '')
      + (userAgentBot ? ' bot' : '')
  }
  var h = opts.protocol === 'https:' ? https : http
  h.get(opts, function (res) {
    if (res.statusCode !== 200) return cb(new Error('HTTP ' + res.statusCode + ' ' + res.statusMessage))
    var bufs = []
    res.on('data', function (buf) {
      bufs.push(buf)
    })
    res.on('end', function () {
      res.removeListener('error', cb)
      var buf = Buffer.concat(bufs)
      bufs = null
      var data
      try {
        data = JSON.parse(buf.toString('utf8'))
      } catch (e) {
        return cb(e)
      }
      cb(null, data)
    })
    res.on('error', cb)
  })
}

// Publish drafts in order, rewriting draft ids inside message content to the
// ids of the messages published so far.
function publishDrafts(sbot, drafts, cb) {
  var draftIdIndex = {}
  drafts.forEach(function (draft, i) {
    draftIdIndex[draft.draftId] = i
  })
  var ids = []
  function replaceDraftIds(obj) {
    if (typeof obj === 'string') {
      var i = draftIdIndex[obj]
      if (typeof i === 'number') {
        var id = ids[i]
        if (!id) throw new ReferenceError('draft references unknown message')
        return id
      }
    } else if (Array.isArray(obj)) {
      return obj.map(replaceDraftIds)
    } else if (obj !== null && typeof obj === 'object') {
      var o = {}
      for (var k in obj) o[k] = replaceDraftIds(obj[k])
      return o
    }
    return obj
  }
  pull(
    pull.values(drafts),
    pull.asyncMap(function (draft, cb) {
      var content = replaceDraftIds(draft.content)
      sbot.publish(content, function (err, msg) {
        if (err) return cb(err)
        ids.push(msg.key)
        cb(null, msg)
      })
    }),
    pull.collect(cb)
  )
}

var args = process.argv.slice(2)
var yes = false
var dry = false
var help = false
var urls = []
args.forEach(function (arg) {
  if (arg[0] === '-') switch (arg) {
    case '-n': return dry = true
    case '-y': return yes = true
    case '-h': return help = true
    default: throw 'Unknown argument: ' + arg
  }
  else urls.push(arg)
})

if (help) {
  process.stdout.write(fs.readFileSync(path.join(__dirname, 'usage.txt')))
  process.exit(0)
}

ssbClient(function (err, sbot, config) {
  if (err) throw err
  var conf = config.wikimedia || {}
  userAgentContact = conf.contact
  userAgentBot = conf.bot

  if (urls.length === 0) {
    var pagesFile = path.join(config.path, 'wikimedia-pages.txt')
    var pagesData = fs.readFileSync(pagesFile, 'utf8')
    urls = pagesData.split('\n').filter(RegExp.prototype.test.bind(/[^#]/))
    if (!urls.length) {
      console.log('No pages to sync.')
      return sbot.close()
    }
  }
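  // Hypothetical example of how a page URL is split by the regexp below:
  //   https://en.wikipedia.org/wiki/Example
  // yields:
  //   site:  https://en.wikipedia.org/
  //   api:   https://en.wikipedia.org/w/api.php
  //   title: Example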
  var pagesInfo = urls.map(function (page) {
    var m = /^(.*?)\/wiki\/(.*)$/.exec(page)
    if (!m) throw 'Unable to parse page URL ' + page
    return {
      site: m[1] + '/',
      api: m[1] + '/w/api.php',
      title: m[2]
    }
  })

  var pagesInfoByApi = {}
  pagesInfo.forEach(function (pageInfo) {
    var infos = pagesInfoByApi[pageInfo.api] || (pagesInfoByApi[pageInfo.api] = [])
    infos.push(pageInfo)
  })

  console.log('Normalizing titles...')
  var waiting = 0
  for (var api in pagesInfoByApi) (function (api) {
    var pagesInfoForApi = pagesInfoByApi[api]
    var pagesInfoForApiByTitle = {}
    var titles = pagesInfoForApi.map(function (info) {
      pagesInfoForApiByTitle[info.title] = info
      return info.title
    })
    // \x1f (unit separator) is the MediaWiki API's alternative multi-value
    // separator; the value must begin with \x1f for it to be recognized.
    var url = api + '?format=json&action=query'
      + '&titles=' + encodeURIComponent('\x1f' + titles.join('\x1f'))
    waiting++
    getJson(url, function (err, data) {
      if (err) throw err
      if (data.warnings) console.trace('Warnings:', data.warnings)
      if (data.query.normalized) data.query.normalized.forEach(function (norm) {
        var info = pagesInfoForApiByTitle[norm.from]
        if (!info) {
          console.error(JSON.stringify({titles: titles, response: data}, null, 2))
          throw new Error('Unexpected title in server response')
        }
        // console.log('Normalized title', norm.from, norm.to)
        info.title = norm.to
      })
      if (!--waiting) next()
    })
  }(api))

  function next() {
    console.log('Getting revisions...')
    var userHashes = {}
    pull(
      pull.values(pagesInfo),
      pull.asyncMap(function (pageInfo, cb) {
        // Calculate blob id for page URL + title, for linking
        pull(
          pull.once(pageInfo.site + '\t' + pageInfo.title),
          sbot.blobs.add(function (err, hash) {
            pageInfo.hash = hash
            cb(null, pageInfo)
          })
        )
      }),
      pull.asyncMap(function (pageInfo, cb) {
        // Get previous messages for this page.
        // Simple solution: find the revision with the latest timestamp.
        var maxRevTs = ''
        var maxRevMsgId
        pull(
          sbot.links({dest: pageInfo.hash, rel: 'pageId', values: true, meta: false}),
          pull.filter(function (msg) {
            var c = msg && msg.value && msg.value.content
            return c && c.type === 'wikimedia/revisions'
              && c.site === pageInfo.site
              && c.title === pageInfo.title
          }),
          pull.drain(function (msg) {
            var c = msg && msg.value && msg.value.content
            var revs = Array.isArray(c.revisions) && c.revisions
            if (revs) revs.forEach(function (rev) {
              if (rev && rev.timestamp > maxRevTs) {
                maxRevTs = rev.timestamp
                // fixed: was `maxRevMsgId == msg.key`, a comparison that
                // silently discarded the message id
                maxRevMsgId = msg.key
              }
            })
          }, function (err) {
            if (err) return cb(err)
            pageInfo.latestMsgId = maxRevMsgId
            pageInfo.latestRevTs = maxRevTs
            cb(null, pageInfo)
          })
        )
      }),
      pull.map(function (pageInfo) {
        // Get new revisions.
        var rvcontinue, rvdone
        var rvstart = pageInfo.latestRevTs
        var prevId = pageInfo.latestMsgId
        var aborted
        var revisions = pull(
          function (abort, cb) {
            if (aborted = abort) return cb(abort)
            if (rvdone) return cb(true)
            console.log('Getting revisions for', pageInfo.title + '...', rvstart || '', rvcontinue || '')
            // use pageInfo.api rather than the leftover `api` loop variable,
            // which would point at the last API when several are in play
            var url = pageInfo.api + '?format=json&action=query&prop=revisions&rvslots=*'
              + '&titles=' + encodeURIComponent(pageInfo.title)
              + '&rvprop=ids|timestamp|comment|user|sha1|size|slotsha1|slotsize|content|roles|flags|tags'
              + '&rvdir=newer'
              + (rvcontinue ? '&rvcontinue=' + rvcontinue : '')
              + (rvstart ? '&rvstart=' + rvstart : '')
              + '&rvlimit=50'
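            // MediaWiki paginates revision listings: a response with more
            // results pending carries a continue.rvcontinue token to pass
            // back on the next request; its absence marks the last batch.
            // Illustrative response shape (trimmed):
            //   { "continue": { "rvcontinue": "20190101000000|12345" },
            //     "query": { "pages": { "<pageid>": { "title": "...", "revisions": [ ... ] } } } }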
            getJson(url, function (err, data) {
              if (aborted) return err && console.trace(err)
              if (err) return cb(err)
              var warnings = data.warnings
              if (warnings) {
                // Older MediaWiki versions warn about the slot-related
                // parameters; those warnings are expected and dropped.
                if (warnings.main) {
                  if (warnings.main['*'] === 'Unrecognized parameter: rvslots.') {
                    delete warnings.main['*']
                    if (Object.keys(warnings.main).length === 0) {
                      delete warnings.main
                    }
                  }
                }
                if (warnings.revisions) {
                  if (warnings.revisions['*'] === 'Unrecognized values for parameter "rvprop": slotsha1, slotsize, roles.') {
                    delete warnings.revisions['*']
                    if (Object.keys(warnings.revisions).length === 0) {
                      delete warnings.revisions
                    }
                  }
                }
                if (Object.keys(warnings).length > 0) {
                  console.trace('Warnings:', warnings)
                }
              }
              rvcontinue = data.continue && data.continue.rvcontinue
              if (!rvcontinue) rvdone = true
              var page
              if (data.query) for (var pageid in data.query.pages) {
                page = data.query.pages[pageid]
                if (page.title === pageInfo.title) break
                else page = null
              }
              if (!page) {
                console.trace(data.query.pages, pageInfo)
                return cb(new Error('Unable to find page'))
              }
              var revs = page.revisions
              if (!revs) {
                console.trace(page, pageInfo)
                return cb(new Error('Unable to get revisions'))
              }
              console.log('Got ' + page.revisions.length + ' revisions')
              cb(null, page.revisions)
            })
          },
          pull.flatten(),
          pull.through(function (rev) {
            if (!rev.slots) {
              // Old API does not use slots.
              // Transform the result to be forward-compatible.
              rev.slots = {
                main: {
                  size: rev.size,
                  sha1: rev.sha1,
                  contentmodel: rev.contentmodel,
                  contentformat: rev.contentformat,
                  '*': rev['*']
                }
              }
              delete rev.contentmodel
              delete rev.contentformat
              delete rev['*']
            }
            // drop values the new API duplicates in slotsize and slotsha1
            delete rev.sha1
            delete rev.size
          }),
          pull.asyncMap(function (rev, cb) {
            // Calculate blob id for user page URL + title, for linking
            var hash = userHashes[rev.user]
            if (hash) {
              rev.userId = hash
              return cb(null, rev)
            }
            pull(
              pull.once(pageInfo.site + '\tUser:' + rev.user),
              sbot.blobs.add(function (err, hash) {
                rev.userId = userHashes[rev.user] = hash
                cb(null, rev)
              })
            )
          }),
          pull.asyncMap(function (rev, cb) {
            // Verify each slot's content against its sha1, store the content
            // as a blob, and replace the inline content with the blob link.
            var waiting = 0
            for (var slot in rev.slots) (function (slot) {
              waiting++
              var slotInfo = rev.slots[slot]
              var content = slotInfo['*']
              if (!content) {
                console.trace(slotInfo)
                return cb(new Error('Missing content'))
              }
              var sha1 = crypto.createHash('sha1').update(content).digest('hex')
              if (sha1 !== slotInfo.sha1) {
                console.trace(slotInfo, sha1)
                return cb(new Error('Mismatched content sha1'))
              }
              pull(
                pull.once(content),
                sbot.blobs.add(function (err, hash) {
                  if (err) return cb(err)
                  slotInfo.link = hash
                  delete slotInfo['*']
                  if (!--waiting) cb(null, rev)
                })
              )
            }(slot))
          })
        )
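        // The reader returned below batches the revision stream into drafts:
        // revisions are appended until the estimated message size would pass
        // 8192 bytes (ssb's message size limit), and the overflowing revision
        // is queued for the next draft. Each draft gets a placeholder id
        // ('%<sha256 of content>.draft6') that following drafts reference in
        // `parents`; publishDrafts rewrites these to real message ids.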
        var queuedRevisions = []
        var ended
        function cbDraft(content, cb) {
          if (!content.revisions.length) {
            console.log('No revisions for', pageInfo.title)
            return cb(true)
          }
          console.log('Prepared a message',
            'with', content.revisions.length, 'revisions',
            'for', pageInfo.title)
          prevId = '%' + crypto.createHash('sha256')
            .update(JSON.stringify(content)).digest('base64') + '.draft6'
          cb(null, {
            draftId: prevId,
            content: content
          })
        }
        return function (abort, cb) {
          if (abort) return revisions(abort, cb)
          if (ended) return cb(true)
          var content = {
            type: 'wikimedia/revisions',
            site: pageInfo.site,
            title: pageInfo.title,
            pageId: pageInfo.hash,
            parents: prevId ? [prevId] : undefined,
            revisions: queuedRevisions.splice(0)
          }
          revisions(null, function next(end, revision) {
            if (ended = end) return cbDraft(content, cb)
            content.revisions.push(revision)
            if (estimateMessageSize(content) > 8192) {
              queuedRevisions.push(content.revisions.pop())
              // console.log('filled msg for', pageInfo.title, 'with', content.revisions.length, 'revisions')
              return cbDraft(content, cb)
            }
            revisions(null, next)
          })
        }
      }),
      pull.flatten(),
      pull.collect(function (err, drafts) {
        if (err) throw err
        if (dry) {
          console.log(JSON.stringify(drafts, null, 2))
          return sbot.close()
        }
        if (yes) return confirmed(true)
        var rl = readline.createInterface({
          input: process.stdin,
          output: process.stdout
        })
        rl.question('Publish ' + drafts.length + ' messages? [Y/n] ', function (answer) {
          rl.close()
          confirmed(!/^n/i.test(answer))
        })
        function confirmed(yes) {
          if (!yes) return sbot.close()
          publishDrafts(sbot, drafts, function (err, msgs) {
            if (err) throw err
            // fixed: .join was previously applied to the map callback
            // function instead of the resulting array of keys
            console.log('Published:\n' + msgs.map(function (msg) {
              return msg.key
            }).join('\n'))
            sbot.close()
          })
        }
      })
    )
  }
})
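// Example invocations, assuming this package's bin entry is installed as
// `ssb-wikimedia` (usage.txt is authoritative):
//   ssb-wikimedia -n https://en.wikipedia.org/wiki/Example   # dry run: print drafts as JSON
//   ssb-wikimedia -y https://en.wikipedia.org/wiki/Example   # publish without prompting
// With no URL arguments, pages are read from wikimedia-pages.txt in the ssb
// config directory (config.path), one URL per line; blank lines are skipped.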