Files: df6d424b86391119a29d101da2fe0a42140583c6 / lib / app.js
32925 bytesRaw
1 | var http = require('http') |
2 | var memo = require('asyncmemo') |
3 | var lru = require('hashlru') |
4 | var pkg = require('../package') |
5 | var u = require('./util') |
6 | var pull = require('pull-stream') |
7 | var multicb = require('multicb') |
8 | var paramap = require('pull-paramap') |
9 | var Contacts = require('./contacts') |
10 | var PrivateBox = require('private-box') |
11 | var About = require('./about') |
12 | var Follows = require('./follows') |
13 | var Serve = require('./serve') |
14 | var Render = require('./render') |
15 | var Git = require('ssb-git') |
16 | var cat = require('pull-cat') |
17 | var proc = require('child_process') |
18 | var toPull = require('stream-to-pull-stream') |
19 | var BoxStream = require('pull-box-stream') |
20 | var crypto = require('crypto') |
21 | var SsbNpmRegistry = require('ssb-npm-registry') |
22 | var os = require('os') |
23 | var path = require('path') |
24 | var fs = require('fs') |
25 | |
// 24 zero bytes, passed as the second argument to BoxStream.createBoxStream /
// createUnboxStream below. Buffer.alloc zero-fills, replacing the deprecated
// `new Buffer(size)` followed by fill(0).
var zeros = Buffer.alloc(24)
27 | |
28 | module.exports = App |
29 | |
/**
 * Shared application state: settings read from config.patchfoo, LRU caches,
 * memoized lookups against sbot, and the helper objects used by requests.
 *
 * @param {object} sbot - ssb API object (must provide blobs.size and id)
 * @param {object} config - full configuration; config.patchfoo is optional
 */
function App(sbot, config) {
  // A setting counts as unset only when it is null or undefined, so
  // explicit falsy values such as false, 0 or '' are kept.
  function setting(value, fallback) {
    return value == null ? fallback : value
  }

  this.sbot = sbot
  this.config = config

  var conf = config.patchfoo || {}
  this.port = conf.port || 8027
  this.host = conf.host || 'localhost'
  this.msgFilter = setting(conf.filter, 'all')
  this.showPrivates = setting(conf.showPrivates, true)
  this.previewVotes = setting(conf.previewVotes, false)
  this.previewContacts = setting(conf.previewContacts, false)
  this.useOoo = setting(conf.ooo, false)

  // A host containing ':' (e.g. an IPv6 literal) is bracketed in the URL.
  var urlHost = this.host
  if (/:/.test(urlHost)) urlHost = '[' + urlHost + ']'
  this.baseUrl = 'http://' + urlHost + ':' + this.port

  // URL-related options handed to the renderer.
  var base = conf.base || '/'
  var opts = {}
  opts.base = base
  opts.blob_base = conf.blob_base || conf.img_base || base
  opts.img_base = conf.img_base || (base + 'image/')
  opts.emoji_base = conf.emoji_base || (base + 'emoji/')
  opts.encode_msgids = conf.encode_msgids == null
    ? true
    : Boolean(conf.encode_msgids)
  opts.codeInTextareas = conf.codeInTextareas
  this.opts = opts

  // getMsg and getMsgOoo share one message cache.
  this.msgCache = lru(100)
  this.getMsg = memo({cache: this.msgCache}, getMsgWithValue, sbot)
  this.getMsgOoo = memo({cache: this.msgCache}, this.getMsgOoo)

  this.aboutCache = lru(500)
  this.getAbout = memo({cache: this.aboutCache}, this._getAbout.bind(this))

  this.unboxContent = memo({cache: lru(100)}, function (value, cb) {
    var canUnbox = sbot.private && sbot.private.unbox
    if (!canUnbox) return cb(new Error('missing sbot.private.unbox'))
    sbot.private.unbox(value, cb)
  })

  this.reverseNameCache = lru(500)
  this.reverseEmojiNameCache = lru(500)

  this.blobSizeCache = lru(100)
  this.getBlobSize = memo({cache: this.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))

  this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))
  this.getIdeaTitle = memo({cache: lru(100)}, this.getIdeaTitle)

  // unboxMsg is passed around as a bare callback, so pin its receiver.
  this.unboxMsg = this.unboxMsg.bind(this)

  this.render = new Render(this, this.opts)
  this.git = new Git(this.sbot, this.config)
  this.contacts = new Contacts(this.sbot)
  this.follows = new Follows(this.sbot, this.contacts)
  this.about = new About(this, sbot.id, this.follows)
  this.serveSsbNpmRegistry = SsbNpmRegistry.respond(this.sbot, this.config)

  this.monitorBlobWants()
}
83 | |
/**
 * Start the HTTP server on the configured port/host, subscribe to new
 * 'about' links so cached About entries get invalidated, and start a
 * periodic whoami call.
 *
 * If sbot.links is missing, an error is logged and the function returns
 * before the subscription and the periodic call are set up.
 */
App.prototype.go = function () {
  var app = this

  var server = http.createServer(function handleRequest(req, res) {
    new Serve(app, req, res).go()
  })
  server.listen(app.port, app.host, function onListening() {
    var addr = server.address()
    var isIPv6 = addr.family === 'IPv6'
    var host = isIPv6 ? '[' + addr.address + ']' : addr.address
    app.log('Listening on http://' + host + ':' + addr.port)
  })

  // invalidate cached About info when new About messages come in
  if (!app.sbot.links) return console.error('missing sbot.links')
  var newAboutLinks = app.sbot.links({rel: 'about', old: false, values: true})
  pull(
    newAboutLinks,
    pull.drain(function dropCachedAbout(link) {
      app.aboutCache.remove(link.dest)
    }, function onEnd(err) {
      if (err) throw err
    })
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
110 | |
// Prefix for console output: the package name wrapped in square brackets.
var logPrefix = '['.concat(pkg.name, ']')
// log / error forward to console.log / console.error with the prefix prepended.
App.prototype.log = console.log.bind(console, logPrefix)
App.prototype.error = console.error.bind(console, logPrefix)
114 | |
/**
 * Decrypt boxed message content.
 *
 * Without a key this defers to the memoized this.unboxContent. With a key,
 * the base64 part of `content` (everything before ".box") is opened with
 * PrivateBox.multibox_open_body and the plaintext is parsed as JSON.
 *
 * @param {string} content - boxed content string
 * @param {string} [key] - base64-encoded key
 * @param {function} cb - cb(err, data); decoding and parse failures are
 *   reported through cb rather than thrown
 */
App.prototype.unboxContentWithKey = function (content, key, cb) {
  if (!key) return this.unboxContent(content, cb)
  var data
  try {
    // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
    var contentBuf = Buffer.from(content.replace(/\.box.*$/, ''), 'base64')
    var keyBuf = Buffer.from(key, 'base64')
    data = PrivateBox.multibox_open_body(contentBuf, keyBuf)
    if (!data) return cb(new Error('failed to decrypt'))
    data = JSON.parse(data.toString('utf8'))
  } catch(e) {
    return cb(new Error(e.stack || e))
  }
  // Called outside the try block so an exception thrown by cb itself is
  // not caught and reported back to cb a second time.
  cb(null, data)
}
129 | |
/**
 * Return a decrypted copy of a message whose content is a boxed string.
 *
 * Messages whose content is not a string are passed through untouched. When
 * unboxing fails (the error is logged) or yields nothing, the original
 * message is passed through as well; cb never receives an error.
 *
 * @param {object} msg - message of the form {key, value: {content, ...}}
 * @param {string|null} key - optional base64 key for unboxContentWithKey
 * @param {function} cb - cb(null, msgOrDecryptedCopy)
 */
App.prototype.unboxMsgWithKey = function (msg, key, cb) {
  var app = this
  var boxed = msg && msg.value && msg.value.content
  if (typeof boxed !== 'string') {
    cb(null, msg)
    return
  }

  function shallowCopy(src) {
    var copy = {}
    for (var name in src) copy[name] = src[name]
    return copy
  }

  app.unboxContentWithKey(boxed, key, function (err, content) {
    if (err) app.error('unbox:', err)
    if (err || !content) return cb(null, msg)
    // Copy both levels so the caller's message object is not mutated.
    var unboxed = shallowCopy(msg)
    unboxed.value = shallowCopy(msg.value)
    unboxed.value.content = content
    unboxed.value.private = true
    cb(null, unboxed)
  })
}
150 | |
/**
 * Unbox a message without an explicit key (see unboxMsgWithKey).
 *
 * @param {object} msg - message to unbox
 * @param {function} cb - cb(null, msg)
 */
App.prototype.unboxMsg = function (msg, cb) {
  var noKey = null
  return this.unboxMsgWithKey(msg, noKey, cb)
}
154 | |
/**
 * Run a text search with whichever search plugin sbot exposes.
 * sbot.fulltext.search is preferred, then sbot.search.query; with neither
 * present an erroring pull-stream source is returned.
 *
 * @param {*} opts - passed as-is to fulltext.search, or wrapped as
 *   {query: opts} for search.query
 * @returns pull-stream source
 */
App.prototype.search = function (opts) {
  var sbot = this.sbot
  var fulltextSearch = sbot.fulltext && sbot.fulltext.search
  if (fulltextSearch) return fulltextSearch(opts)

  var querySearch = sbot.search && sbot.search.query
  if (querySearch) return querySearch({query: opts})

  return pull.error(new Error('Search needs ssb-fulltext or ssb-search plugin'))
}
162 | |
/**
 * Build a newest-first stream of unboxed messages for a search.
 *
 * The source is chosen by the first option that is set:
 *   opts.channel -> backlinks to '#<channel>'
 *   opts.dest    -> links to dest (optionally restricted to opts.source)
 *   opts.source  -> that feed's user stream
 *   otherwise    -> the whole feed stream
 * If opts.text is set, messages are additionally filtered by text.
 *
 * @param {object} opts - {channel, dest, source, text}, all optional
 * @returns pull-stream source
 */
App.prototype.advancedSearch = function (opts) {
  var source
  if (opts.channel) {
    source = this.sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    source = this.sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    source = this.sbotCreateUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    source = this.sbot.createFeedStream({
      reverse: true,
    })
  }

  var unbox = this.unboxMessages()
  // A falsy stage is handed to pull() when no text filter is requested.
  var textFilter = opts.text && pull.filter(filterByText(opts.text))
  return pull(source, unbox, textFilter)
}
190 | |
/**
 * Build a predicate that walks a value recursively and applies `each` to
 * every string it finds (array items and object property values).
 *
 * @param {function(string)} each - test applied to each string
 * @returns {function(*)} for a string, the result of each(); for arrays and
 *   objects, true if any nested value matches; false for everything else
 */
function forSome(each) {
  function visit(node) {
    if (node == null) return false
    if (typeof node === 'string') return each(node)
    if (Array.isArray(node)) return node.some(visit)
    if (typeof node !== 'object') return false
    for (var key in node) {
      if (visit(node[key])) return true
    }
    return false
  }
  return visit
}
201 | |
/**
 * Build a message predicate that matches `str`, case-insensitively, against
 * every string nested anywhere in the message content.
 *
 * `str` is interpreted as a regular expression. If it is not a valid
 * pattern (for example "foo("), it is matched as literal text instead of
 * throwing from the RegExp constructor.
 *
 * @param {string} str - search text; a falsy value matches every message
 * @returns {function(object)} predicate over {value: {content}} messages;
 *   the result is truthy or falsy, not necessarily a boolean
 */
function filterByText(str) {
  if (!str) return function () { return true }
  var search
  try {
    search = new RegExp(str, 'i')
  } catch (e) {
    // Not a valid pattern: escape the metacharacters and match literally.
    var literal = String(str).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    search = new RegExp(literal, 'i')
  }
  var matches = forSome(search.test.bind(search))
  return function (msg) {
    var c = msg.value.content
    return c && matches(c)
  }
}
211 | |
/**
 * Fetch a message through the memoized getMsg and unbox it.
 *
 * @param {string} key - message id
 * @param {function} cb - cb(err, msg)
 */
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this
  function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  }
  this.getMsg(key, onMsg)
}
219 | |
/**
 * Fetch a message through sbot.ooo.get.
 *
 * @param {string} key - message id
 * @param {function} cb - cb(err, msg); errors if sbot.ooo is not available
 */
App.prototype.getMsgOoo = function (key, cb) {
  var plugin = this.sbot.ooo
  if (plugin) {
    plugin.get(key, cb)
    return
  }
  return cb(new Error('missing ssb-ooo plugin'))
}
225 | |
/**
 * Fetch a message through getMsgOoo and unbox it.
 *
 * @param {string} key - message id
 * @param {function} cb - cb(err, msg)
 */
App.prototype.getMsgDecryptedOoo = function (key, cb) {
  var app = this
  function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  }
  this.getMsgOoo(key, onMsg)
}
233 | |
/**
 * Publish content, going through sbot.publishguard when it is available.
 *
 * Content with a `recps` array is published privately to those recipients,
 * anything else publicly. When the matching publishguard method exists, it
 * is asked for a URL instead and that URL is handed to cb as
 * cb({redirectUrl: url}), i.e. in cb's first argument.
 *
 * @param {object} content - message content
 * @param {function} cb - cb(err, msg) after a direct publish, or
 *   cb({redirectUrl}) when publishguard supplied a URL
 */
App.prototype.publishMayRedirect = function (content, cb) {
  var guard = this.sbot.publishguard

  function onPublishGetUrl(err, url) {
    if (err) return cb(err)
    cb({redirectUrl: url})
  }

  if (!Array.isArray(content.recps)) {
    if (!(guard && guard.publishGetUrl)) {
      this.sbot.publish(content, cb)
      return
    }
    return guard.publishGetUrl({
      content: content,
      redirectBase: this.baseUrl + '/'
    }, onPublishGetUrl)
  }

  var recps = content.recps.map(u.linkDest)
  if (!(guard && guard.privatePublishGetUrl)) {
    this.sbot.private.publish(content, recps, cb)
    return
  }
  return guard.privatePublishGetUrl({
    content: content,
    recps: recps,
    redirectBase: this.baseUrl + '/'
  }, onPublishGetUrl)
}
260 | |
/**
 * Publish content (privately when content.recps is an array), retrying up
 * to two more times when the error message starts with
 * "expected previous:".
 *
 * @param {object} content - message content
 * @param {function} cb - cb(err, msg) with the outcome of the last attempt
 */
App.prototype.publish = function (content, cb) {
  var app = this
  var retryable = /^expected previous:/

  function attempt(retriesLeft) {
    function done(err, msg) {
      var retry = err && retriesLeft > 0 && retryable.test(err.message)
      if (retry) return attempt(retriesLeft - 1)
      return cb(err, msg)
    }

    if (!Array.isArray(content.recps)) {
      app.sbot.publish(content, done)
      return
    }
    var recps = content.recps.map(u.linkDest)
    app.sbot.private.publish(content, recps, done)
  }

  attempt(2)
}
283 | |
/**
 * Get the size of a blob, requesting it first if it is not present yet.
 * A blob that has to be requested is recorded in this.blobWants.
 *
 * @param {string} id - blob id
 * @param {function} cb - cb(err, size)
 */
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobs = app.sbot.blobs

  function afterWant(err) {
    if (err) return cb(err)
    blobs.size(id, cb)
  }

  // only want() the blob if we don't already have it; an error from this
  // first size lookup is not inspected, only the size is
  blobs.size(id, function (err, size) {
    if (size != null) return cb(null, size)
    app.blobWants[id] = true
    blobs.want(id, afterWant)
  })
}
297 | |
/**
 * Create a sink that stores the incoming data as a blob while measuring
 * its length.
 *
 * @param {function} cb - cb(err, {link: hash, size: size}) once both the
 *   length counter and blobs.add have reported
 * @returns pull-stream sink
 */
App.prototype.addBlobRaw = function (cb) {
  var done = multicb({pluck: 1, spread: true})
  // The order of the done() calls fixes the order of the spread results:
  // size first, hash second.
  var lengthCounter = u.pullLength(done())
  var addSink = this.sbot.blobs.add(done())
  var sink = pull(lengthCounter, addSink)

  done(function (err, size, hash) {
    if (err) return cb(err)
    cb(null, {link: hash, size: size})
  })
  return sink
}
310 | |
/**
 * Create a sink that adds a blob, encrypted when isPrivate is truthy.
 *
 * @param {*} isPrivate - truthy to use addBlobPrivate, else addBlobRaw
 * @param {function} cb - forwarded to the chosen method
 * @returns pull-stream sink
 */
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate ? this.addBlobPrivate(cb) : this.addBlobRaw(cb)
}
315 | |
/**
 * Create a sink that buffers the whole input, encrypts it with
 * BoxStream.createBoxStream and stores the result through addBlobRaw.
 *
 * The key is the SHA-256 digest of the cleartext; `zeros` is passed as the
 * second argument to createBoxStream.
 *
 * @param {function} cb - cb(err, link), where link is addBlobRaw's result
 *   with an extra `key` property holding the base64 key
 * @returns pull-stream sink
 */
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function collect(chunk) {
    chunks.push(chunk)
    hasher.update(chunk)
  }

  function encryptAndStore(err) {
    if (err) return cb(err)
    var secret = hasher.digest()
    pull(
      pull.values(chunks),
      BoxStream.createBoxStream(secret, zeros),
      app.addBlobRaw(function (err, link) {
        if (err) return cb(err)
        link.key = secret.toString('base64')
        cb(null, link)
      })
    )
  }

  return pull.drain(collect, encryptAndStore)
}
338 | |
/**
 * Read a blob, decrypting it when a key is given.
 *
 * @param {string} id - blob id
 * @param {string|Buffer} [key] - key as a base64 string or a Buffer; when
 *   falsy the stored bytes are returned as they are
 * @returns pull-stream source
 */
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
347 | |
/**
 * Log the blob id to stderr and hand it to sbot.blobs.push.
 *
 * @param {string} id - blob id
 * @param {function} cb - forwarded to blobs.push
 */
App.prototype.pushBlob = function (id, cb) {
  console.error('pushing blob', id)
  var blobs = this.sbot.blobs
  blobs.push(id, cb)
}
352 | |
/**
 * Read a blob described by a link; the link's size is passed along with
 * its hash to sbot.blobs.get.
 *
 * @param {*} link - anything u.toLink accepts
 * @returns pull-stream source
 */
App.prototype.readBlob = function (link) {
  var normalized = u.toLink(link)
  var request = {
    hash: normalized.link,
    size: normalized.size,
  }
  return this.sbot.blobs.get(request)
}
360 | |
/**
 * Read part of a blob. sbot.blobs.getSlice is used when available;
 * otherwise the whole blob is read and cut with u.pullSlice.
 *
 * @param {object} link - {link, size}
 * @param {object} opts - {start, end}
 * @returns pull-stream source
 */
App.prototype.readBlobSlice = function (link, opts) {
  var blobs = this.sbot.blobs
  if (!blobs.getSlice) {
    return pull(
      this.readBlob(link),
      u.pullSlice(opts.start, opts.end)
    )
  }
  return blobs.getSlice({
    hash: link.link,
    size: link.size,
    start: opts.start,
    end: opts.end,
  })
}
373 | |
/**
 * Check that every linked blob is available locally.
 *
 * @param {Array} links - blob links ({link: id, ...}); falsy entries are
 *   skipped
 * @param {function} cb - cb() when nothing is known to be missing,
 *   otherwise cb({name: 'BlobNotFoundError', links: missingLinks})
 *
 * A failing size lookup is logged and otherwise ignored (best effort).
 * Previously that path threw a TypeError, because the combined callback
 * may get no results array when it reports an error.
 */
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})
  links.filter(Boolean).forEach(function (link) {
    // Named separately so the outer cb is not shadowed.
    var linkDone = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) linkDone(err)
      else if (size == null) linkDone(null, link)
      else linkDone()
    })
  })
  done(function (err, missingLinks) {
    if (err) console.trace(err)
    missingLinks = (missingLinks || []).filter(Boolean)
    if (missingLinks.length == 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
392 | |
/**
 * Look up the id cached for a name in reverseNameCache (filled by
 * _getAbout).
 *
 * @param {string} name
 * @returns whatever the cache holds for that name
 */
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
397 | |
/**
 * Look up the cached entry for an emoji name in reverseEmojiNameCache.
 *
 * @param {string} name
 * @returns whatever the cache holds for that name
 */
App.prototype.getReverseEmojiNameSync = function (name) {
  var cached = this.reverseEmojiNameCache.get(name)
  return cached
}
401 | |
/**
 * Return the name stored in the cached About entry for an id, without
 * triggering a lookup.
 *
 * @param {string} name - despite the parameter name, this is the key used
 *   in aboutCache
 * @returns the cached about.name, or the falsy cache result when absent
 */
App.prototype.getNameSync = function (name) {
  var cached = this.aboutCache.get(name)
  if (!cached) return cached
  return cached.name
}
406 | |
/**
 * Call sbot.get for a message id. When the ooo plugin is present, the
 * object form is used with both flags set (see the version notes below).
 *
 * @param {object} sbot
 * @param {string} id - message id
 * @param {function} cb - forwarded to sbot.get
 */
function sbotGet(sbot, id, cb) {
  // ssb-ooo@1.0.1 (a50da3928500f3ac0fbead0a1b335a3dd5bbc096): raw=true
  // ssb-ooo@1.1.0 (f7302d12e56d566b84205bbc0c8b882ae6fd9b12): ooo=false
  var request = sbot.ooo ? {id: id, raw: true, ooo: false} : id
  sbot.get(request, cb)
}
416 | |
/**
 * Fetch a message value and wrap it as {key, value}.
 *
 * @param {object} sbot
 * @param {string} id - message id; a falsy id yields cb() with no arguments
 * @param {function} cb - cb(err, {key: id, value: value})
 */
function getMsgWithValue(sbot, id, cb) {
  if (!id) return cb()
  function onValue(err, value) {
    if (err) return cb(err)
    cb(null, {key: id, value: value})
  }
  sbotGet(sbot, id, onValue)
}
424 | |
/**
 * Load the About info for an id (unmemoized; see this.getAbout).
 *
 * A name that does not already start with the id's sigil gets the sigil
 * prepended, and the resulting name -> id pair is stored in
 * reverseNameCache. Note that the about object is modified in place.
 *
 * @param {string} id - ref; anything failing u.isRef yields an empty object
 * @param {function} cb - cb(err, about)
 */
App.prototype._getAbout = function (id, cb) {
  var app = this
  if (!u.isRef(id)) return cb(null, {})
  app.about.get(id, function (err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    var name = about.name
    var needsSigil = name && name[0] !== sigil
    if (needsSigil) about.name = sigil + name
    app.reverseNameCache.set(about.name, id)
    cb(null, about)
  })
}
438 | |
/**
 * Wrap a single getMsg lookup as a pull-stream source.
 *
 * @param {string} id - message id
 * @returns pull-stream source built from pull.once(id) mapped through getMsg
 */
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  var idSource = pull.once(id)
  return lookup(idSource)
}
442 | |
/**
 * Stream messages from the log, or from the feed stream when
 * opts.sortByTimestamp is set.
 *
 * @param {object} [opts] - stream options; defaults to {}
 * @returns pull-stream source
 */
App.prototype.createLogStream = function (opts) {
  var options = opts || {}
  if (options.sortByTimestamp) return this.createFeedStream(options)
  return this.sbot.createLogStream(options)
}
449 | |
/**
 * Stream messages ordered by timestamp, dropping any message whose
 * timestamp is not a number or equals one of the exclusive bounds.
 *
 * @param {object} [opts] - options for sbot.createFeedStream; opts.gt and
 *   opts.lt are also used for the extra filtering. May be omitted
 *   (previously the filter threw on the first message in that case).
 * @returns pull-stream source
 */
App.prototype.createFeedStream = function (opts) {
  // opts is still forwarded exactly as given; only the filter needs an object.
  var bounds = opts || {}
  // work around opts.gt being treated as opts.gte sometimes
  return pull(
    this.sbot.createFeedStream(opts),
    pull.filter(function (msg) {
      var ts = msg && msg.value && msg.value.timestamp
      return typeof ts === 'number' && ts !== bounds.gt && ts !== bounds.lt
    })
  )
}
460 | |
// Sort rank per connection state; unknown or missing states rank 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

/**
 * Comparator for peers: higher-ranked state first, then larger
 * stateChange first.
 *
 * The tie-breaker used to be written `b.stateChange|0 - a.stateChange|0`,
 * which parses as `b.stateChange | (0 - a.stateChange) | 0` because `-`
 * binds tighter than `|`. Plain subtraction is used instead, which also
 * avoids truncating the values to 32 bits.
 *
 * @param {object} a - peer with optional `state` and `stateChange`
 * @param {object} b - peer with optional `state` and `stateChange`
 * @returns {number} negative if a sorts first, positive if b sorts first
 */
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  return (bState - aState)
    || ((b.stateChange || 0) - (a.stateChange || 0))
}
473 | |
/**
 * Stream the gossip peers, sorted with comparePeers.
 *
 * @param {object} [opts] - when given, only peers whose properties strictly
 *   equal every property of opts are included
 * @returns the result of u.readNext (presumably a pull-stream source that
 *   resolves to pull.values(peers))
 */
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var k in opts) if (opts[k] !== peer[k]) return false
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
488 | |
/**
 * Determine the latest contact state from source towards dest.
 *
 * Only the most recent 'contact' message linking source to dest is
 * considered.
 *
 * @param {string} source - feed id of the author
 * @param {string} dest - feed id of the target
 * @param {function} cb - cb(err, state): true when following, false when
 *   flagged or blocking, otherwise null (also when no message exists)
 */
App.prototype.getContact = function (source, dest, cb) {
  var app = this

  function isContactValue(value) {
    var c = value && value.content
    return c && c.type === 'contact'
  }

  // trinary logic from ssb-friends
  function toState(acc, value) {
    var c = value.content
    if (c.following) return true
    if (c.flagged || c.blocking) return false
    return null
  }

  pull(
    app.sbot.links({source: source, dest: dest, rel: 'contact', reverse: true,
      values: true, meta: false, keys: false}),
    pull.filter(isContactValue),
    pull.take(1),
    pull.reduce(toState, null, cb)
  )
}
507 | |
/**
 * Create a through stream that unboxes messages with unboxMsg via paramap;
 * 16 is passed as paramap's second argument.
 *
 * @returns pull-stream through
 */
App.prototype.unboxMessages = function () {
  var width = 16
  return paramap(this.unboxMsg, width)
}
511 | |
/**
 * Stream the distinct channel names taken from 'channel' messages whose
 * content has a truthy `subscribed`, newest message first.
 *
 * @param {object} [opts] - currently unused
 * @returns pull-stream source of channel names
 */
App.prototype.streamChannels = function (opts) {
  function isSubscription(msg) {
    return msg.value.content.subscribed
  }
  function channelName(msg) {
    return msg.value.content.channel
  }
  var channelMsgs = this.sbotMessagesByType({type: 'channel', reverse: true})
  return pull(
    channelMsgs,
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(channelName),
    pull.unique()
  )
}
525 | |
/**
 * Stream the channels a feed is subscribed to.
 *
 * For each channel only the feed's most recent 'channel' message counts
 * (messages are read newest first and de-duplicated by channel before the
 * `subscribed` check).
 *
 * @param {string} id - feed id
 * @param {object} [opts] - currently unused
 * @returns pull-stream source of channel names
 */
App.prototype.streamMyChannels = function (id, opts) {
  // use ssb-query plugin if it is available, since it has an index for
  // author + type
  if (this.sbot.query) {
    var query = [
      {$filter: {
        value: {
          author: id,
          content: {type: 'channel'}
        }
      }},
      {$map: ['value', 'content']}
    ]
    return pull(
      this.sbot.query.read({reverse: true, query: query}),
      pull.unique('channel'),
      pull.filter('subscribed'),
      pull.map('channel')
    )
  }

  function contentOf(msg) {
    return msg.value.content
  }
  function isChannelContent(c) {
    return c.type === 'channel'
  }
  return pull(
    this.sbotCreateUserStream({id: id, reverse: true}),
    this.unboxMessages(),
    pull.map(contentOf),
    pull.filter(isChannelContent),
    pull.unique('channel'),
    pull.filter('subscribed'),
    pull.map('channel')
  )
}
561 | |
/**
 * Stream 'tag' messages, newest first, keeping only those whose content
 * has no truthy `message` property.
 *
 * @returns pull-stream source of unboxed messages
 */
App.prototype.streamTags = function () {
  function hasNoMessage(msg) {
    return !msg.value.content.message
  }
  var tagMsgs = this.sbotMessagesByType({type: 'tag', reverse: true})
  return pull(
    tagMsgs,
    this.unboxMessages(),
    pull.filter(hasNoMessage)
  )
}
571 | |
/**
 * Comparator that orders vote tallies by `value`, highest first.
 *
 * @param {{value: number}} a
 * @param {{value: number}} b
 * @returns {number} positive when b has the larger value
 */
function compareVoted(a, b) {
  var difference = b.value - a.value
  return difference
}
575 | |
/**
 * Tally votes per target and return the targets ordered by score.
 *
 * Reads 'vote' messages until `_opts.limit` distinct targets have been
 * seen (at most limit * 100 messages are requested). Each author counts
 * once per target with +1, -1 or 0; for repeated votes the most recent
 * one wins.
 *
 * @param {object} _opts - {limit (required number), reverse, gt, lt}
 * @param {function} cb - cb(err, {items, firstTimestamp, lastTimestamp});
 *   each item is {id, value, feedsObj, feeds}
 *
 * A missing or non-numeric limit is now reported through cb. Before, a
 * pull.error source was returned and cb was never called.
 */
App.prototype.getVoted = function (_opts, cb) {
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}
  var votedArray = []
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbotMessagesByType(opts),
    self.unboxMessages(),
    // stop once enough distinct targets have been collected
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      // `true` signals a normal end of stream rather than a failure
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
631 | |
/**
 * Delegate to this.about.createAboutStreams.
 *
 * @param {string} id - id to create the about streams for
 * @returns whatever About#createAboutStreams returns
 */
App.prototype.createAboutStreams = function (id) {
  var about = this.about
  return about.createAboutStreams(id)
}
635 | |
/**
 * Stream the distinct emoji mentions (mentions with a truthy `emoji`
 * property, unique by `link`) found in messages that mention blobs.
 * Messages authored by this sbot's own id are read first, followed by
 * messages from everyone.
 *
 * @returns pull-stream source of mention objects
 */
App.prototype.streamEmojis = function () {
  var sbot = this.sbot
  var ownMentions = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var allMentions = sbot.links({rel: 'mentions', dest: '&', values: true})

  function mentionsOf(msg) {
    return msg.value.content.mentions
  }

  return pull(
    cat([ownMentions, allMentions]),
    this.unboxMessages(),
    pull.map(mentionsOf),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
654 | |
/**
 * Read from a query-style plugin with a timestamp range merged into the
 * given filter.
 *
 * The range ($gt/$lt from opts.gt/opts.lt) applies to value.timestamp when
 * opts.sortByTimestamp is set and to the top-level timestamp otherwise.
 * For the backlinks plugin an index name is chosen explicitly.
 *
 * @param {object} plugin - object with a read({index, reverse, limit, query})
 * @param {object} opts - {limit, reverse, gt, lt, sortByTimestamp}
 * @param {object} filter - base $filter object
 * @returns the result of plugin.read
 */
App.prototype.filter = function (plugin, opts, filter) {
  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var index
  if (plugin === this.sbot.backlinks) {
    if (opts.sortByTimestamp) {
      index = 'DTA'
    } else {
      var content = filter && filter.value && filter.value.content
      if (content && content.type) index = 'DTS'
    }
  }

  var range = {
    $gt: opts.gt,
    $lt: opts.lt,
  }
  var timestampFilter = opts.sortByTimestamp
    ? {value: {timestamp: range}}
    : {timestamp: range}

  var limit = Number(opts.limit)
  return plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: limit || undefined,
    query: [{$filter: u.mergeOpts(filter, timestampFilter)}]
  })
}
683 | |
/**
 * Create a through stream that drops messages rejected by filterMsg and,
 * when opts.limit is a non-zero number, stops after that many messages.
 *
 * @param {object} opts - passed to filterMsg; opts.limit is optional
 * @returns pull-stream through
 */
App.prototype.filterMessages = function (opts) {
  var app = this
  var limit = Number(opts.limit)

  // Rejected messages are replaced by null so the next stage can drop them.
  function keepOrNull(msg, cb) {
    app.filterMsg(msg, opts, function (err, show) {
      if (err) return cb(err)
      cb(null, show ? msg : null)
    })
  }

  return pull(
    paramap(keepOrNull, 4),
    pull.filter(Boolean),
    limit && pull.take(limit)
  )
}
698 | |
/**
 * Stream the messages of a channel.
 *
 * @param {object} opts - options for this.filter plus opts.channel
 * @returns pull-stream source; an erroring source when neither the
 *   backlinks nor the query plugin is available
 */
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    return this.filter(sbot.backlinks, opts, {
      dest: '#' + opts.channel,
    })
  }

  if (sbot.query) {
    return this.filter(sbot.query, opts, {
      value: {content: {channel: opts.channel}},
    })
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
712 | |
/**
 * Stream the messages that link to this sbot's own id.
 *
 * @param {object} opts - options for this.filter
 * @returns pull-stream source; an erroring source without the backlinks
 *   plugin
 */
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks
  if (!backlinks) {
    return pull.error(new Error(
      'Viewing mentions requires the ssb-backlinks plugin'))
  }
  return this.filter(this.sbot.backlinks, opts, {
    dest: this.sbot.id,
  })
}
721 | |
/**
 * Stream private messages.
 *
 * sbot.private.read is used through this.filter when it exists. Otherwise
 * the log is scanned for encrypted messages, which are unboxed and kept
 * only if they turn out to be readable.
 *
 * @param {object} opts - stream options
 * @returns pull-stream source
 */
App.prototype.streamPrivate = function (opts) {
  var privatePlugin = this.sbot.private
  if (privatePlugin && privatePlugin.read) {
    return this.filter(this.sbot.private, opts, {})
  }

  var log = this.createLogStream(u.mergeOpts(opts))
  return pull(
    log,
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable)
  )
}
733 | |
/**
 * Query sbot.links2 for blob mentions carrying a given name.
 *
 * @param {object} opts - {name, author}; author is optional and limits the
 *   results to that source
 * @returns pull-stream source of {name, size, link, author, time}; an
 *   erroring source when sbot.links2 is missing
 */
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) {
    return pull.error(new Error(
      'missing ssb-links plugin'))
  }

  var filter = {rel: ['mentions', opts.name]}
  if (opts.author) filter.source = opts.author

  var onlyBlobs = {$filter: {dest: {$prefix: '&'}}}
  var shape = {$map: {
    name: ['rel', 1],
    size: ['rel', 2],
    link: 'dest',
    author: 'source',
    time: 'ts'
  }}
  return this.sbot.links2.read({
    query: [{$filter: filter}, onlyBlobs, shape]
  })
}
753 | |
/**
 * Track which blobs are currently wanted.
 *
 * Resets this.blobWants and then follows sbot.blobs.createWants(): an id
 * reported with a negative value is marked as wanted, any other value
 * clears the mark. The cached size of every reported id is dropped.
 */
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function applyWants(wants) {
    for (var id in wants) {
      var stillWanted = wants[id] < 0
      if (stillWanted) app.blobWants[id] = true
      else delete app.blobWants[id]
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    this.sbot.blobs.createWants(),
    pull.drain(applyWants, onEnd)
  )
}
770 | |
/**
 * Report whether a blob is wanted, present or absent.
 *
 * @param {string} id - blob id
 * @param {function} cb - cb(err, state): 'wanted' when the id is marked in
 *   blobWants, otherwise true/false depending on whether the memoized
 *   size lookup returned a size
 */
App.prototype.getBlobState = function (id, cb) {
  if (this.blobWants[id]) return cb(null, 'wanted')
  this.getBlobSize(id, function (err, size) {
    if (err) return cb(err)
    var present = size != null
    cb(null, present)
  })
}
779 | |
/**
 * Extract the README from an npm tarball stored as a blob, by piping the
 * blob through an external `tar` process.
 *
 * @param {string} tarballId - blob id of the gzipped tarball
 * @param {function} cb - cb(err, text, true), where text is tar's stdout
 *   decoded as UTF-8
 */
App.prototype.getNpmReadme = function (tarballId, cb) {
  // TODO: make this portable, and handle plaintext readmes
  var readmePaths = [
    'package/README.md', 'package/readme.markdown', 'package/readme.mkd']
  var tar = proc.spawn('tar', ['--ignore-case', '-Oxz'].concat(readmePaths))

  // Two slots: the first completes when the blob has been written to tar's
  // stdin, the second collects tar's stdout.
  var done = multicb({pluck: 1, spread: true})
  var tarball = this.sbot.blobs.get(tarballId)
  pull(
    tarball,
    toPull.sink(tar.stdin, done())
  )
  pull(
    toPull.source(tar.stdout),
    pull.collect(done())
  )

  done(function (err, _, bufs) {
    if (err) return cb(err)
    var text = Buffer.concat(bufs).toString('utf8')
    cb(null, text, true)
  })
}
800 | |
/**
 * Decide whether a message should be shown under the active filter.
 *
 * With filter 'all' everything is shown. Otherwise a message counts as
 * relevant when it is by this sbot's id, by opts.feed, is opts.msgId, is by
 * a followed feed, or has a positive vote from a followed feed. Relevant
 * messages are shown, unless the filter is 'invert', which flips the
 * result. Boxed messages count as not relevant when showPrivates is off.
 *
 * @param {object} msg - {key, value}
 * @param {object} opts - {filter, feed, msgId}; filter defaults to
 *   this.msgFilter
 * @param {function} cb - cb(err, show)
 */
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var filter = opts.filter || app.msgFilter
  if (filter === 'all') return cb(null, true)

  var show = (filter !== 'invert')
  var hide = !show

  var isBoxed = msg.value && typeof msg.value.content === 'string'
  if (isBoxed && !app.showPrivates) return cb(null, hide)

  var alwaysRelevant = author === myId
    || author === opts.feed
    || msg.key === opts.msgId
  if (alwaysRelevant) return cb(null, show)

  app.follows.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)
    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        var likedByFollowed = follows[voter] && votes[voter] > 0
        if (likedByFollowed) return cb(null, show)
      }
      return cb(null, hide)
    })
  })
}
827 | |
/**
 * Look up whether src follows dest.
 *
 * @param {string} src - feed id whose follows are loaded
 * @param {string} dest - feed id to look up
 * @param {function} cb - cb(err, follows[dest]); the value is passed
 *   through as stored, so it may be undefined
 */
App.prototype.isFollowing = function (src, dest, cb) {
  function onFollows(err, follows) {
    if (err) return cb(err)
    return cb(null, follows[dest])
  }
  this.follows.getFollows(src, onFollows)
}
835 | |
/**
 * Stream the votes cast on a message as {author, value} objects.
 *
 * Uses the first available source: sbot.links2, then sbot.backlinks, then
 * plain sbot.links.
 *
 * The sbot.links fallback used to emit the vote under `vote` only, while
 * the other two sources (and _getVotes, which reads `value`) use `value`.
 * It now sets `value` too; `vote` is kept for existing consumers.
 *
 * @param {string} id - id of the message that was voted on
 * @returns pull-stream source of {author, value}
 */
App.prototype.getVotesStream = function (id) {
  var links2 = this.sbot.links2
  if (links2 && links2.read) return links2.read({
    query: [
      {$filter: {
        dest: id,
        rel: [{$prefix: 'vote'}]
      }},
      {$map: {
        value: ['rel', 1],
        author: 'source'
      }}
    ]
  })

  var backlinks = this.sbot.backlinks
  if (backlinks && backlinks.read) return backlinks.read({
    query: [
      {$filter: {
        dest: id,
        value: {
          content: {
            type: 'vote',
            vote: {
              link: id
            }
          }
        }
      }},
      {$map: {
        author: ['value', 'author'],
        value: ['value', 'content', 'vote', 'value']
      }}
    ]
  })

  return pull(
    this.sbot.links({
      dest: id,
      rel: 'vote',
      keys: false,
      meta: false,
      values: true
    }),
    pull.map(function (value) {
      var vote = value && value.content && value.content.vote
      var voteValue = vote && vote.value
      return {
        author: value && value.author,
        value: voteValue,
        vote: voteValue
      }
    })
  )
}
889 | |
/**
 * Collect the votes on a message into an author -> value map (unmemoized;
 * see this.getVotes). When an author appears more than once in the stream,
 * the entry seen last wins.
 *
 * The id is now passed on to getVotesStream; it used to be dropped, so the
 * stream was built for an undefined message id.
 *
 * @param {string} id - id of the message that was voted on
 * @param {function} cb - cb(err, votes); votes is passed even when err is set
 */
App.prototype._getVotes = function (id, cb) {
  var votes = {}
  pull(
    this.getVotesStream(id),
    pull.drain(function (vote) {
      votes[vote.author] = vote.value
    }, function (err) {
      cb(err, votes)
    })
  )
}
901 | |
/**
 * Stream the distinct "host:port" addresses announced for a feed in 'pub'
 * messages, newest first.
 *
 * Without the backlinks plugin an empty stream is returned and a warning
 * is traced once per App instance.
 *
 * @param {string} id - feed id
 * @returns pull-stream source of address strings
 */
App.prototype.getAddresses = function (id) {
  var backlinks = this.sbot.backlinks
  if (!backlinks) {
    if (!this.warned1) {
      this.warned1 = true
      console.trace('Getting peer addresses requires the ssb-backlinks plugin')
    }
    return pull.empty()
  }

  var pubAnnouncement = {
    type: 'pub',
    address: {
      key: id,
      host: {$truthy: true},
      port: {$truthy: true},
    }
  }
  var query = [
    {$filter: {dest: id, value: {content: pubAnnouncement}}},
    {$map: ['value', 'content', 'address']}
  ]

  function formatAddress(addr) {
    return addr.host + ':' + addr.port
  }

  return pull(
    this.sbot.backlinks.read({reverse: true, query: query}),
    pull.map(formatAddress),
    pull.unique()
  )
}
936 | |
/**
 * Find the latest title of a talenet idea.
 *
 * @param {string} id - idea message id
 * @param {function} cb - cb(err, title); when no titled update exists, or
 *   the backlinks plugin is missing, the first 8 characters of the id
 *   followed by an ellipsis are used
 */
App.prototype.getIdeaTitle = function (id, cb) {
  function shortId() {
    return String(id).substr(0, 8) + '…'
  }

  var backlinks = this.sbot.backlinks
  if (!backlinks) return cb(null, shortId())

  var query = [
    {$filter: {
      dest: id,
      value: {
        content: {
          type: 'talenet-idea-update',
          ideaKey: id,
          title: {$truthy: true}
        }
      }
    }},
    {$map: ['value', 'content', 'title']}
  ]

  pull(
    backlinks.read({reverse: true, query: query}),
    pull.take(1),
    pull.collect(function (err, titles) {
      if (err) return cb(err)
      var latest = titles && titles[0]
      cb(null, latest || shortId())
    })
  )
}
965 | |
/**
 * Visit a value and everything nested inside it, depth first and parents
 * before children. `emit` is called for every node, containers included.
 *
 * @param {*} obj - value to walk
 * @param {function(*)} emit - called once per visited value
 */
function traverse(obj, emit) {
  emit(obj)
  var isContainer = obj !== null && typeof obj === 'object'
  if (!isContainer) return
  for (var key in obj) {
    traverse(obj[key], emit)
  }
}
974 | |
/**
 * Extend a set of messages with out-of-order messages that link to `dest`.
 *
 * Every message id referenced anywhere in the initial messages is fetched
 * through getMsgDecryptedOoo unless it is already in the set. A fetched
 * message is added when it is `dest` itself or contains `dest` somewhere.
 * Fetch errors are traced and otherwise ignored.
 *
 * @param {object} opts - {dest: message id, msgs: array of messages}
 * @param {function} cb - cb(err, msgs) with the de-duplicated result
 *
 * The duplicate check on the initial messages used to look at
 * `msgs[msg.key]` (the input array) instead of the `msgsO` map, so it
 * never matched. It now checks the map and the first occurrence of a key
 * is kept.
 */
App.prototype.expandOoo = function (opts, cb) {
  var self = this
  var dest = opts.dest
  var msgs = opts.msgs
  if (!Array.isArray(msgs)) return cb(new TypeError('msgs should be array'))

  // algorithm:
  // traverse all links in the initial message set.
  // find linked-to messages not in the set.
  // fetch those messages.
  // if one links to the dest, add it to the set
  // and look for more missing links to fetch.
  // done when no more links to fetch
  //
  // NOTE(review): links found in fetched messages are recorded in `links`
  // but nothing fetches them afterwards, so only one round of fetching
  // happens. Confirm whether the "look for more" step is still intended.

  var msgsO = {}     // message key -> message: the result set
  var getting = {}   // message ids that have been requested
  var waiting = 0    // number of fetches still in flight

  function checkDone() {
    if (waiting) return
    var msgs = Object.keys(msgsO).map(function (key) {
      return msgsO[key]
    })
    cb(null, msgs)
  }

  function getMsg(id) {
    if (msgsO[id] || getting[id]) return
    getting[id] = true
    waiting++
    self.getMsgDecryptedOoo(id, function (err, msg) {
      waiting--
      if (err) console.trace(err)
      else gotMsg(msg)
      checkDone()
    })
  }

  var links = {}
  function addLink(id) {
    if (typeof id === 'string' && id[0] === '%' && u.isRef(id)) {
      links[id] = true
    }
  }

  msgs.forEach(function (msg) {
    if (msgsO[msg.key]) return
    if (msg.value.content === false) return // missing root
    msgsO[msg.key] = msg
    traverse(msg, addLink)
  })
  // Hold one extra count while the fetches are started, so that lookups
  // completing synchronously cannot finish the whole operation early.
  waiting++
  for (var id in links) {
    getMsg(id)
  }
  waiting--
  checkDone()

  function gotMsg(msg) {
    if (msgsO[msg.key]) return
    var links = []
    var linkedToDest = msg.key === dest
    traverse(msg, function (id) {
      if (id === dest) linkedToDest = true
      links.push(id)
    })
    if (linkedToDest) {
      msgsO[msg.key] = msg
      links.forEach(addLink)
    }
  }
}
1047 | |
// Collect the line comments on one git object within a git-update message.
//
// opts.obj.msg.key: id of the git-update message
// opts.hash: hash of the git object the comments must belong to
// cb(err, lineComments): lineComments maps a line-comment's `line` value to
//   {obj, hash, msg}.
//
// A line-comment message carries the update message id, a commit id and a
// file path, while the caller has the update message id and a git object
// hash. So each comment's (commitId, filePath) is resolved to a git object,
// and only comments whose resolved hash equals opts.hash are kept.
App.prototype.getLineComments = function (opts, cb) {
  var self = this
  var updateId = opts.obj.msg.key
  var objId = opts.hash
  var lineComments = {}

  function isLineCommentForUpdate(msg) {
    var c = msg && msg.value && msg.value.content
    return c && c.type === 'line-comment'
      && c.updateId === updateId
  }

  // resolve the git object that a line-comment message refers to
  function resolveObject(msg, next) {
    var content = msg.value.content
    self.git.getObjectAtPath({
      msg: updateId,
      obj: content.commitId,
      path: content.filePath,
    }, function (err, info) {
      if (err) return next(err)
      next(null, {
        obj: info.obj,
        hash: info.hash,
        msg: msg,
      })
    })
  }

  // use the backlinks query when sbot has it, otherwise fall back to
  // sbot.links and filter the messages here
  var source
  if (self.sbot.backlinks) {
    source = self.sbot.backlinks.read({
      query: [
        {$filter: {
          dest: updateId,
          value: {
            content: {
              type: 'line-comment',
              updateId: updateId,
            }
          }
        }}
      ]
    })
  } else {
    source = pull(
      self.sbot.links({
        dest: updateId,
        rel: 'updateId',
        values: true
      }),
      pull.filter(isLineCommentForUpdate)
    )
  }

  pull(
    source,
    paramap(resolveObject, 4),
    pull.filter(function (info) {
      return info.hash === objId
    }),
    pull.drain(function (info) {
      lineComments[info.msg.value.content.line] = info
    }, function (err) {
      cb(err, lineComments)
    })
  )
}
1108 | |
// Call sbot.links(opts) and return its result; if sbot has no `links`
// method, return a pull-stream source that errors.
App.prototype.sbotLinks = function (opts) {
  var sbot = this.sbot
  if (sbot.links) return sbot.links(opts)
  return pull.error(new Error('missing sbot.links'))
}
1113 | |
// Call sbot.createUserStream(opts) and return its result; if sbot has no
// `createUserStream` method, return a pull-stream source that errors.
App.prototype.sbotCreateUserStream = function (opts) {
  var sbot = this.sbot
  if (sbot.createUserStream) return sbot.createUserStream(opts)
  return pull.error(new Error('missing sbot.createUserStream'))
}
1118 | |
// Call sbot.messagesByType(opts) and return its result; if sbot has no
// `messagesByType` method, return a pull-stream source that errors.
App.prototype.sbotMessagesByType = function (opts) {
  var sbot = this.sbot
  if (sbot.messagesByType) return sbot.messagesByType(opts)
  return pull.error(new Error('missing sbot.messagesByType'))
}
1123 | |
// Return a pull-stream source that emits `msg` first, followed by whatever
// the backlinks query (or, without the backlinks plugin, sbot.links)
// yields for dest = msg.key.
App.prototype.getThread = function (msg) {
  var linked
  if (this.sbot.backlinks) {
    linked = this.sbot.backlinks.read({
      query: [
        {$filter: {dest: msg.key}}
      ]
    })
  } else {
    linked = this.sbotLinks({
      dest: msg.key,
      values: true
    })
  }
  return cat([pull.once(msg), linked])
}
1137 | |
// Get the message `id` (decrypted), then unbox its `content.shard` field.
//
// cb(err, shard): err is an Error describing which step failed: getting the
//   message, a message without a shard, or unboxing the shard.
App.prototype.getShard = function (id, cb) {
  var self = this

  function onShard(err, shard) {
    if (err) return cb(new Error('Unable to decrypt shard: ' + err.stack))
    cb(null, shard)
  }

  function onMsg(err, msg) {
    if (err) return cb(new Error('Unable to get shard message: ' + err.stack))
    var content = msg.value.content || {}
    if (!content.shard) return cb(new Error('Message missing shard: ' + id))
    self.unboxContent(content.shard, onShard)
  }

  this.getMsgDecrypted(id, onMsg)
}
1150 | |
// Get the sbot status, whichever way sbot.status delivers it.
//
// sbot.status is a "sync" method: when we run as a plugin it returns the
// status directly; when called over muxrpc it is async and reports through
// the callback. So the callback is passed along in the call, and it is
// additionally invoked here when an object comes back as the return value.
App.prototype.sbotStatus = function (cb) {
  var result
  try {
    result = this.sbot.status(cb)
  } catch(e) {
    return cb(e)
  }
  var gotSyncStatus = result !== null && typeof result === 'object'
  if (gotSyncStatus) return cb(null, result)
}
1162 | |
// Write the whole buffer `buf` to file descriptor `fd`, issuing further
// writes for as long as a write reports fewer bytes than were left.
// Writes go to the current file position. cb(err) is called on the first
// write error, or with no arguments once everything is written.
function writeAll(fd, buf, cb) {
  var written = 0
  function writeRest() {
    fs.write(fd, buf, written, buf.length - written, null, onWritten)
  }
  function onWritten(err, count) {
    if (err) return cb(err)
    written += count
    if (written < buf.length) writeRest()
    else cb()
  }
  writeRest()
}
1174 | |
// Verify the signature of a git object by running `gpg --verify`.
//
// The signature extracted from `obj` is written to a temporary file which is
// handed to gpg as file descriptor 3, and the signed payload is written to
// gpg's stdin.
//
// cb(err, result): result is
//   goodsig: whether gpg's status output contains a GOODSIG line
//   status:  gpg's status output (its stdout), as a string
//   output:  gpg's stderr, as a string
// The temporary file is closed and removed before cb is called, on both the
// success and the error paths (once it has been opened).
App.prototype.verifyGitObjectSignature = function (obj, cb) {
  var self = this
  // Declared in function scope so the cleanup code can reach it.
  var fd = null
  var finished = false
  // Unpredictable name, since the file lives in a shared temp directory.
  var tmpPath = path.join(os.tmpdir(),
    '.git_vtag_tmp' + crypto.randomBytes(8).toString('hex'))

  // Close and remove the temp file, then report to cb exactly once.
  // The error that caused the cleanup takes precedence over cleanup errors.
  function finish(err, result) {
    if (finished) return
    finished = true
    fs.close(fd, function (closeErr) {
      fs.unlink(tmpPath, function (unlinkErr) {
        var firstErr = err || closeErr || unlinkErr
        if (firstErr) return cb(firstErr)
        cb(null, result)
      })
    })
  }

  // use a temp file to work around https://github.com/nodejs/node/issues/13542
  // 'wx+' fails if the path already exists, instead of writing through
  // a file or symlink that something else put there.
  fs.open(tmpPath, 'wx+', function (err, openedFd) {
    if (err) return cb(err)
    fd = openedFd
    self.git.extractSignature(obj, function (err, parts) {
      if (err) return finish(err)
      writeAll(fd, parts.signature, function (err) {
        if (err) return finish(err)
        try { verify(parts) }
        catch(e) { finish(e) }
      })
    })
  })

  function verify(parts) {
    // autoClose is off because finish() owns closing the descriptor.
    var readSig = fs.createReadStream(null, {fd: fd, start: 0, autoClose: false})
    // collects, in order: gpg exit code, gpg stdout, gpg stderr
    var done = multicb({pluck: 1, spread: true})
    var gpg = proc.spawn('gpg', ['--status-fd=1', '--keyid-format=long',
      '--verify', '/dev/fd/3', '-'], {
        stdio: ['pipe', 'pipe', 'pipe', readSig]
      }).on('close', done().bind(null, null))
      .on('error', console.error.bind(console, 'gpg'))
    gpg.stdin.end(parts.payload)
    pull(toPull.source(gpg.stdout), u.pullConcat(done()))
    pull(toPull.source(gpg.stderr), u.pullConcat(done()))
    done(function (err, code, status, output) {
      if (err) return finish(err)
      finish(null, {
        goodsig: status.includes('\n[GNUPG:] GOODSIG '),
        status: status.toString(),
        output: output.toString()
      })
    })
  }
}
1221 |
Built with git-ssb-web