Files: 93cedb4dbe760934ac0138bd9f087079662060b8 / lib / app.js
20223 bytesRaw
1 | var http = require('http') |
2 | var memo = require('asyncmemo') |
3 | var lru = require('hashlru') |
4 | var pkg = require('../package') |
5 | var u = require('./util') |
6 | var pull = require('pull-stream') |
7 | var multicb = require('multicb') |
8 | var paramap = require('pull-paramap') |
9 | var Contacts = require('ssb-contact') |
10 | var About = require('./about') |
11 | var Serve = require('./serve') |
12 | var Render = require('./render') |
13 | var Git = require('./git') |
14 | var cat = require('pull-cat') |
15 | var proc = require('child_process') |
16 | var toPull = require('stream-to-pull-stream') |
17 | var BoxStream = require('pull-box-stream') |
18 | var crypto = require('crypto') |
19 | |
// 24 zero bytes, passed as the second argument to
// BoxStream.createBoxStream / createUnboxStream for private blobs.
// Buffer.alloc returns zero-filled memory, so no separate fill() is needed
// (the `new Buffer(size)` constructor is deprecated).
var zeros = Buffer.alloc(24)
21 | |
22 | module.exports = App |
23 | |
// Application state shared by every request: holds the sbot client, the
// resolved patchfoo configuration, memoized lookups and their LRU caches, and
// the Render and Git helpers. Also starts monitoring blob wants.
//
// sbot:   ssb client; its `get` method is replaced by a memoized version.
// config: configuration object; settings are read from `config.patchfoo`.
function App(sbot, config) {
  this.sbot = sbot
  this.config = config

  var conf = config.patchfoo || {}
  var base = conf.base || '/'

  this.port = conf.port || 8027
  this.host = conf.host || 'localhost'
  this.msgFilter = conf.filter

  this.opts = {
    base: base,
    blob_base: conf.blob_base || conf.img_base || base,
    img_base: conf.img_base || base,
    emoji_base: conf.emoji_base || (base + 'emoji/'),
    // defaults to true when the setting is absent
    encode_msgids: conf.encode_msgids == null || Boolean(conf.encode_msgids),
  }

  sbot.get = memo({cache: lru(100)}, sbot.get)
  this.about = new About(this, sbot.id)
  this.getMsg = memo({cache: lru(100)}, getMsgWithValue, sbot)

  // the about cache is kept on the instance so entries can be invalidated
  this.aboutCache = lru(500)
  this.getAbout = memo({cache: this.aboutCache}, this._getAbout.bind(this))

  this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
  this.reverseNameCache = lru(500)
  this.reverseEmojiNameCache = lru(500)

  // the blob size cache is kept on the instance so entries can be invalidated
  this.blobSizeCache = lru(100)
  this.getBlobSize = memo({cache: this.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))

  this.getFollows = memo(this._getFollows.bind(this))
  this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))

  // bound so it can be handed around as a plain callback
  this.unboxMsg = this.unboxMsg.bind(this)

  this.render = new Render(this, this.opts)
  this.git = new Git(this)

  this.monitorBlobWants()
}
62 | |
// Start the HTTP server, begin invalidating cached About entries as new
// about links arrive, and ping the ssb connection every 10 seconds.
App.prototype.go = function () {
  var app = this

  var server = http.createServer(function (req, res) {
    new Serve(app, req, res).go()
  })

  function onListening() {
    // hosts containing a colon are wrapped in brackets for the logged URL
    var shownHost = app.host
    if (/:/.test(shownHost)) shownHost = '[' + shownHost + ']'
    app.log('Listening on http://' + shownHost + ':' + app.port)
  }

  // for 'localhost' no host argument is passed to listen()
  if (app.host === 'localhost') {
    server.listen(app.port, onListening)
  } else {
    server.listen(app.port, app.host, onListening)
  }

  // invalidate cached About info when new About messages come in
  pull(
    app.sbot.links({rel: 'about', old: false, values: true}),
    pull.drain(function (link) {
      app.aboutCache.remove(link.dest)
    }, function (err) {
      if (err) throw err
    })
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
88 | |
// Prefix put in front of every line logged through App#log / App#error,
// built from the package name.
var logPrefix = '[' + pkg.name + ']'

// Logging helpers shared by all App instances: console.log / console.error
// with the prefix pre-applied as the first argument.
App.prototype.log = console.log.bind(console, logPrefix)
App.prototype.error = console.error.bind(console, logPrefix)
92 | |
// Try to decrypt a message whose content is a string.
//
// Calls cb(null, msg) with the original message when the content is not a
// string, when unboxing fails (the error is logged), or when unboxing yields
// nothing. Otherwise calls cb(null, copy) where copy is a shallow copy of msg
// with a copied `value` whose `content` is the unboxed content and whose
// `private` flag is true. The input message is not modified.
App.prototype.unboxMsg = function (msg, cb) {
  var app = this
  var ciphertext = msg.value && msg.value.content

  function copyProps(src) {
    var dst = {}
    for (var key in src) dst[key] = src[key]
    return dst
  }

  if (typeof ciphertext !== 'string') {
    cb(null, msg)
    return
  }

  app.unboxContent(ciphertext, function (err, content) {
    if (err) app.error('unbox:', err)
    if (err || !content) return cb(null, msg)

    var unboxed = copyProps(msg)
    unboxed.value = copyProps(msg.value)
    unboxed.value.content = content
    unboxed.value.private = true
    cb(null, unboxed)
  })
}
113 | |
// Run a full-text search through sbot.fulltext.search.
// Returns the stream produced by the plugin, or an error source when the
// plugin (or its search method) is not present.
App.prototype.search = function (opts) {
  var fulltext = this.sbot.fulltext
  var search = fulltext && fulltext.search
  if (search) return search(opts)
  return pull.error(new Error('Missing fulltext search plugin'))
}
119 | |
// Build a reverse-ordered message stream selected by the first option that
// is set, in this priority: opts.channel (backlinks to '#channel'),
// opts.dest (links to dest, optionally restricted to opts.source),
// opts.source (that feed's user stream), otherwise the whole feed stream.
// When opts.text is set the stream is additionally filtered by that text.
App.prototype.advancedSearch = function (opts) {
  var sbot = this.sbot
  var source

  if (opts.channel) {
    source = sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    source = sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    source = sbot.createUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    source = sbot.createFeedStream({
      reverse: true,
    })
  }

  // a falsy second argument is passed to pull() when no text filter is wanted
  return pull(
    source,
    opts.text && pull.filter(filterByText(opts.text))
  )
}
146 | |
// Build a predicate that walks a value recursively and reports whether
// `each` accepts any string found in it.
//
// null/undefined -> false; string -> result of each(string);
// array -> true if any element matches; other object -> true if any
// enumerable property value matches; anything else -> false.
function forSome(each) {
  function visit(node) {
    if (node === null || node === undefined) return false
    switch (typeof node) {
      case 'string':
        return each(node)
      case 'object':
        if (Array.isArray(node)) return node.some(visit)
        for (var key in node) {
          if (visit(node[key])) return true
        }
        return false
      default:
        return false
    }
  }
  return visit
}
157 | |
// Build a message predicate that matches `str` case-insensitively against
// any string inside msg.value.content.
//
// An empty/missing str yields a predicate that accepts everything. str is
// used as a regular expression; if it is not a valid pattern (e.g. "foo("),
// it is matched literally instead of letting `new RegExp` throw a
// SyntaxError out of the caller.
function filterByText(str) {
  if (!str) return function () { return true }
  var search
  try {
    search = new RegExp(str, 'i')
  } catch (e) {
    // escape every regex metacharacter so the text is matched verbatim
    search = new RegExp(String(str).replace(/[.*+?^${}()|[\]\\]/g, '\\$&'), 'i')
  }
  var matches = forSome(search.test.bind(search))
  return function (msg) {
    var c = msg.value.content
    return c && matches(c)
  }
}
167 | |
// Fetch a message by key via getMsg and pass it through unboxMsg.
// cb receives the lookup error, or whatever unboxMsg reports.
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this
  app.getMsg(key, function onMsg(err, msg) {
    if (err) {
      return cb(err)
    }
    app.unboxMsg(msg, cb)
  })
}
175 | |
// Publish a message.
//
// When content.recps is an array the message is published privately to
// those recipients (each mapped through u.linkDest); otherwise it is
// published normally. If publishing fails with an error whose message
// starts with "expected previous:", the publish is retried, up to two more
// times. cb(err, msg) receives the final outcome.
App.prototype.publish = function (content, cb) {
  var self = this
  function tryPublish(triesLeft) {
    function next(err, msg) {
      var retryable = err && /^expected previous:/.test(err.message)
      if (retryable && triesLeft > 0) return tryPublish(triesLeft - 1)
      return cb(err, msg)
    }
    if (Array.isArray(content.recps)) {
      // declared locally; previously this assignment leaked a global `recps`
      var recps = content.recps.map(u.linkDest)
      self.sbot.private.publish(content, recps, next)
    } else {
      self.sbot.publish(content, next)
    }
  }
  tryPublish(2)
}
198 | |
// Get the size of a blob, requesting it first if it is not present.
// Only want() the blob if we don't already have it.
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobStore = app.sbot.blobs
  blobStore.size(id, function (sizeErr, knownSize) {
    // the error of this first lookup is not inspected; only a non-null size
    // short-circuits
    if (knownSize != null) return cb(null, knownSize)
    app.blobWants[id] = true
    blobStore.want(id, function (wantErr) {
      if (wantErr) return cb(wantErr)
      blobStore.size(id, cb)
    })
  })
}
212 | |
// Create a sink that stores the incoming data as a blob.
// When both the length counter and the blob store have reported back,
// cb(null, {link: hash, size: size}) is called; any error goes to cb(err).
App.prototype.addBlobRaw = function (cb) {
  var done = multicb({pluck: 1, spread: true})
  // the order of the done() calls fixes the result order: size first, hash second
  var blobSink = pull(
    u.pullLength(done()),
    this.sbot.blobs.add(done())
  )
  done(function (err, size, hash) {
    if (err) {
      return cb(err)
    }
    var link = {link: hash, size: size}
    cb(null, link)
  })
  return blobSink
}
225 | |
// Return a sink that adds a blob: encrypted when isPrivate is truthy,
// as-is otherwise.
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate ? this.addBlobPrivate(cb) : this.addBlobRaw(cb)
}
230 | |
// Return a sink that buffers all incoming data, then stores it as a blob
// boxed with BoxStream. The sha256 hash of the cleartext is used as the
// key. cb(null, link) receives the link from addBlobRaw with an added
// `key` property (the key, base64-encoded).
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function onChunk(chunk) {
    chunks.push(chunk)
    hasher.update(chunk)
  }

  function onEnd(err) {
    if (err) return cb(err)
    var secret = hasher.digest()
    pull(
      pull.values(chunks),
      BoxStream.createBoxStream(secret, zeros),
      app.addBlobRaw(function (err, link) {
        if (err) return cb(err)
        link.key = secret.toString('base64')
        cb(null, link)
      })
    )
  }

  return pull.drain(onChunk, onEnd)
}
253 | |
// Return a source stream for a blob.
//
// id:  blob id passed to sbot.blobs.get.
// key: optional key. When given, the blob is passed through
//      BoxStream.createUnboxStream; a string key is decoded from base64.
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
262 | |
// Log the blob id to stderr, then ask sbot.blobs to push the blob;
// cb is handed straight to blobs.push.
App.prototype.pushBlob = function (id, cb) {
  console.error('pushing blob', id)
  var blobStore = this.sbot.blobs
  blobStore.push(id, cb)
}
267 | |
// Return a source stream for the blob described by `link`.
// The argument is normalised with u.toLink; its `link` and `size` fields
// are passed to sbot.blobs.get as `hash` and `size`.
App.prototype.readBlob = function (link) {
  var normalized = u.toLink(link)
  var request = {
    hash: normalized.link,
    size: normalized.size,
  }
  return this.sbot.blobs.get(request)
}
275 | |
// Return a source stream for the byte range [opts.start, opts.end) of a blob.
// (Exact range semantics are those of blobs.getSlice / u.pullSlice.)
//
// Uses sbot.blobs.getSlice when available; otherwise reads the whole blob
// with readBlob and slices the stream with u.pullSlice.
App.prototype.readBlobSlice = function (link, opts) {
  if (this.sbot.blobs.getSlice) {
    // normalise the same way readBlob does, so both paths accept the same
    // kinds of `link` argument
    var normalized = u.toLink(link)
    return this.sbot.blobs.getSlice({
      hash: normalized.link,
      size: normalized.size,
      start: opts.start,
      end: opts.end,
    })
  }
  return pull(
    this.readBlob(link),
    u.pullSlice(opts.start, opts.end)
  )
}
288 | |
// Check that every blob in `links` is present locally.
//
// links: array of objects with a `link` property (the blob id).
// cb():    all blobs have a size, i.e. are present.
// cb(err): either the error from a failed size lookup, or a
//          {name: 'BlobNotFoundError', links: [...]} object listing the
//          links whose size came back null/undefined.
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})
  links.forEach(function (link) {
    var checked = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) checked(err)
      else if (size == null) checked(null, link)
      else checked()
    })
  })
  done(function (err, missingLinks) {
    if (err) {
      // missingLinks is not guaranteed to be an array when an error is
      // reported, so report the error instead of filtering it
      console.trace(err)
      return cb(err)
    }
    missingLinks = missingLinks.filter(Boolean)
    if (missingLinks.length === 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
307 | |
// Synchronously look up `name` in the reverse-name cache (filled by
// _getAbout). Returns whatever the cache holds for that name.
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
312 | |
// Synchronously look up `name` in the reverse-emoji-name cache.
App.prototype.getReverseEmojiNameSync = function (name) {
  var cached = this.reverseEmojiNameCache.get(name)
  return cached
}
316 | |
// Synchronously read the `name` field of the cached About entry stored
// under the given key. Returns the falsy cache result when there is no
// entry.
App.prototype.getNameSync = function (name) {
  var cachedAbout = this.aboutCache.get(name)
  if (!cachedAbout) return cachedAbout
  return cachedAbout.name
}
321 | |
// Fetch a message value with sbot.get and wrap it as {key, value}.
// A falsy id calls cb() with no arguments; lookup errors go to cb(err).
function getMsgWithValue(sbot, id, cb) {
  if (!id) {
    return cb()
  }
  sbot.get(id, function onValue(err, value) {
    if (err) {
      return cb(err)
    }
    var msg = {key: id, value: value}
    cb(null, msg)
  })
}
329 | |
// Load the About record for an id.
//
// Non-ref ids yield an empty object. Otherwise the record comes from
// this.about.get; its name is prefixed with the id's first character
// (falling back to '@') unless it already starts with it, and the
// name -> id mapping is stored in the reverse-name cache.
App.prototype._getAbout = function (id, cb) {
  var app = this
  if (!u.isRef(id)) return cb(null, {})
  app.about.get(id, function (err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    var name = about.name
    if (name && name[0] !== sigil) {
      about.name = sigil + name
    }
    app.reverseNameCache.set(about.name, id)
    cb(null, about)
  })
}
343 | |
// Return a source that emits the single message looked up for `id`
// through getMsg.
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  return lookup(pull.once(id))
}
347 | |
// Return a message stream: this.createFeedStream when
// opts.sortByTimestamp is set, sbot.createLogStream otherwise.
// A missing opts is treated as an empty object.
App.prototype.createLogStream = function (opts) {
  var options = opts || {}
  if (options.sortByTimestamp) {
    return this.createFeedStream(options)
  }
  return this.sbot.createLogStream(options)
}
354 | |
// Wrapper around sbot.createFeedStream.
//
// Works around opts.gt being treated as opts.gte sometimes: for forward
// queries with both gt and limit set, one extra message is requested,
// messages whose value.timestamp equals opts.gt are dropped, and the result
// is cut back to opts.limit.
App.prototype.createFeedStream = function (opts) {
  var needsWorkaround = opts.gt && opts.limit && !opts.reverse
  if (!needsWorkaround) {
    return this.sbot.createFeedStream(opts)
  }

  function isAfterGt(msg) {
    return msg && msg.value.timestamp !== opts.gt
  }

  return pull(
    this.sbot.createFeedStream(u.mergeOpts(opts, {limit: opts.limit + 1})),
    pull.filter(isAfterGt),
    // opts.limit is known to be truthy in this branch
    pull.take(opts.limit)
  )
}
366 | |
// Sort rank of each peer connection state; unknown or missing states
// rank as 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

// Array#sort comparator for peers: higher-ranked state first, then larger
// stateChange first. A missing or non-numeric stateChange counts as 0.
//
// The previous expression `b.stateChange|0 - a.stateChange|0` parsed as
// `b.stateChange | (0 - a.stateChange) | 0` because `-` binds tighter than
// `|`, and `|0` would also truncate values to 32 bits.
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  var aChange = Number(a.stateChange) || 0
  var bChange = Number(b.stateChange) || 0
  return (bState - aState) || (bChange - aChange)
}
379 | |
// Return a source of gossip peers sorted with comparePeers.
// When opts is given, only peers whose properties strictly equal every
// property of opts are included.
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var key in opts) {
      if (opts[key] !== peer[key]) return false
    }
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
394 | |
// Look up the first contact message value returned (in reverse order) for
// the links from `source` to `dest`.
// cb(null, following) receives the boolean `content.following` of that
// message, or undefined when no contact message was found.
App.prototype.getFollow = function (source, dest, cb) {
  var app = this
  var query = {source: source, dest: dest, rel: 'contact', reverse: true,
    values: true, meta: false, keys: false}

  function isContactValue(value) {
    var content = value && value.content
    return content && content.type === 'contact'
  }

  pull(
    app.sbot.links(query),
    pull.filter(isContactValue),
    pull.take(1),
    pull.collect(function (err, values) {
      if (err) return cb(err)
      var first = values[0]
      cb(null, first && !!first.content.following)
    })
  )
}
411 | |
// Return a through stream that runs each message through unboxMsg,
// using paramap with a width of 16.
App.prototype.unboxMessages = function () {
  var width = 16
  return paramap(this.unboxMsg, width)
}
415 | |
// Return a source of unique channel names, taken from 'channel' messages
// (read in reverse order) whose content.subscribed is truthy.
// The opts argument is not used.
App.prototype.streamChannels = function (opts) {
  function isSubscription(msg) {
    return msg.value.content.subscribed
  }
  function toChannelName(msg) {
    return msg.value.content.channel
  }
  return pull(
    this.sbot.messagesByType({type: 'channel', reverse: true}),
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
429 | |
// Return a source of unique channel names that feed `id` has subscribed to.
// The opts argument is not used.
App.prototype.streamMyChannels = function (id, opts) {
  // use ssb-query plugin if it is available, since it has an index for
  // author + type
  if (this.sbot.query) {
    var subscriptionQuery = [
      {$filter: {
        value: {
          author: id,
          content: {type: 'channel', subscribed: true}
        }
      }},
      {$map: ['value', 'content', 'channel']}
    ]
    return pull(
      this.sbot.query.read({
        reverse: true,
        query: subscriptionQuery
      }),
      pull.unique()
    )
  }

  // fallback: scan the user's own stream and pick out channel subscriptions
  function isChannelSubscription(msg) {
    var content = msg.value.content
    if (content.type == 'channel') {
      return content.subscribed
    }
  }
  function toChannelName(msg) {
    return msg.value.content.channel
  }

  return pull(
    this.sbot.createUserStream({id: id, reverse: true}),
    this.unboxMessages(),
    pull.filter(isChannelSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
463 | |
// Create the contact streams for `id` using a fresh Contacts helper
// built on this app's sbot.
App.prototype.createContactStreams = function (id) {
  var contacts = new Contacts(this.sbot)
  return contacts.createContactStreams(id)
}
467 | |
// Array#sort comparator ordering vote tallies by `value`, highest first.
function compareVoted(a, b) {
  var difference = b.value - a.value
  return difference
}
471 | |
// Tally votes per target from 'vote' messages.
//
// _opts.limit (required): maximum number of distinct vote targets to
//   collect; up to limit * 100 vote messages are requested.
// _opts.reverse, _opts.gt, _opts.lt: passed on to messagesByType.
//
// cb(err) on failure, including a missing/non-numeric limit. Otherwise
// cb(null, {items, firstTimestamp, lastTimestamp}) where items is an array
// of {id, value, feedsObj, feeds} sorted by value (highest first), and the
// timestamps are msg.timestamp of the first and last message processed.
App.prototype.getVoted = function (_opts, cb) {
  // report through the callback: this function has no stream return value,
  // so returning an error source here would leave the caller waiting forever
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}
  var votedArray = []
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbot.messagesByType(opts),
    self.unboxMessages(),
    // stop once enough distinct targets have been seen
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        // NOTE(review): if a non-reversed stream yields oldest messages
        // first, this keeps each author's earliest vote rather than the
        // latest one — confirm the intended direction of this check.
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      // each author contributes -1, 0 or +1
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
527 | |
// Delegate to the About helper's createAboutStreams for `id`.
App.prototype.createAboutStreams = function (id) {
  var aboutHelper = this.about
  return aboutHelper.createAboutStreams(id)
}
531 | |
// Return a source of mention objects that have a truthy `emoji` property,
// unique by their `link`. Mentions are taken from messages that link to
// blobs with rel 'mentions': first those from this sbot's own id, then
// those from any source.
App.prototype.streamEmojis = function () {
  var sbot = this.sbot
  var ownMentions = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var allMentions = sbot.links({rel: 'mentions', dest: '&', values: true})

  function toMentions(msg) {
    return msg.value.content.mentions
  }

  return pull(
    cat([ownMentions, allMentions]),
    this.unboxMessages(),
    pull.map(toMentions),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
550 | |
// Read from a query plugin with timestamp bounds and a limit.
//
// plugin: object with a read() method (e.g. sbot.backlinks, sbot.query).
// opts:   {reverse, limit, gt, lt}.
// filter: base $filter object; a timestamp range is merged into it.
App.prototype.filter = function (plugin, opts, filter) {
  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var index
  if (plugin === this.sbot.backlinks) {
    var content = filter && filter.value && filter.value.content
    if (!(content && content.type)) index = 'DTS'
  }

  // work around flumeview-query not supporting $lt/$gt.
  // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
  // Query inclusively, ask for one extra message, then drop messages whose
  // timestamp equals either bound and cut back to the limit.
  var query = [{$filter: u.mergeOpts(filter, {
    timestamp: {
      $gte: opts.gt,
      $lte: opts.lt,
    }
  })}]

  var source = plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: opts.limit && (opts.limit + 1),
    query: query
  })

  function isStrictlyInside(msg) {
    return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
  }

  return pull(
    source,
    pull.filter(isStrictlyInside),
    opts.limit && pull.take(opts.limit)
  )
}
580 | |
// Return a stream of messages for opts.channel, using the backlinks plugin
// when present, else the query plugin, else an error source.
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    return this.filter(sbot.backlinks, opts, {
      dest: '#' + opts.channel,
    })
  }

  if (sbot.query) {
    return this.filter(sbot.query, opts, {
      value: {content: {channel: opts.channel}},
    })
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
594 | |
// Return a stream of messages whose backlink destination is this sbot's
// id, or an error source when the backlinks plugin is missing.
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks
  if (backlinks) {
    return this.filter(backlinks, opts, {
      dest: this.sbot.id,
    })
  }
  return pull.error(new Error(
    'Viewing mentions requires the ssb-backlinks plugin'))
}
603 | |
// Return a stream of private messages.
// Uses sbot.private.read through this.filter when available; otherwise
// scans the log stream, keeps encrypted messages, unboxes them and keeps
// the ones u.isMsgReadable accepts.
App.prototype.streamPrivate = function (opts) {
  var privatePlugin = this.sbot.private
  if (privatePlugin.read) {
    return this.filter(privatePlugin, opts, {})
  }

  var log = this.createLogStream(u.mergeOpts(opts))
  return pull(
    log,
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable)
  )
}
614 | |
// Query sbot.links2 for 'mentions' links named opts.name that point at
// blobs (dest starting with '&'), optionally restricted to opts.author.
// Each result is mapped to {name, size, link, author, time}.
// Returns an error source when sbot.links2 is not available.
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) {
    return pull.error(new Error(
      'missing ssb-links plugin'))
  }

  var filter = {rel: ['mentions', opts.name]}
  if (opts.author) filter.source = opts.author

  var query = [
    {$filter: filter},
    {$filter: {dest: {$prefix: '&'}}},
    {$map: {
      name: ['rel', 1],
      size: ['rel', 2],
      link: 'dest',
      author: 'source',
      time: 'ts'
    }}
  ]
  return links2.read({query: query})
}
634 | |
// Track blob wants reported by sbot.blobs.createWants().
// For every id in an update: a negative value marks the blob as wanted in
// this.blobWants, any other value clears the mark; the cached blob size
// for that id is dropped either way. Stream errors are traced to the
// console.
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function onWants(wants) {
    for (var id in wants) {
      if (wants[id] < 0) {
        app.blobWants[id] = true
      } else {
        delete app.blobWants[id]
      }
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    app.sbot.blobs.createWants(),
    pull.drain(onWants, onEnd)
  )
}
651 | |
// Report the state of a blob: cb(null, 'wanted') when it is marked in
// this.blobWants, otherwise cb(null, true|false) depending on whether
// getBlobSize returned a non-null size. Size lookup errors go to cb(err).
App.prototype.getBlobState = function (id, cb) {
  var app = this
  if (app.blobWants[id]) {
    return cb(null, 'wanted')
  }
  app.getBlobSize(id, function (err, size) {
    if (err) return cb(err)
    var present = size != null
    cb(null, present)
  })
}
660 | |
// Extract the readme from an npm tarball blob by piping the blob through
// an external `tar` process.
//
// tarballId: blob id of the gzipped tarball.
// cb(err) on failure, including failure to start `tar`;
// cb(null, text, true) with the extracted text (UTF-8) on success.
// cb is called at most once.
App.prototype.getNpmReadme = function (tarballId, cb) {
  var self = this
  var finished = false
  function finish(err, text) {
    if (finished) return
    finished = true
    if (err) return cb(err)
    cb(null, text, true)
  }
  // TODO: make this portable, and handle plaintext readmes
  var tar = proc.spawn('tar', ['--ignore-case', '-Oxz',
    'package/README.md', 'package/readme.markdown', 'package/readme.mkd'])
  // without a listener, a spawn failure (e.g. tar not installed) is emitted
  // as an unhandled 'error' event and takes the process down
  tar.on('error', finish)
  var done = multicb({pluck: 1, spread: true})
  pull(
    self.sbot.blobs.get(tarballId),
    toPull.sink(tar.stdin, done())
  )
  pull(
    toPull.source(tar.stdout),
    pull.collect(done())
  )
  done(function (err, _, bufs) {
    if (err) return finish(err)
    finish(null, Buffer.concat(bufs).toString('utf8'))
  })
}
681 | |
// Decide whether a message passes the configured filter.
//
// The filter is opts.filter or, failing that, this.msgFilter. `show` is
// true unless the filter is 'invert'. cb(null, show) is reported when the
// filter is 'all', the author is this sbot's id or opts.feed, the message
// key is opts.msgId, the author is followed, or a followed feed gave the
// message a positive vote. Otherwise cb(null, !show).
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var filter = opts.filter || app.msgFilter
  var show = (filter !== 'invert')

  var alwaysShown = filter === 'all'
    || author === myId
    || author === opts.feed
    || msg.key === opts.msgId
  if (alwaysShown) return cb(null, show)

  app.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)
    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        if (follows[voter] && votes[voter] > 0) {
          return cb(null, show)
        }
      }
      return cb(null, !show)
    })
  })
}
706 | |
// Report the follow entry that `src` has for `dest`:
// cb(null, follows[dest]) using the map from getFollows(src).
App.prototype.isFollowing = function (src, dest, cb) {
  this.getFollows(src, function (err, follows) {
    if (err) {
      return cb(err)
    }
    var following = follows[dest]
    return cb(null, following)
  })
}
714 | |
// Build a map of feed id -> following value for the contact links of `id`,
// read live from sbot.links2.
//
// cb(err, follows) is called exactly once: when the stream emits its sync
// marker, or when the stream ends or errors, whichever comes first. The
// follows object keeps being updated as further links arrive.
App.prototype._getFollows = function (id, cb) {
  var follows = {}
  var pending = cb

  function ready(err) {
    if (!pending) return
    var notify = pending
    pending = null
    notify(err, follows)
  }

  var query = [
    {$filter: {
      source: id,
      rel: [{$prefix: 'contact'}]
    }},
    {$map: {
      following: ['rel', 1],
      feed: 'dest'
    }}
  ]

  pull(
    this.sbot.links2.read({
      live: true,
      query: query
    }),
    pull.drain(function (link) {
      if (link.sync) return ready()
      follows[link.feed] = link.following
    }, ready)
  )
}
743 | |
// Build a map of author -> vote value for the vote links pointing at `id`,
// read from sbot.links2. When an author appears more than once, the value
// read last wins. cb(err, votes) is called when the stream ends.
App.prototype._getVotes = function (id, cb) {
  var votes = {}
  var query = [
    {$filter: {
      dest: id,
      rel: [{$prefix: 'vote'}]
    }},
    {$map: {
      value: ['rel', 1],
      author: 'source'
    }}
  ]

  function record(vote) {
    votes[vote.author] = vote.value
  }

  function finish(err) {
    cb(err, votes)
  }

  pull(
    this.sbot.links2.read({query: query}),
    pull.drain(record, finish)
  )
}
766 |
Built with git-ssb-web