Files: 946379b16a68ffd636d8fed3925367cfa8e2e6f6 / lib / app.js
27381 bytesRaw
1 | var http = require('http') |
2 | var memo = require('asyncmemo') |
3 | var lru = require('hashlru') |
4 | var pkg = require('../package') |
5 | var u = require('./util') |
6 | var pull = require('pull-stream') |
7 | var multicb = require('multicb') |
8 | var paramap = require('pull-paramap') |
9 | var Contacts = require('ssb-contact') |
10 | var About = require('./about') |
11 | var Follows = require('./follows') |
12 | var Serve = require('./serve') |
13 | var Render = require('./render') |
14 | var Git = require('ssb-git') |
15 | var cat = require('pull-cat') |
16 | var proc = require('child_process') |
17 | var toPull = require('stream-to-pull-stream') |
18 | var BoxStream = require('pull-box-stream') |
19 | var crypto = require('crypto') |
20 | var SsbNpmRegistry = require('ssb-npm-registry') |
21 | |
// 24 zero bytes, passed as the second argument to BoxStream.createBoxStream /
// createUnboxStream by the private-blob helpers in this file.
// Buffer.alloc returns a zero-filled buffer, replacing the deprecated
// `new Buffer(size)` followed by an explicit fill(0).
var zeros = Buffer.alloc(24)
23 | |
24 | module.exports = App |
25 | |
// Shared application state: the sbot client, the resolved `patchfoo`
// settings, the memoized lookups and their LRU caches, and the helper
// objects (About, Render, Git, Contacts, Follows, npm registry responder).
//
// sbot:   ssb client; `sbot.id`, `sbot.private` and `sbot.blobs` are read here.
// config: full config object; the optional `config.patchfoo` section holds
//         the settings consumed below.
function App(sbot, config) {
  this.sbot = sbot
  this.config = config

  var settings = config.patchfoo || {}

  // Use `fallback` only when the setting is absent (null or undefined).
  function settingOr(value, fallback) {
    return value == null ? fallback : value
  }

  this.port = settings.port || 8027
  this.host = settings.host || 'localhost'
  this.msgFilter = settings.filter
  this.showPrivates = settingOr(settings.showPrivates, true)
  this.previewVotes = settingOr(settings.previewVotes, false)
  this.previewContacts = settingOr(settings.previewContacts, false)
  this.useOoo = settingOr(settings.ooo, false)

  // URL prefixes handed to the renderer; each can be overridden separately.
  var base = settings.base || '/'
  this.opts = {
    base: base,
    blob_base: settings.blob_base || settings.img_base || base,
    img_base: settings.img_base || (base + 'image/'),
    emoji_base: settings.emoji_base || (base + 'emoji/'),
    encode_msgids: settings.encode_msgids == null || Boolean(settings.encode_msgids),
    codeInTextareas: settings.codeInTextareas,
  }

  this.about = new About(this, sbot.id)

  // getMsg and getMsgOoo deliberately share one message cache.
  this.msgCache = lru(100)
  this.getMsg = memo({cache: this.msgCache}, getMsgWithValue, sbot)
  this.getMsgOoo = memo({cache: this.msgCache}, this.getMsgOoo)

  this.aboutCache = lru(500)
  this.getAbout = memo({cache: this.aboutCache}, this._getAbout.bind(this))

  this.unboxContent = memo({cache: lru(100)}, function (value, cb) {
    sbot.private.unbox(value, cb)
  })

  this.reverseNameCache = lru(500)
  this.reverseEmojiNameCache = lru(500)

  this.blobSizeCache = lru(100)
  this.getBlobSize = memo({cache: this.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))

  this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))
  this.getIdeaTitle = memo({cache: lru(100)}, this.getIdeaTitle)

  // unboxMsg is handed around as a bare callback, so pin its receiver.
  this.unboxMsg = this.unboxMsg.bind(this)

  this.render = new Render(this, this.opts)
  this.git = new Git(this.sbot, this.config)
  this.contacts = new Contacts(this.sbot)
  this.follows = new Follows(this.sbot, this.contacts)
  this.serveSsbNpmRegistry = SsbNpmRegistry.respond(this.sbot, this.config)

  this.monitorBlobWants()
}
73 | |
// Start the HTTP server, begin invalidating cached About data as new about
// links arrive, and ping the sbot every 10 seconds.
App.prototype.go = function () {
  var app = this

  function handleRequest(req, res) {
    new Serve(app, req, res).go()
  }

  function onListening() {
    // A host containing ':' is shown in brackets (IPv6 literal form).
    var shownHost = app.host
    if (/:/.test(shownHost)) shownHost = '[' + shownHost + ']'
    app.log('Listening on http://' + shownHost + ':' + app.port)
  }

  var server = http.createServer(handleRequest)
  // NOTE(review): for host 'localhost' no host argument is passed to listen();
  // confirm which interfaces that binds on the target Node version.
  if (app.host === 'localhost') {
    server.listen(app.port, onListening)
  } else {
    server.listen(app.port, app.host, onListening)
  }

  // invalidate cached About info when new About messages come in
  pull(
    app.sbot.links({rel: 'about', old: false, values: true}),
    pull.drain(function (link) {
      app.aboutCache.remove(link.dest)
    }, function (err) {
      if (err) throw err
    })
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
99 | |
// Loggers shared by all App instances: console.log / console.error with the
// bracketed package name prepended as the first argument.
var logPrefix = '[' + pkg.name + ']'
var logMethods = ['log', 'error']
logMethods.forEach(function (method) {
  App.prototype[method] = console[method].bind(console, logPrefix)
})
103 | |
// Try to decrypt a message whose content is a string.
// Calls cb(null, msg) unchanged when the content is not a string, when
// unboxing fails (the error is logged) or when it yields nothing; otherwise
// calls cb(null, copy) where the copy has the decrypted content and
// `value.private = true`. The input message is never modified.
App.prototype.unboxMsg = function (msg, cb) {
  var app = this
  var boxed = msg && msg.value && msg.value.content
  if (typeof boxed !== 'string') {
    cb(null, msg)
    return
  }

  // Copy of all enumerable properties, one level deep.
  function shallowCopy(src) {
    var out = {}
    for (var prop in src) out[prop] = src[prop]
    return out
  }

  app.unboxContent(boxed, function (err, content) {
    if (err) {
      app.error('unbox:', err)
      return cb(null, msg)
    }
    if (!content) return cb(null, msg)

    var unboxed = shallowCopy(msg)
    unboxed.value = shallowCopy(msg.value)
    unboxed.value.content = content
    unboxed.value.private = true
    cb(null, unboxed)
  })
}
124 | |
// Run a text search with whichever search plugin the sbot exposes:
// `fulltext.search(opts)` if present, else `search.query({query: opts})`.
// Returns an error stream when neither is available.
App.prototype.search = function (opts) {
  var sbot = this.sbot

  var fulltextSearch = sbot.fulltext && sbot.fulltext.search
  if (fulltextSearch) return fulltextSearch(opts)

  var querySearch = sbot.search && sbot.search.query
  if (querySearch) return querySearch({query: opts})

  return pull.error(new Error('Search needs ssb-fulltext or ssb-search plugin'))
}
132 | |
// Stream messages newest-first from the narrowest source the options allow
// (channel backlinks, links to `dest`, one author's feed, or the whole feed),
// unbox them, and optionally keep only those whose content matches
// `opts.text`.
App.prototype.advancedSearch = function (opts) {
  var sbot = this.sbot
  var source

  if (opts.channel) {
    source = sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    source = sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    source = sbot.createUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    source = sbot.createFeedStream({
      reverse: true,
    })
  }

  return pull(
    source,
    this.unboxMessages(),
    // a falsy stage is passed straight to pull() when no text filter is set
    opts.text && pull.filter(filterByText(opts.text))
  )
}
160 | |
// Build a deep "any string matches" predicate.
// The returned function walks strings, arrays and objects recursively and
// returns the result of `each(str)` for a bare string, true as soon as any
// nested string satisfies `each`, and false for null/undefined and other
// primitives.
function forSome(each) {
  function some(obj) {
    if (obj == null) return false
    if (typeof obj === 'string') return each(obj)
    if (Array.isArray(obj)) return obj.some(some)
    if (typeof obj !== 'object') return false
    for (var prop in obj) {
      if (some(obj[prop])) return true
    }
    return false
  }
  return some
}
171 | |
// Build a message predicate from a search string.
// An empty string accepts every message. Otherwise `str` is compiled as a
// case-insensitive RegExp and a message passes when any string nested
// anywhere in its content matches. Falsy content is returned as-is.
function filterByText(str) {
  if (!str) {
    return function acceptAll() { return true }
  }
  var pattern = new RegExp(str, 'i')
  var containsMatch = forSome(function (text) {
    return pattern.test(text)
  })
  return function contentMatches(msg) {
    var content = msg.value.content
    if (!content) return content
    return containsMatch(content)
  }
}
181 | |
// Fetch a message by key through the getMsg cache, then pass it through
// unboxMsg. Lookup errors go to cb(err).
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this
  app.getMsg(key, function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  })
}
189 | |
// Fetch a message through the sbot's `ooo` plugin.
// Calls cb with an Error when that plugin is not available.
App.prototype.getMsgOoo = function (key, cb) {
  var oooPlugin = this.sbot.ooo
  if (oooPlugin) {
    oooPlugin.get(key, cb)
    return
  }
  return cb(new Error('missing ssb-ooo plugin'))
}
195 | |
// Same as getMsgDecrypted, but the message is fetched with getMsgOoo.
App.prototype.getMsgDecryptedOoo = function (key, cb) {
  var app = this
  app.getMsgOoo(key, function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  })
}
203 | |
// Publish a message, privately when `content.recps` is an array.
// A failure whose message starts with "expected previous:" is retried up to
// two more times; any other outcome is passed to cb(err, msg).
//
// content: message content; `content.recps`, if an array, is mapped through
//          u.linkDest to build the recipient list for sbot.private.publish.
// cb:      function (err, msg)
App.prototype.publish = function (content, cb) {
  var self = this

  function tryPublish(triesLeft) {
    function next(err, msg) {
      var shouldRetry = err && triesLeft > 0 &&
        /^expected previous:/.test(err.message)
      if (shouldRetry) return tryPublish(triesLeft - 1)
      return cb(err, msg)
    }

    if (Array.isArray(content.recps)) {
      // Declared locally: the original assigned to an undeclared `recps`,
      // which leaks a global (and throws in strict mode).
      var recps = content.recps.map(u.linkDest)
      self.sbot.private.publish(content, recps, next)
    } else {
      self.sbot.publish(content, next)
    }
  }

  tryPublish(2)
}
226 | |
// Report a blob's size, requesting the blob first if it is not held locally.
// While the blob is being requested its id is flagged in `this.blobWants`.
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobs = app.sbot.blobs

  function afterWant(err) {
    if (err) return cb(err)
    blobs.size(id, cb)
  }

  // only want() the blob if we don't already have it
  blobs.size(id, function (err, size) {
    if (size != null) return cb(null, size)
    app.blobWants[id] = true
    blobs.want(id, afterWant)
  })
}
240 | |
// Return a sink that stores the incoming data as a blob.
// When both the length counter and the blob store have finished, cb receives
// (null, {link: <hash>, size: <size>}), or the error.
App.prototype.addBlobRaw = function (cb) {
  var done = multicb({pluck: 1, spread: true})
  var lengthCounter = u.pullLength(done())
  var blobWriter = this.sbot.blobs.add(done())
  var sink = pull(lengthCounter, blobWriter)

  done(function (err, size, hash) {
    if (err) return cb(err)
    cb(null, {link: hash, size: size})
  })

  return sink
}
253 | |
// Return a blob-adding sink: the encrypting variant when `isPrivate` is
// truthy, the plain one otherwise.
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate ? this.addBlobPrivate(cb) : this.addBlobRaw(cb)
}
258 | |
// Return a sink that buffers all incoming chunks, then stores them as an
// encrypted blob. cb receives the link from addBlobRaw with an added `key`
// property holding the base64-encoded encryption key.
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function encryptAndStore() {
    var secret = hasher.digest()
    pull(
      pull.values(chunks),
      BoxStream.createBoxStream(secret, zeros),
      app.addBlobRaw(function (err, link) {
        if (err) return cb(err)
        link.key = secret.toString('base64')
        cb(null, link)
      })
    )
  }

  return pull.drain(function (chunk) {
    chunks.push(chunk)
    hasher.update(chunk)
  }, function (err) {
    if (err) return cb(err)
    encryptAndStore()
  })
}
281 | |
// Return a source stream for a blob.
// Without `key` the stored bytes are streamed as-is. With `key` (a Buffer or
// a base64 string) the stream is passed through BoxStream.createUnboxStream.
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
290 | |
// Log the blob id to stderr, then hand it to sbot.blobs.push.
App.prototype.pushBlob = function (id, cb) {
  console.error('pushing blob', id)
  var blobs = this.sbot.blobs
  blobs.push(id, cb)
}
295 | |
// Return a source stream for the blob named by `link`.
// `link` is normalised with u.toLink; its `link` and `size` fields are passed
// to sbot.blobs.get as `hash` and `size`.
App.prototype.readBlob = function (link) {
  var blobLink = u.toLink(link)
  var request = {
    hash: blobLink.link,
    size: blobLink.size,
  }
  return this.sbot.blobs.get(request)
}
303 | |
// Return a source stream for the byte range [opts.start, opts.end) of a blob.
// Uses sbot.blobs.getSlice when the sbot provides it, otherwise reads the
// whole blob and slices it with u.pullSlice.
//
// The link is normalised with u.toLink up front so both paths accept the same
// inputs as readBlob; previously only the fallback path did, via readBlob.
// (assumes u.toLink passes an already-normalised link object through, as
// readBlob's own use of it implies — confirm against ./util)
App.prototype.readBlobSlice = function (link, opts) {
  link = u.toLink(link)
  var blobs = this.sbot.blobs
  if (blobs.getSlice) return blobs.getSlice({
    hash: link.link,
    size: link.size,
    start: opts.start,
    end: opts.end,
  })
  return pull(
    this.readBlob(link),
    u.pullSlice(opts.start, opts.end)
  )
}
316 | |
// Check that every linked blob has a known size.
// Falsy entries in `links` are ignored. Calls cb() with no arguments when
// nothing is missing, cb({name: 'BlobNotFoundError', links: [...]}) listing
// the links whose size came back null/undefined, or cb(err) when a size
// lookup itself failed.
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})

  links.filter(Boolean).forEach(function (link) {
    var linkDone = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) linkDone(err)
      else if (size == null) linkDone(null, link)
      else linkDone()
    })
  })

  done(function (err, results) {
    if (err) {
      // The result list is not usable after a failure; previously the code
      // carried on and crashed calling .filter on it.
      console.trace(err)
      return cb(err)
    }
    var missingLinks = results.filter(Boolean)
    if (missingLinks.length == 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
335 | |
// Synchronously look up the id recorded for a name in reverseNameCache.
// Returns whatever the cache holds for `name`.
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
340 | |
// Synchronously look up the entry recorded for an emoji name in
// reverseEmojiNameCache.
App.prototype.getReverseEmojiNameSync = function (name) {
  var cache = this.reverseEmojiNameCache
  return cache.get(name)
}
344 | |
// Synchronously read the `name` of the About entry cached under the given
// key. Returns the falsy cache result itself when nothing is cached.
App.prototype.getNameSync = function (name) {
  var cachedAbout = this.aboutCache.get(name)
  if (!cachedAbout) return cachedAbout
  return cachedAbout.name
}
349 | |
// Get a message value from the sbot.
// First tries the object form `sbot.get({id, raw: true})`; if that is
// rejected with the exact "Param 0 must by of type number" message, falls
// back to the plain `sbot.get(id)` call.
function sbotGet(sbot, id, cb) {
  // try sbot.get via ssb-ooo a50da3928500f3ac0fbead0a1b335a3dd5bbc096 first
  var request = {id: id, raw: true}
  sbot.get(request, function (err, value) {
    var objectFormRejected = err &&
      err.message === 'Param 0 must by of type number'
    if (objectFormRejected) return sbot.get(id, cb)
    cb(err, value)
  })
}
359 | |
// Fetch a message value and wrap it as {key: id, value: value}.
// A falsy id yields cb() with no arguments.
function getMsgWithValue(sbot, id, cb) {
  if (!id) return cb()
  sbotGet(sbot, id, function onValue(err, value) {
    if (err) cb(err)
    else cb(null, {key: id, value: value})
  })
}
367 | |
// Uncached About lookup behind `getAbout`.
// Non-ref ids yield an empty object. Otherwise the About record's name is
// prefixed with the id's sigil (first character, '@' if empty) when it does
// not already start with it, and the name -> id pair is stored in
// reverseNameCache.
App.prototype._getAbout = function (id, cb) {
  var app = this
  if (!u.isRef(id)) return cb(null, {})

  app.about.get(id, function (err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    var name = about.name
    if (name && name[0] !== sigil) about.name = sigil + name
    app.reverseNameCache.set(about.name, id)
    cb(null, about)
  })
}
381 | |
// Return a source stream that emits the single message looked up for `id`
// through getMsg.
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  return lookup(pull.once(id))
}
385 | |
// Stream the log. With `opts.sortByTimestamp` set this delegates to
// createFeedStream; otherwise it uses sbot.createLogStream directly.
App.prototype.createLogStream = function (opts) {
  opts = opts || {}
  if (opts.sortByTimestamp) return this.createFeedStream(opts)
  return this.sbot.createLogStream(opts)
}
392 | |
// Stream the feed from sbot.createFeedStream, dropping messages that have no
// numeric timestamp or whose timestamp equals exactly `opts.gt` or `opts.lt`.
//
// opts: optional; passed through unchanged to sbot.createFeedStream. The
//       filter tolerates a missing opts object instead of throwing on
//       `opts.gt`, matching createLogStream's handling.
App.prototype.createFeedStream = function (opts) {
  var bounds = opts || {}
  // work around opts.gt being treated as opts.gte sometimes
  return pull(
    this.sbot.createFeedStream(opts),
    pull.filter(function (msg) {
      var ts = msg && msg.value && msg.value.timestamp
      return typeof ts === 'number' && ts !== bounds.gt && ts !== bounds.lt
    })
  )
}
403 | |
// Sort rank per peer connection state; unknown or missing states rank 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

// Array#sort comparator for peers: higher-ranked state first, then the
// larger `stateChange` value first.
//
// The tie-break used to read `b.stateChange|0 - a.stateChange|0`, which
// parses as `b.stateChange | (0 - a.stateChange) | 0` because `-` binds
// tighter than `|`, and `|0` also truncates to 32 bits. It is now a plain
// numeric difference, with missing or non-numeric values counted as 0.
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  if (bState !== aState) return bState - aState
  var aChange = Number(a.stateChange) || 0
  var bChange = Number(b.stateChange) || 0
  return bChange - aChange
}
416 | |
// Stream the gossip peers, sorted with comparePeers.
// When `opts` is given, only peers whose properties strictly equal every
// property of `opts` are kept.
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var prop in opts) {
      if (opts[prop] !== peer[prop]) return false
    }
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
431 | |
// Look up the contact state from `source` to `dest`, using the first
// 'contact' message found when reading the links in reverse.
// cb receives true (following), false (flagged or blocking) or null
// (neither, or no contact message found).
App.prototype.getContact = function (source, dest, cb) {
  function isContactValue(value) {
    var content = value && value.content
    return content && content.type === 'contact'
  }

  // trinary logic from ssb-friends
  function toContactState(acc, value) {
    var content = value.content
    if (content.following) return true
    if (content.flagged || content.blocking) return false
    return null
  }

  var contactLinks = this.sbot.links({
    source: source,
    dest: dest,
    rel: 'contact',
    reverse: true,
    values: true,
    meta: false,
    keys: false
  })

  pull(
    contactLinks,
    pull.filter(isContactValue),
    pull.take(1),
    pull.reduce(toContactState, null, cb)
  )
}
450 | |
// Return a through stream that runs unboxMsg on each message via paramap,
// with a width of 16.
App.prototype.unboxMessages = function () {
  var width = 16
  return paramap(this.unboxMsg, width)
}
454 | |
// Stream the distinct channel names taken from 'channel' messages (read in
// reverse) whose content has a truthy `subscribed`.
App.prototype.streamChannels = function (opts) {
  function isSubscription(msg) {
    return msg.value.content.subscribed
  }

  function toChannelName(msg) {
    return msg.value.content.channel
  }

  return pull(
    this.sbot.messagesByType({type: 'channel', reverse: true}),
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
468 | |
// Stream the channel names that feed `id` is subscribed to.
// Channel messages are read newest-first; only the first message seen per
// channel is kept, and it is emitted when its `subscribed` is truthy.
App.prototype.streamMyChannels = function (id, opts) {
  var channelContents

  if (this.sbot.query) {
    // use ssb-query plugin if it is available, since it has an index for
    // author + type
    channelContents = this.sbot.query.read({
      reverse: true,
      query: [
        {$filter: {
          value: {
            author: id,
            content: {type: 'channel'}
          }
        }},
        {$map: ['value', 'content']}
      ]
    })
  } else {
    channelContents = pull(
      this.sbot.createUserStream({id: id, reverse: true}),
      this.unboxMessages(),
      pull.map(function (msg) {
        return msg.value.content
      }),
      pull.filter(function (content) {
        return content.type === 'channel'
      })
    )
  }

  return pull(
    channelContents,
    pull.unique('channel'),
    pull.filter('subscribed'),
    pull.map('channel')
  )
}
504 | |
// Stream 'tag' messages (read in reverse, unboxed) whose content has no
// truthy `message` field.
App.prototype.streamTags = function () {
  function hasNoMessageField(msg) {
    return !msg.value.content.message
  }

  return pull(
    this.sbot.messagesByType({type: 'tag', reverse: true}),
    this.unboxMessages(),
    pull.filter(hasNoMessageField)
  )
}
514 | |
// Array#sort comparator ordering vote tallies by descending `value`.
function compareVoted(a, b) {
  var difference = b.value - a.value
  return difference
}
518 | |
// Tally votes per target message.
//
// _opts.limit:   required; maximum number of distinct vote targets to collect
//                (the underlying query is limited to 100x that many messages)
// _opts.reverse: read vote messages in reverse
// _opts.gt/lt:   optional bounds passed to messagesByType
// cb:            function (err, {items, firstTimestamp, lastTimestamp}) where
//                each item is {id, value, feedsObj, feeds}; items are sorted
//                by descending value
//
// A missing limit is reported through cb. Previously an error stream was
// returned and cb was never called.
App.prototype.getVoted = function (_opts, cb) {
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}
  var votedArray = []
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbot.messagesByType(opts),
    self.unboxMessages(),
    // stop once enough distinct targets have been seen
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      // each author contributes -1, 0 or +1
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
574 | |
// Delegate to the About helper's createAboutStreams for the given id.
App.prototype.createAboutStreams = function (id) {
  var about = this.about
  return about.createAboutStreams(id)
}
578 | |
// Stream the distinct (by `link`) mentions that carry a truthy `emoji`
// field, taken from messages with 'mentions' links to blobs ('&'). Links
// whose source is this sbot's own id come first, followed by all such links.
App.prototype.streamEmojis = function () {
  var sbot = this.sbot

  var ownMentions = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var allMentions = sbot.links({rel: 'mentions', dest: '&', values: true})

  function toMentions(msg) {
    return msg.value.content.mentions
  }

  return pull(
    cat([ownMentions, allMentions]),
    this.unboxMessages(),
    pull.map(toMentions),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
597 | |
// Read from a query-style plugin with a timestamp range merged into `filter`.
//
// plugin: object with a `read` method (e.g. sbot.backlinks, sbot.query)
// opts:   reads `limit`, `reverse`, `gt`, `lt` and `sortByTimestamp`; with
//         sortByTimestamp the range applies to `value.timestamp`, otherwise
//         to the top-level `timestamp`
// filter: base $filter object
App.prototype.filter = function (plugin, opts, filter) {
  var limit = Number(opts.limit)

  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var index
  if (plugin === this.sbot.backlinks) {
    var content = filter && filter.value && filter.value.content
    if (opts.sortByTimestamp) index = 'DTA'
    else if (content && content.type) index = 'DTS'
  }

  var range = {
    $gt: opts.gt,
    $lt: opts.lt,
  }
  var timeFilter = opts.sortByTimestamp
    ? {value: {timestamp: range}}
    : {timestamp: range}

  return plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: limit || undefined,
    query: [{$filter: u.mergeOpts(filter, timeFilter)}]
  })
}
626 | |
// Return a through stream that keeps only the messages filterMsg approves
// (checked via paramap with a width of 4) and, when `opts.limit` is a
// non-zero number, stops after that many messages.
App.prototype.filterMessages = function (opts) {
  var app = this
  var limit = Number(opts.limit)

  // Replace rejected messages with null so they can be filtered out after.
  function keepOrNull(msg, cb) {
    app.filterMsg(msg, opts, function (err, show) {
      if (err) return cb(err)
      cb(null, show ? msg : null)
    })
  }

  return pull(
    paramap(keepOrNull, 4),
    pull.filter(Boolean),
    limit && pull.take(limit)
  )
}
641 | |
// Stream the messages of channel `opts.channel`, using sbot.backlinks when
// present, else sbot.query, else an error stream.
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    return this.filter(sbot.backlinks, opts, {
      dest: '#' + opts.channel,
    })
  }

  if (sbot.query) {
    return this.filter(sbot.query, opts, {
      value: {content: {channel: opts.channel}},
    })
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
655 | |
// Stream backlinks whose dest is this sbot's own id.
// Returns an error stream when sbot.backlinks is not available.
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks
  if (!backlinks) {
    return pull.error(new Error(
      'Viewing mentions requires the ssb-backlinks plugin'))
  }
  return this.filter(backlinks, opts, {
    dest: this.sbot.id,
  })
}
664 | |
// Stream private messages.
// Uses sbot.private.read through `filter` when available; otherwise scans
// the log for encrypted messages, unboxes them and keeps those that
// u.isMsgReadable accepts.
App.prototype.streamPrivate = function (opts) {
  var privatePlugin = this.sbot.private
  if (privatePlugin && privatePlugin.read) {
    return this.filter(privatePlugin, opts, {})
  }

  var log = this.createLogStream(u.mergeOpts(opts))
  return pull(
    log,
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable)
  )
}
676 | |
// Query sbot.links2 for 'mentions' links named `opts.name` that point at
// blobs (dest prefixed '&'), optionally restricted to `opts.author`.
// Results are mapped to {name, size, link, author, time}.
// Returns an error stream when sbot.links2 is not available.
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) {
    return pull.error(new Error('missing ssb-links plugin'))
  }

  var linkFilter = {rel: ['mentions', opts.name]}
  if (opts.author) linkFilter.source = opts.author

  var resultShape = {
    name: ['rel', 1],
    size: ['rel', 2],
    link: 'dest',
    author: 'source',
    time: 'ts'
  }

  return links2.read({
    query: [
      {$filter: linkFilter},
      {$filter: {dest: {$prefix: '&'}}},
      {$map: resultShape}
    ]
  })
}
696 | |
// Track wanted blobs from sbot.blobs.createWants().
// For each id in an update: a negative value marks it in `this.blobWants`,
// any other value clears it, and its cached size is dropped either way.
// Stream errors are traced, not thrown.
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function applyWants(wants) {
    for (var id in wants) {
      if (wants[id] < 0) {
        app.blobWants[id] = true
      } else {
        delete app.blobWants[id]
      }
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    this.sbot.blobs.createWants(),
    pull.drain(applyWants, onEnd)
  )
}
713 | |
// Report a blob's state: 'wanted' when it is flagged in `blobWants`,
// otherwise a boolean telling whether getBlobSize returned a size.
App.prototype.getBlobState = function (id, cb) {
  if (this.blobWants[id]) return cb(null, 'wanted')
  this.getBlobSize(id, function (err, size) {
    if (err) return cb(err)
    var haveBlob = size != null
    cb(null, haveBlob)
  })
}
722 | |
// Extract a package's markdown readme from an npm tarball blob by piping the
// blob through the external `tar` command.
// cb receives (null, text, true) with tar's stdout decoded as UTF-8.
App.prototype.getNpmReadme = function (tarballId, cb) {
  var app = this
  // TODO: make this portable, and handle plaintext readmes
  var readmePaths = [
    'package/README.md',
    'package/readme.markdown',
    'package/readme.mkd'
  ]
  var tarArgs = ['--ignore-case', '-Oxz'].concat(readmePaths)
  var tar = proc.spawn('tar', tarArgs)

  // Two completions are awaited: feeding stdin and collecting stdout.
  var done = multicb({pluck: 1, spread: true})
  pull(
    app.sbot.blobs.get(tarballId),
    toPull.sink(tar.stdin, done())
  )
  pull(
    toPull.source(tar.stdout),
    pull.collect(done())
  )

  done(function (err, _, bufs) {
    if (err) return cb(err)
    var text = Buffer.concat(bufs).toString('utf8')
    cb(null, text, true)
  })
}
743 | |
// Decide whether a message should be shown; cb receives (err, boolean).
//
// The filter mode is `opts.filter` or the configured default:
//   'all'    -> always true
//   'invert' -> the decision below is negated
// A message is "shown" when it is authored by this sbot, by `opts.feed`, or
// is `opts.msgId`; when its author is followed by this sbot; or when a
// followed feed has a positive vote on it. Messages with string content are
// rejected first when showPrivates is off.
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var mode = opts.filter || app.msgFilter
  if (mode === 'all') return cb(null, true)

  var show = mode !== 'invert'
  var hide = !show

  var isPrivate = msg.value && typeof msg.value.content === 'string'
  if (isPrivate && !app.showPrivates) return cb(null, hide)

  var alwaysShown = author === myId ||
    author === opts.feed ||
    msg.key === opts.msgId
  if (alwaysShown) return cb(null, show)

  app.follows.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)
    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        if (follows[voter] && votes[voter] > 0) {
          return cb(null, show)
        }
      }
      return cb(null, hide)
    })
  })
}
770 | |
// Look up `dest` in the follows of `src`; cb receives (err, follows[dest]).
App.prototype.isFollowing = function (src, dest, cb) {
  this.follows.getFollows(src, function onFollows(err, follows) {
    if (err) return cb(err)
    return cb(null, follows[dest])
  })
}
778 | |
// Uncached vote lookup behind `getVotes`.
// Queries sbot.links2 for links to `id` whose rel starts with 'vote' and
// calls cb(err, votes) where `votes` maps author -> vote value; a later link
// from the same author overwrites an earlier one.
//
// When the links2 plugin is absent the error is passed to cb with the same
// message blobMentions uses, instead of throwing a TypeError.
App.prototype._getVotes = function (id, cb) {
  var links2 = this.sbot.links2
  if (!links2) return cb(new Error('missing ssb-links plugin'))

  var votes = {}
  pull(
    links2.read({
      query: [
        {$filter: {
          dest: id,
          rel: [{$prefix: 'vote'}]
        }},
        {$map: {
          value: ['rel', 1],
          author: 'source'
        }}
      ]
    }),
    pull.drain(function (vote) {
      votes[vote.author] = vote.value
    }, function (err) {
      cb(err, votes)
    })
  )
}
801 | |
// Stream the distinct "host:port" addresses taken from 'pub' messages that
// link to `id` and carry an address for that same key.
// Without sbot.backlinks this warns once (console.trace) and returns an
// empty stream.
App.prototype.getAddresses = function (id) {
  var backlinks = this.sbot.backlinks
  if (!backlinks) {
    if (!this.warned1) {
      this.warned1 = true
      console.trace('Getting peer addresses requires the ssb-backlinks plugin')
    }
    return pull.empty()
  }

  var pubAddressFilter = {
    dest: id,
    value: {
      content: {
        type: 'pub',
        address: {
          key: id,
          host: {$truthy: true},
          port: {$truthy: true},
        }
      }
    }
  }

  function formatAddress(addr) {
    return addr.host + ':' + addr.port
  }

  return pull(
    backlinks.read({
      reverse: true,
      query: [
        {$filter: pubAddressFilter},
        {$map: ['value', 'content', 'address']}
      ]
    }),
    pull.map(formatAddress),
    pull.unique()
  )
}
836 | |
// Find a title for a talenet idea: the first truthy title among
// 'talenet-idea-update' backlinks for the idea, read in reverse.
// Falls back to the first 8 characters of the id plus an ellipsis when
// sbot.backlinks is unavailable or no title is found.
App.prototype.getIdeaTitle = function (id, cb) {
  function fallbackTitle() {
    return String(id).substr(0, 8) + '…'
  }

  var backlinks = this.sbot.backlinks
  if (!backlinks) return cb(null, fallbackTitle())

  var titleUpdateFilter = {
    dest: id,
    value: {
      content: {
        type: 'talenet-idea-update',
        ideaKey: id,
        title: {$truthy: true}
      }
    }
  }

  pull(
    backlinks.read({
      reverse: true,
      query: [
        {$filter: titleUpdateFilter},
        {$map: ['value', 'content', 'title']}
      ]
    }),
    pull.take(1),
    pull.collect(function (err, titles) {
      if (err) return cb(err)
      var found = titles && titles[0]
      cb(null, found || fallbackTitle())
    })
  )
}
865 | |
// Depth-first walk: call `emit` on `obj` itself, then recursively on every
// enumerable property value of objects and arrays.
function traverse(obj, emit) {
  emit(obj)
  if (obj === null || typeof obj !== 'object') return
  for (var prop in obj) {
    traverse(obj[prop], emit)
  }
}
874 | |
// Expand a set of messages with out-of-order messages that belong with it.
//
// opts.msgs: array of messages forming the initial set (entries whose
//            content is `false` are treated as missing and skipped)
// opts.dest: id that a fetched message must be, or link to, to be added
// cb:        function (err, msgs) with the expanded set; individual fetch
//            failures are traced and otherwise ignored
//
// algorithm:
// traverse all links in the initial message set.
// find linked-to messages not in the set.
// fetch those messages.
// if one links to the dest, add it to the set
// and look for more missing links to fetch.
// done when no more links to fetch
//
// Changes from the previous version: the duplicate check on the initial set
// looks in the key map rather than indexing the array by key; links found in
// an accepted fetched message are now fetched too, as the algorithm above
// describes (they used to be recorded and never read); and the pending
// counter is decremented only after a fetched message has been processed, so
// a synchronously completing nested fetch cannot fire cb early or twice.
App.prototype.expandOoo = function (opts, cb) {
  var self = this
  var dest = opts.dest
  var msgs = opts.msgs
  if (!Array.isArray(msgs)) return cb(new TypeError('msgs should be array'))

  var msgsO = {}    // key -> message, the result set
  var getting = {}  // ids already requested
  var waiting = 0   // outstanding fetches (plus a guard during start-up)

  function isMsgLink(id) {
    return typeof id === 'string' && id[0] === '%' && u.isRef(id)
  }

  function checkDone() {
    if (waiting) return
    cb(null, Object.keys(msgsO).map(function (key) {
      return msgsO[key]
    }))
  }

  function getMsg(id) {
    if (msgsO[id] || getting[id]) return
    getting[id] = true
    waiting++
    self.getMsgDecryptedOoo(id, function (err, msg) {
      if (err) console.trace(err)
      else gotMsg(msg)
      waiting--
      checkDone()
    })
  }

  function gotMsg(msg) {
    if (msgsO[msg.key]) return
    var found = []
    var linkedToDest = msg.key === dest
    traverse(msg, function (id) {
      if (id === dest) linkedToDest = true
      if (isMsgLink(id)) found.push(id)
    })
    if (!linkedToDest) return
    msgsO[msg.key] = msg
    found.forEach(function (id) {
      getMsg(id)
    })
  }

  // Register the whole initial set before fetching anything, so members of
  // the set are never requested.
  var links = {}
  msgs.forEach(function (msg) {
    if (msgsO[msg.key]) return
    if (msg.value.content === false) return // missing root
    msgsO[msg.key] = msg
    traverse(msg, function (id) {
      if (isMsgLink(id)) links[id] = true
    })
  })

  waiting++
  for (var id in links) {
    getMsg(id)
  }
  waiting--
  checkDone()
}
947 | |
// get line comments for a git-update message and git object id.
// line comments include message id, commit id and path
// but we have message id and git object hash.
// look up the git object hash for each line-comment
// to verify that it is for the git object file we want
//
// opts.obj.msg.key: id of the git-update message
// opts.hash:        git object hash the comments must resolve to
// cb:               function (err, lineComments), keyed by each comment's
//                   `line`, with values {obj, hash, msg}
App.prototype.getLineComments = function (opts, cb) {
  var app = this
  var updateId = opts.obj.msg.key
  var objId = opts.hash
  var lineComments = {}

  function isLineCommentForUpdate(msg) {
    var content = msg && msg.value && msg.value.content
    return content && content.type === 'line-comment' &&
      content.updateId === updateId
  }

  // Candidate line-comment messages: an indexed query when backlinks is
  // available, otherwise a links scan filtered by hand.
  var candidates
  if (app.sbot.backlinks) {
    candidates = app.sbot.backlinks.read({
      query: [
        {$filter: {
          dest: updateId,
          value: {
            content: {
              type: 'line-comment',
              updateId: updateId,
            }
          }
        }}
      ]
    })
  } else {
    candidates = pull(
      app.sbot.links({
        dest: updateId,
        rel: 'updateId',
        values: true
      }),
      pull.filter(isLineCommentForUpdate)
    )
  }

  // Resolve the git object each comment's commit/path pair refers to.
  function resolveCommentObject(msg, done) {
    var content = msg.value.content
    app.git.getObjectAtPath({
      msg: updateId,
      obj: content.commitId,
      path: content.filePath,
    }, function (err, info) {
      if (err) return done(err)
      done(null, {
        obj: info.obj,
        hash: info.hash,
        msg: msg,
      })
    })
  }

  pull(
    candidates,
    paramap(resolveCommentObject, 4),
    pull.filter(function (info) {
      return info.hash === objId
    }),
    pull.drain(function (info) {
      lineComments[info.msg.value.content.line] = info
    }, function (err) {
      cb(err, lineComments)
    })
  )
}
1008 | |
// Stream a message followed by everything that links to it, using
// sbot.backlinks when available and sbot.links otherwise.
App.prototype.getThread = function (msg) {
  var sbot = this.sbot
  var head = pull.once(msg)
  var linked
  if (sbot.backlinks) {
    linked = sbot.backlinks.read({
      query: [
        {$filter: {dest: msg.key}}
      ]
    })
  } else {
    linked = sbot.links({
      dest: msg.key,
      values: true
    })
  }
  return cat([head, linked])
}
1022 | |
// Fetch a message, read its `shard` field and unbox it.
// cb receives (null, shard), or an Error describing which step failed
// (fetching the message, a missing shard field, or decrypting the shard).
App.prototype.getShard = function (id, cb) {
  var app = this

  function onShard(err, shard) {
    if (err) return cb(new Error('Unable to decrypt shard: ' + err.stack))
    cb(null, shard)
  }

  app.getMsgDecrypted(id, function (err, msg) {
    if (err) return cb(new Error('Unable to get shard message: ' + err.stack))
    var content = msg.value.content || {}
    if (!content.shard) return cb(new Error('Message missing shard: ' + id))
    app.unboxContent(content.shard, onShard)
  })
}
1035 |
Built with git-ssb-web