git ssb

0+

cel / ssb-wikimedia



Tree: 23f1a2b8649b31b64b0da903aec6a9ab15b8fccd

Files: 23f1a2b8649b31b64b0da903aec6a9ab15b8fccd / bin.js

14727 bytes · Raw
1#!/usr/bin/env node
2
3var fs = require('fs')
4var path = require('path')
5var URL = require('url')
6var http = require('http')
7var https = require('https')
8var crypto = require('crypto')
9var readline = require('readline')
10var os = require('os')
11
12var ssbClient = require('ssb-client')
13var pull = require('pull-stream')
14
15var pkg = require('./package')
16
// Base User-Agent for all MediaWiki API requests: "<pkg name>/<version>".
var userAgentBase = pkg.name + '/' + pkg.version
// Optional contact info appended to the UA in parentheses;
// populated later from config.wikimedia.contact.
var userAgentContact
// When true, ' bot' is appended to the UA; populated from config.wikimedia.bot.
var userAgentBot = false
20
// Estimate the serialized size of an SSB message carrying `content`.
// Builds a stand-in message whose non-content fields use worst-case-length
// placeholder values, and measures its pretty-printed JSON length.
function estimateMessageSize(content) {
  // Placeholder value section: every field a real message would carry,
  // sized like the largest values those fields can take.
  var value = {
    previous: '%0000000000000000000000000000000000000000000=.sha256',
    author: '@0000000000000000000000000000000000000000000=.ed25519',
    sequence: 100000,
    timestamp: 1000000000000.0001,
    hash: 'sha256',
    content: content,
    signature: '00000000000000000000000000000000000000000000000000000000000000000000000000000000000000==.sig.ed25519'
  }
  var draftMsg = {
    key: '%0000000000000000000000000000000000000000000=.sha256',
    value: value
  }
  var serialized = JSON.stringify(draftMsg, null, 2)
  return serialized.length
}
36
// Fetch `url` over HTTP(S) and parse the response body as JSON.
// Sends the tool's User-Agent (base + optional contact + optional bot flag).
// Calls cb(err) on network error, non-200 status, or JSON parse failure;
// calls cb(null, data) with the parsed object on success.
function getJson(url, cb) {
  var opts = URL.parse(url)
  opts.headers = {
    'User-Agent': userAgentBase
    + (userAgentContact ? ' (' + userAgentContact + ')' : '')
    + (userAgentBot ? ' bot' : '')
  }
  var h = opts.protocol === 'https:' ? https : http
  h.get(opts, function (res) {
    if (res.statusCode !== 200) {
      console.error(res.headers, url)
      // Drain the unread body so the socket is released back to the agent.
      res.resume()
      return cb(new Error('HTTP ' + res.statusCode + ' ' + res.statusMessage))
    }
    var bufs = []
    res.on('data', function (buf) {
      bufs.push(buf)
    })
    res.on('end', function () {
      // Stop routing late stream errors to cb once we have the full body.
      res.removeListener('error', cb)
      var buf = Buffer.concat(bufs)
      bufs = null
      var data
      try {
        data = JSON.parse(buf.toString('utf8'))
      } catch(e) {
        return cb(e)
      }
      cb(null, data)
    })
    res.on('error', cb)
  }).on('error', cb) // request-level errors (DNS, connect) previously crashed the process
}
69
// Publish draft messages in order through sbot.publish, rewriting any draft id
// referenced inside a later draft's content into the real message key that the
// earlier draft was published as. Calls cb(err) or cb(null, publishedMsgs).
// Drafts may only reference drafts that appear EARLIER in the array.
function publishDrafts(sbot, drafts, cb) {
  var draftIdIndex = {}
  drafts.forEach(function (draft, i) {
    draftIdIndex[draft.draftId] = i
  })
  var ids = [] // real message keys, in publish order

  // Deep-walk a content value, replacing draft-id strings with real ids.
  // Throws if a draft references one that has not been published yet.
  function replaceDraftIds(obj) {
    if (typeof obj === 'string') {
      var i = draftIdIndex[obj]
      if (typeof i === 'number') {
        var id = ids[i]
        if (!id) throw new ReferenceError('draft references unknown message')
        return id
      }
    } else if (Array.isArray(obj)) {
      return obj.map(replaceDraftIds)
    } else if (obj !== null && typeof obj === 'object') {
      var o = {}
      for (var k in obj) o[k] = replaceDraftIds(obj[k])
      return o
    }
    return obj
  }

  pull(
    pull.values(drafts),
    pull.asyncMap(function (draft, cb) {
      var content
      try {
        content = replaceDraftIds(draft.content)
      } catch (e) {
        // Surface forward-references as a stream error instead of a
        // synchronous throw inside the pull pipeline.
        return cb(e)
      }
      sbot.publish(content, function (err, msg) {
        if (err) return cb(err)
        ids.push(msg.key)
        cb(null, msg)
      })
    }),
    pull.collect(cb)
  )
}
108
// Command-line parsing: flags (-n dry-run, -y assume-yes, -h help) and any
// number of page URLs. Unknown flags abort with an Error (previously a bare
// string was thrown, which loses the stack trace).
var args = process.argv.slice(2)
var yes = false
var dry = false
var help = false
var urls = []
args.forEach(function (arg) {
  if (arg[0] === '-') switch (arg) {
    case '-n': return dry = true
    case '-y': return yes = true
    case '-h': return help = true
    default: throw new Error('Unknown argument: ' + arg)
  } else urls.push(arg)
})

// -h: print usage and exit before connecting to sbot.
if (help) {
  process.stdout.write(fs.readFileSync(path.join(__dirname, 'usage.txt')))
  process.exit(0)
}
127
// Main entry: connect to the local ssb-server, resolve page titles against the
// MediaWiki API, fetch revisions newer than what is already in the feed, and
// publish them as 'wikimedia/revisions' messages (after user confirmation,
// unless -y/-n was given).
ssbClient(function (err, sbot, config) {
  if (err) throw err
  var conf = config.wikimedia || {}
  userAgentContact = conf.contact
  userAgentBot = conf.bot

  // No URL arguments: fall back to the pages file in the ssb config dir.
  // Lines starting with '#' are comments and are filtered out.
  if (urls.length === 0) {
    var pagesFile = path.join(config.path, 'wikimedia-pages.txt')
    var pagesData = fs.readFileSync(pagesFile, 'utf8')
    urls = pagesData.split('\n').filter(RegExp.prototype.test.bind(/^[^#]/))
    if (!urls.length) {
      console.log('No pages to sync.')
      return sbot.close()
    }
  }

  // Split each page URL into site root, API endpoint, and page title.
  // URLs containing '/wiki/' are assumed to serve the API at <site>/w/api.php;
  // otherwise the API is assumed at <site>/wiki/api.php.
  var pagesInfo = urls.map(function (page) {
    var m = /^(https?:\/\/.*?)(\/wiki)?\/(.*)$/.exec(page)
    if (!m) throw 'Unable to parse page URL ' + page
    return {
      site: m[1] + '/',
      api: m[1] + (m[2] ? '/w' : '/wiki') + '/api.php',
      title: m[3]
    }
  })
  // Group pages by API endpoint so titles can be normalized in one request.
  var pagesInfoByApi = {}
  pagesInfo.forEach(function (pageInfo) {
    var infos = pagesInfoByApi[pageInfo.api] || (pagesInfoByApi[pageInfo.api] = [])
    infos.push(pageInfo)
  })
  console.log('Normalizing titles...')
  // Count outstanding normalization requests; when the last one completes,
  // proceed to next().
  var waiting = 0
  for (var api in pagesInfoByApi) (function (api) {
    var pagesInfoForApi = pagesInfoByApi[api]
    var pagesInfoForApiByTitle = {}
    var titles = pagesInfoForApi.map(function (info) {
      pagesInfoForApiByTitle[info.title] = info
      return info.title
    })
    // \x1f-separated multi-value form for `titles`, so titles containing '|'
    // survive; the leading \x1f signals that separator to the API.
    var url = api + '?format=json&action=query' +
      '&titles=' + encodeURIComponent('\x1f' + titles.join('\x1f')) +
      '&' // trailing & needed for some reason
    waiting++
    getJson(url, function (err, data) {
      if (err) throw err
      if (data.warnings) console.trace('Warnings:', data.warnings)
      // Apply server-side title normalization (case, underscores, namespace
      // aliases) back onto our page records.
      if (data.query.normalized) data.query.normalized.forEach(function (norm) {
        var info = pagesInfoForApiByTitle[norm.from]
        if (!info) {
          console.error(JSON.stringify({titles: titles, response: data}, 0, 2))
          throw new Error('Unexpected title in server response')
        }
        // console.log('Normalized title', norm.from, norm.to)
        info.title = norm.to
      })
      if (!--waiting) next()
    })
  }(api))

  function next() {
    console.log('Getting revisions...')
    // Cache of user-page blob ids, keyed by username, shared across pages.
    var userHashes = {}
    pull(
      pull.values(pagesInfo),
      pull.asyncMap(function (pageInfo, cb) {
        // Calculate blob id for page URL + title, for linking
        // NOTE(review): the blobs.add error is ignored here — hash may be
        // undefined on failure; verify whether that is intentional.
        pull(
          pull.once(pageInfo.site + '\t' + pageInfo.title),
          sbot.blobs.add(function (err, hash) {
            pageInfo.hash = hash
            cb(null, pageInfo)
          })
        )
      }),
      pull.asyncMap(function (pageInfo, cb) {
        // Get previous messages for this page.
        // Simple solution: find the revision with latest timestamp.
        var maxRevTs = ''
        var maxRevMsgId
        pull(
          sbot.links({
            dest: pageInfo.hash,
            rel: 'pageId',
            values: true,
            meta: false
          }),
          // Keep only revision messages for exactly this site + title.
          pull.filter(function (msg) {
            var c = msg && msg.value && msg.value.content
            return c
            && c.type === 'wikimedia/revisions'
            && c.site === pageInfo.site
            && c.title === pageInfo.title
          }),
          // Track the newest revision timestamp seen, and which message
          // carried it; ISO-8601 timestamps compare correctly as strings.
          pull.drain(function (msg) {
            var c = msg && msg.value && msg.value.content
            var revs = Array.isArray(c.revisions) && c.revisions
            if (revs) revs.forEach(function (rev) {
              if (rev && rev.timestamp > maxRevTs) {
                maxRevTs = rev.timestamp
                maxRevMsgId = msg.key
              }
            })
          }, function (err) {
            if (err) return cb(err)
            pageInfo.latestMsgId = maxRevMsgId
            pageInfo.latestRevTs = maxRevTs
            cb(null, pageInfo)
          })
        )
      }),
      pull.map(function (pageInfo) {
        // Get new revisions.
        // Builds a pull-stream source of revisions fetched page-by-page from
        // the API, then re-chunks them into message-sized drafts.
        var rvcontinue, rvdone
        var rvstart = pageInfo.latestRevTs
        var prevId = pageInfo.latestMsgId
        var aborted
        var first = true
        var revisions = pull(
          // Source: one API request per read, yielding arrays of revisions.
          function (abort, cb) {
            if (aborted = abort) return cb(abort)
            if (rvdone) return cb(true)

            console.log('Getting revisions for', pageInfo.title + '...',
              rvstart || '', rvcontinue || '')
            // NOTE(review): `api` here is the leftover loop variable from the
            // normalization pass above — if the page list spans multiple
            // wikis, every page is queried against the LAST api endpoint.
            // pageInfo.api looks like the intended value; confirm.
            var url = api + '?format=json&action=query&prop=revisions&rvslots=*'
              + '&titles=' + encodeURIComponent(pageInfo.title)
              + '&rvprop=ids|timestamp|comment|user|sha1|size|slotsha1|slotsize|content|roles|flags|tags'
              + '&rvdir=newer'
              + (rvcontinue ? '&rvcontinue=' + rvcontinue : '')
              + (rvstart ? '&rvstart=' + rvstart : '')
              + '&rvlimit=50'
            getJson(url, function (err, data) {
              if (aborted) return err && console.trace(err)
              if (err) return cb(err)

              // Suppress the two warnings that older MediaWiki versions emit
              // for the slot-related parameters we send speculatively; any
              // other warning is still surfaced.
              var warnings = data.warnings
              if (warnings) {
                if (warnings.main) {
                  if (warnings.main['*'] === 'Unrecognized parameter: rvslots.') {
                    delete warnings.main['*']
                    if (Object.keys(warnings.main).length === 0) {
                      delete warnings.main
                    }
                  }
                }
                if (warnings.revisions) {
                  if (warnings.revisions['*'] === 'Unrecognized values for parameter "rvprop": slotsha1, slotsize, roles.') {
                    delete warnings.revisions['*']
                    if (Object.keys(warnings.revisions).length === 0) {
                      delete warnings.revisions
                    }
                  }
                }
                if (Object.keys(warnings).length > 0) {
                  console.trace('Warnings:', warnings)
                }
              }

              // Pagination: absence of rvcontinue means this is the last batch.
              rvcontinue = data.continue && data.continue.rvcontinue
              if (!rvcontinue) rvdone = true
              // Find our page in the keyed `pages` object by title match.
              var page
              if (data.query) for (var pageid in data.query.pages) {
                page = data.query.pages[pageid]
                if (page.title === pageInfo.title) break
                else page = null
              }
              if (!page) {
                console.trace(data.query.pages, pageInfo)
                return cb(new Error('Unable to find page'))
              }
              var revs = page.revisions || []
              console.log('Got ' + revs.length + ' revisions')
              cb(null, revs)
            })
          },
          pull.flatten(),

          // rvstart with rvdir=newer is inclusive, so the first revision at
          // exactly the stored timestamp is the one we already have: skip it.
          pull.filter(function (rev) {
            if (rev.timestamp === rvstart && first) {
              first = false
              return false
            }
            return true
          }),

          pull.through(function (rev) {
            if (!rev.slots) {
              // old API does not use slots.
              // Transform result to be forward-compatible.
              rev.slots = {
                main: {
                  size: rev.size,
                  sha1: rev.sha1,
                  contentmodel: rev.contentmodel,
                  contentformat: rev.contentformat,
                  '*': rev['*']
                }
              }
              delete rev.contentmodel
              delete rev.contentformat
              delete rev['*']
            }
            // duplicate values supplied in new API in slotsize and slotsha1
            delete rev.sha1
            delete rev.size
          }),

          pull.asyncMap(function (rev, cb) {
            // Calculate blob id for user page URL + title, for linking
            var hash = userHashes[rev.user]
            if (hash) {
              rev.userId = hash
              return cb(null, rev)
            }
            pull(
              pull.once(pageInfo.site + '\tUser:' + rev.user),
              sbot.blobs.add(function (err, hash) {
                rev.userId = userHashes[rev.user] = hash
                cb(null, rev)
              })
            )
          }),

          // Move each slot's content out of the message and into a blob,
          // verifying the server-supplied sha1 first.
          // NOTE(review): if rev.slots is ever empty, `waiting` stays 0 and cb
          // is never called; and an error in one slot can race a later
          // success callback — confirm slots is always non-empty in practice.
          pull.asyncMap(function (rev, cb) {
            var waiting = 0
            for (var slot in rev.slots) (function (slot) {
              waiting++
              var slotInfo = rev.slots[slot]
              var content = slotInfo['*']
              if (!content) {
                console.trace(slotInfo)
                return cb(new Error('Missing content'))
              }
              var sha1 = crypto.createHash('sha1').update(content).digest('hex')
              if (sha1 !== slotInfo.sha1) {
                console.trace(slotInfo, sha1)
                return cb(new Error('Mismatched content sha1'))
              }
              pull(
                pull.once(content),
                sbot.blobs.add(function (err, hash) {
                  if (err) return cb(err)
                  slotInfo.link = hash
                  delete slotInfo['*']
                  if (!--waiting) cb(null, rev)
                })
              )
            }(slot))
          })
        )

        // Re-chunk the revision stream into drafts: pack revisions into a
        // message until the estimated size would exceed the 8192-byte cap,
        // then emit the draft and carry the overflow revision forward.
        var queuedRevisions = []
        var ended
        function cbDraft(content, cb) {
          if (!content.revisions.length) {
            console.log('No revisions for', pageInfo.title)
            return cb(true)
          }
          console.log('Prepared a message',
            'with', content.revisions.length, 'revisions',
            'for', pageInfo.title)
          // Synthesize a deterministic draft id from the content hash; it is
          // chained via `parents` and later swapped for the real message key
          // by publishDrafts.
          prevId = '%' + crypto.createHash('sha256').update(JSON.stringify(content)).digest('base64') + '.draft6'
          cb(null, {
            draftId: prevId,
            content: content
          })
        }

        // The returned source yields one draft per read.
        return function (abort, cb) {
          if (abort) return revisions(abort, cb)
          if (ended) return cb(true)
          var content = {
            type: 'wikimedia/revisions',
            site: pageInfo.site,
            title: pageInfo.title,
            pageId: pageInfo.hash,
            parents: prevId ? [prevId] : undefined,
            revisions: queuedRevisions.splice(0)
          }
          revisions(null, function next(end, revision) {
            if (ended = end) return cbDraft(content, cb)
            content.revisions.push(revision)
            if (estimateMessageSize(content) > 8192) {
              queuedRevisions.push(content.revisions.pop())
              // console.log('filled msg for ', pageInfo.title, ' with ', content.revisions.length, 'revisions')
              return cbDraft(content, cb)
            }
            revisions(null, next)
          })
        }
      }),
      pull.flatten(),

      pull.collect(function (err, drafts) {
        if (err) throw err
        // -n: print the drafts instead of publishing.
        if (dry) {
          console.log(JSON.stringify(drafts, 0, 2))
          return sbot.close()
        }
        if (!drafts.length) {
          console.log('No messages to publish.')
          return sbot.close()
        }
        // -y: skip the interactive confirmation.
        if (yes) return confirmed(true)
        var rl = readline.createInterface({
          input: process.stdin,
          output: process.stdout
        })
        rl.question('Publish ' + drafts.length + ' messages? [Y/n] ', function (answer) {
          rl.close()
          confirmed(!/^n/i.test(answer))
        })
        function confirmed(yes) {
          if (!yes) return sbot.close()
          publishDrafts(sbot, drafts, function (err, msgs) {
            if (err) throw err
            console.log('Published:\n' + msgs.map(function (msg) {
              return msg.key
            }).join('\n'))
            sbot.close()
          })
        }
      })
    )
  }
})
454

Built with git-ssb-web