Files: 5a8a31c7e4523ef4de4c20e28443f9ea18961ce4 / lib / app.js
20303 bytesRaw
1 | var http = require('http') |
2 | var memo = require('asyncmemo') |
3 | var lru = require('hashlru') |
4 | var pkg = require('../package') |
5 | var u = require('./util') |
6 | var pull = require('pull-stream') |
7 | var multicb = require('multicb') |
8 | var paramap = require('pull-paramap') |
9 | var getContacts = require('./contact') |
10 | var About = require('./about') |
11 | var Serve = require('./serve') |
12 | var Render = require('./render') |
13 | var Git = require('./git') |
14 | var cat = require('pull-cat') |
15 | var proc = require('child_process') |
16 | var toPull = require('stream-to-pull-stream') |
17 | var BoxStream = require('pull-box-stream') |
18 | var crypto = require('crypto') |
19 | |
// 24 zero bytes, passed as the second argument to the BoxStream
// box/unbox stream constructors used for private blobs below.
// Buffer.alloc zero-fills, replacing the deprecated `new Buffer(n)` + fill.
var zeros = Buffer.alloc(24)
21 | |
22 | module.exports = App |
23 | |
/**
 * Shared application state: wraps the sbot client, reads the `patchfoo`
 * section of the config, and sets up the caches and helper objects used
 * by the request handlers.
 *
 * @param {object} sbot - ssb client; note that `sbot.get` is replaced by a
 *   memoized version as a side effect.
 * @param {object} config - configuration; only `config.patchfoo` is read here.
 */
function App(sbot, config) {
  var settings = config.patchfoo || {}
  var baseUrl = settings.base || '/'

  // message ids are encoded unless the setting is explicitly provided
  var encodeMsgIds = true
  if (settings.encode_msgids != null) {
    encodeMsgIds = Boolean(settings.encode_msgids)
  }

  this.sbot = sbot
  this.config = config

  this.port = settings.port || 8027
  this.host = settings.host || 'localhost'
  this.msgFilter = settings.filter

  this.opts = {
    base: baseUrl,
    blob_base: settings.blob_base || settings.img_base || baseUrl,
    img_base: settings.img_base || (baseUrl + 'image/'),
    emoji_base: settings.emoji_base || (baseUrl + 'emoji/'),
    encode_msgids: encodeMsgIds,
  }

  // memoized lookups; the caches that other methods need to touch are
  // also kept as properties
  sbot.get = memo({cache: lru(100)}, sbot.get)
  this.about = new About(this, sbot.id)
  this.getMsg = memo({cache: lru(100)}, getMsgWithValue, sbot)

  this.aboutCache = lru(500)
  this.getAbout = memo({cache: this.aboutCache}, this._getAbout.bind(this))

  this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
  this.reverseNameCache = lru(500)
  this.reverseEmojiNameCache = lru(500)

  this.blobSizeCache = lru(100)
  this.getBlobSize = memo({cache: this.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))

  this.getFollows = memo(this._getFollows.bind(this))
  this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))
  this.getContacts = getContacts.bind(null, this.sbot)

  // bound so it can be handed around as a plain callback
  this.unboxMsg = this.unboxMsg.bind(this)

  this.render = new Render(this, this.opts)
  this.git = new Git(this)

  this.monitorBlobWants()
}
63 | |
/**
 * Start the HTTP server, watch for new `about` links so cached About
 * info can be dropped, and ping the sbot every 10 seconds.
 */
App.prototype.go = function () {
  var app = this

  function handleRequest(req, res) {
    new Serve(app, req, res).go()
  }

  function onListening() {
    // hosts containing ':' are wrapped in brackets for the URL
    var shownHost = app.host
    if (/:/.test(shownHost)) shownHost = '[' + shownHost + ']'
    app.log('Listening on http://' + shownHost + ':' + app.port)
  }

  var server = http.createServer(handleRequest)
  if (app.host === 'localhost') {
    server.listen(app.port, onListening)
  } else {
    server.listen(app.port, app.host, onListening)
  }

  // invalidate cached About info when new About messages come in
  var newAboutLinks = app.sbot.links({rel: 'about', old: false, values: true})
  pull(
    newAboutLinks,
    pull.drain(function (link) {
      app.aboutCache.remove(link.dest)
    }, function (err) {
      if (err) throw err
    })
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
89 | |
// Console loggers that prefix every line with "[<package name>]".
var logPrefix = '[' + pkg.name + ']'

// informational output (stdout)
App.prototype.log = console.log.bind(console, logPrefix)

// error output (stderr)
App.prototype.error = console.error.bind(console, logPrefix)
93 | |
/**
 * Try to decrypt a message whose content is a string.
 *
 * Calls back with the original message when its content is not a string,
 * when unboxing fails (the error is logged, not passed on), or when
 * unboxing yields nothing. Otherwise calls back with a shallow copy whose
 * `value.content` is the unboxed content and `value.private` is true.
 *
 * @param {object} msg
 * @param {function} cb - cb(null, msg)
 */
App.prototype.unboxMsg = function (msg, cb) {
  var app = this
  var ciphertext = msg.value && msg.value.content

  if (typeof ciphertext !== 'string') {
    cb(null, msg)
    return
  }

  app.unboxContent(ciphertext, function (err, content) {
    if (err) app.error('unbox:', err)
    if (err || !content) return cb(null, msg)

    // copy rather than mutate: the original message may be cached
    var key
    var decrypted = {}
    for (key in msg) decrypted[key] = msg[key]

    var value = {}
    for (key in msg.value) value[key] = msg.value[key]
    value.content = content
    value.private = true

    decrypted.value = value
    cb(null, decrypted)
  })
}
114 | |
/**
 * Run a full-text search through the sbot `fulltext` plugin.
 *
 * @param {object} opts - passed straight to the plugin's search function
 * @returns a pull source; an error source if the plugin is missing
 */
App.prototype.search = function (opts) {
  var fulltext = this.sbot.fulltext
  var search = fulltext && fulltext.search
  if (search) return search(opts)
  return pull.error(new Error('Missing fulltext search plugin'))
}
120 | |
/**
 * Stream messages, newest first, selected by the first of these options
 * that is set: `channel` (backlinks to "#channel"), `dest` (links to dest,
 * optionally from `source`), `source` (that feed's messages), or otherwise
 * the whole feed stream. Messages are unboxed and, when `opts.text` is
 * given, filtered by it.
 *
 * @param {object} opts - {channel, dest, source, text}
 * @returns a pull source of messages
 */
App.prototype.advancedSearch = function (opts) {
  var sbot = this.sbot
  var messages

  if (opts.channel) {
    messages = sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    messages = sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    messages = sbot.createUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    messages = sbot.createFeedStream({
      reverse: true,
    })
  }

  var unbox = this.unboxMessages()
  // falsy when no text was given; pull() is handed that falsy value as-is
  var textFilter = opts.text && pull.filter(filterByText(opts.text))

  return pull(messages, unbox, textFilter)
}
148 | |
/**
 * Build a predicate that walks a value (strings, arrays, objects, nested
 * to any depth) and reports whether `each` accepts any string in it.
 * For a bare string the result of `each` is returned as-is.
 */
function forSome(each) {
  return function some(obj) {
    if (obj == null) return false
    if (typeof obj === 'string') return each(obj)
    if (typeof obj !== 'object') return false
    if (Array.isArray(obj)) return obj.some(some)
    for (var key in obj) {
      if (some(obj[key])) return true
    }
    return false
  }
}

/**
 * Build a message predicate matching `str`, interpreted as a
 * case-insensitive regular expression, against any string inside the
 * message content. An empty `str` matches every message.
 */
function filterByText(str) {
  if (!str) {
    return function () { return true }
  }
  var pattern = new RegExp(str, 'i')
  var containsMatch = forSome(function (text) {
    return pattern.test(text)
  })
  return function (msg) {
    var content = msg.value.content
    return content && containsMatch(content)
  }
}
169 | |
/**
 * Fetch a message by key and pass it through unboxMsg.
 *
 * @param {string} key - message id
 * @param {function} cb - cb(err, msg)
 */
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this

  function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  }

  app.getMsg(key, onMsg)
}
177 | |
/**
 * Publish a message. When `content.recps` is an array the message is
 * published privately to those recipients; otherwise it is published
 * normally.
 *
 * A failure whose message starts with "expected previous:" is retried,
 * up to two more times, before the error is reported.
 *
 * @param {object} content - message content
 * @param {function} cb - cb(err, msg)
 */
App.prototype.publish = function (content, cb) {
  var self = this
  function tryPublish(triesLeft) {
    if (Array.isArray(content.recps)) {
      // declared locally: previously this leaked an implicit global
      var recps = content.recps.map(u.linkDest)
      self.sbot.private.publish(content, recps, next)
    } else {
      self.sbot.publish(content, next)
    }
    function next(err, msg) {
      if (err && triesLeft > 0 && /^expected previous:/.test(err.message)) {
        return tryPublish(triesLeft - 1)
      }
      return cb(err, msg)
    }
  }
  tryPublish(2)
}
200 | |
/**
 * Get the size of a blob, requesting it first if no size is known yet.
 * An error from the initial size lookup is ignored; an error from want()
 * is passed to the callback.
 *
 * @param {string} id - blob id
 * @param {function} cb - cb(err, size)
 */
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobs = app.sbot.blobs

  function onWanted(err) {
    if (err) return cb(err)
    blobs.size(id, cb)
  }

  // only want() the blob if we don't already have it
  blobs.size(id, function (err, size) {
    if (size != null) return cb(null, size)
    app.blobWants[id] = true
    blobs.want(id, onWanted)
  })
}
214 | |
/**
 * Create a sink that adds the data written to it as a blob.
 *
 * @param {function} cb - cb(err, {link: hash, size: size}), called once
 *   both the length counter and the blob store have reported back
 * @returns a pull sink
 */
App.prototype.addBlobRaw = function (cb) {
  var done = multicb({pluck: 1, spread: true})
  // order matters: the first result is the size, the second the hash
  var onSize = done()
  var onHash = done()

  var counter = u.pullLength(onSize)
  var store = this.sbot.blobs.add(onHash)
  var sink = pull(counter, store)

  done(function (err, size, hash) {
    if (err) return cb(err)
    cb(null, {link: hash, size: size})
  })

  return sink
}
227 | |
/**
 * Create a sink that adds a blob, encrypted when `isPrivate` is truthy.
 *
 * @param {boolean} isPrivate
 * @param {function} cb - forwarded to addBlobPrivate / addBlobRaw
 * @returns a pull sink
 */
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate ? this.addBlobPrivate(cb) : this.addBlobRaw(cb)
}
232 | |
/**
 * Create a sink that buffers all incoming data, then stores it as an
 * encrypted blob. The whole input is held in memory until the stream ends.
 *
 * @param {function} cb - cb(err, link) where link is the addBlobRaw result
 *   plus `key`, the base64 encryption key
 * @returns a pull sink
 */
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function onChunk(buf) {
    chunks.push(buf)
    hasher.update(buf)
  }

  function onEnd(err) {
    if (err) return cb(err)
    var secret = hasher.digest()

    var cleartext = pull.values(chunks)
    var boxer = BoxStream.createBoxStream(secret, zeros)
    var store = app.addBlobRaw(function (err, link) {
      if (err) return cb(err)
      link.key = secret.toString('base64')
      cb(null, link)
    })

    pull(cleartext, boxer, store)
  }

  return pull.drain(onChunk, onEnd)
}
255 | |
/**
 * Read a blob, decrypting it when a key is given.
 *
 * @param {string} id - blob id
 * @param {Buffer|string} [key] - decryption key; a string is decoded
 *   as base64
 * @returns a pull source of the blob's (decrypted) data
 */
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
264 | |
/**
 * Ask the sbot to push a blob, logging the id to stderr first.
 *
 * @param {string} id - blob id
 * @param {function} cb - passed to sbot.blobs.push
 */
App.prototype.pushBlob = function (id, cb) {
  var blobs = this.sbot.blobs
  console.error('pushing blob', id)
  blobs.push(id, cb)
}
269 | |
/**
 * Read a blob given a link, normalized through u.toLink.
 *
 * @param link - blob link; its `link` and `size` are sent as hash and size
 * @returns a pull source of the blob's data
 */
App.prototype.readBlob = function (link) {
  var blob = u.toLink(link)
  var request = {
    hash: blob.link,
    size: blob.size,
  }
  return this.sbot.blobs.get(request)
}
277 | |
/**
 * Read part of a blob. Uses sbot.blobs.getSlice when the sbot provides
 * it; otherwise reads the whole blob and slices the stream locally.
 *
 * @param {object} link - {link, size}
 * @param {object} opts - {start, end}
 * @returns a pull source
 */
App.prototype.readBlobSlice = function (link, opts) {
  var blobs = this.sbot.blobs

  if (!blobs.getSlice) {
    return pull(
      this.readBlob(link),
      u.pullSlice(opts.start, opts.end)
    )
  }

  return blobs.getSlice({
    hash: link.link,
    size: link.size,
    start: opts.start,
    end: opts.end,
  })
}
290 | |
/**
 * Check that every linked blob has a known size.
 *
 * @param {Array} links - blob links; each `link.link` is looked up
 * @param {function} cb - called with no arguments when nothing is missing,
 *   with {name: 'BlobNotFoundError', links: missingLinks} when some blobs
 *   have no size, or with the lookup error if a size lookup failed
 */
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})
  links.forEach(function (link) {
    var next = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) next(err)
      else if (size == null) next(null, link) // record the missing link
      else next()
    })
  })
  done(function (err, results) {
    if (err) {
      // The results list cannot be relied on after a failure (previously
      // this fell through and threw on `.filter` of a missing array), so
      // report the error instead.
      console.trace(err)
      return cb(err)
    }
    var missingLinks = results.filter(Boolean)
    if (missingLinks.length === 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
309 | |
/**
 * Look up the id cached for a name in the reverse-name cache.
 * Returns whatever the cache returns for an unknown name.
 */
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
314 | |
/**
 * Look up the value cached for an emoji name in the reverse emoji-name
 * cache.
 */
App.prototype.getReverseEmojiNameSync = function (name) {
  var cached = this.reverseEmojiNameCache.get(name)
  return cached
}
318 | |
/**
 * Return the `name` of the About record cached under the given key, or
 * the falsy cache result when nothing is cached. (The parameter is called
 * `name` but is used as the about-cache key.)
 */
App.prototype.getNameSync = function (name) {
  var cached = this.aboutCache.get(name)
  if (!cached) return cached
  return cached.name
}
323 | |
/**
 * Fetch a message value from the sbot and wrap it as {key, value}.
 * Calls back with no arguments when `id` is falsy.
 *
 * @param {object} sbot - must provide get(id, cb)
 * @param {string} id - message id
 * @param {function} cb - cb(err, {key: id, value: value})
 */
function getMsgWithValue(sbot, id, cb) {
  if (!id) return cb()

  function onValue(err, value) {
    if (err) return cb(err)
    var msg = {key: id, value: value}
    cb(null, msg)
  }

  sbot.get(id, onValue)
}
331 | |
/**
 * Load the About record for an id (uncached; getAbout is the memoized
 * wrapper). Non-ref ids yield an empty object.
 *
 * The record's name, if any, is prefixed with the id's sigil (its first
 * character) unless it already starts with it, and the name -> id mapping
 * is stored in the reverse-name cache.
 *
 * @param {string} id
 * @param {function} cb - cb(err, about)
 */
App.prototype._getAbout = function (id, cb) {
  var self = this
  if (!u.isRef(id)) return cb(null, {})
  self.about.get(id, function (err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    if (about.name && about.name[0] !== sigil) {
      about.name = sigil + about.name
    }
    // only index records that actually have a name; previously nameless
    // records were all stored under the `undefined` key
    if (about.name) self.reverseNameCache.set(about.name, id)
    cb(null, about)
  })
}
345 | |
/**
 * Pull source that looks up a single message id through getMsg.
 *
 * @param {string} id - message id
 * @returns a pull source emitting the looked-up message
 */
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  var source = pull.once(id)
  return lookup(source)
}
349 | |
/**
 * Stream messages from the log, or from the feed stream when
 * `opts.sortByTimestamp` is set.
 *
 * @param {object} [opts]
 * @returns a pull source of messages
 */
App.prototype.createLogStream = function (opts) {
  opts = opts || {}
  if (opts.sortByTimestamp) {
    return this.createFeedStream(opts)
  }
  return this.sbot.createLogStream(opts)
}
356 | |
/**
 * Stream messages from sbot.createFeedStream.
 *
 * For forward reads with both `gt` and a limit, one extra message is
 * requested and any message whose timestamp equals `gt` is dropped, then
 * the stream is cut back to the requested limit.
 *
 * @param {object} [opts] - feed stream options ({gt, limit, reverse, ...})
 * @returns a pull source of messages
 */
App.prototype.createFeedStream = function (opts) {
  opts = opts || {}
  // work around opts.gt being treated as opts.gte sometimes
  var limit = Number(opts.limit)
  if (opts.gt && limit && !opts.reverse) return pull(
    // use the numeric limit: `opts.limit + 1` would concatenate when the
    // limit arrives as a string ("10" + 1 === "101")
    this.sbot.createFeedStream(u.mergeOpts(opts, {limit: limit + 1})),
    pull.filter(function (msg) {
      return msg && msg.value.timestamp !== opts.gt
    }),
    pull.take(limit)
  )
  return this.sbot.createFeedStream(opts)
}
369 | |
// Sort rank per connection state; unknown or missing states rank as 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

/**
 * Sort comparator for peers: higher-ranked state first, then larger
 * `stateChange` first. Missing or non-numeric `stateChange` counts as 0.
 *
 * The original expression `b.stateChange|0 - a.stateChange|0` parsed as
 * `b.stateChange | (0 - a.stateChange) | 0` (subtraction binds tighter
 * than bitwise OR), and `|0` would also truncate large values to 32 bits.
 *
 * @param {object} a - peer with optional `state` and `stateChange`
 * @param {object} b - peer with optional `state` and `stateChange`
 * @returns {number} negative if a sorts first, positive if b sorts first
 */
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  if (bState !== aState) return bState - aState
  var aChange = Number(a.stateChange) || 0
  var bChange = Number(b.stateChange) || 0
  return bChange - aChange
}
382 | |
/**
 * Stream the gossip peers, sorted with comparePeers.
 *
 * @param {object} [opts] - when given, only peers whose properties are
 *   strictly equal to every property in opts are kept
 * @returns a pull source of peers
 */
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var key in opts) {
      if (opts[key] !== peer[key]) return false
    }
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
397 | |
/**
 * Reduce the contact messages from `source` about `dest` into one state.
 *
 * @param {string} source - feed id of the author
 * @param {string} dest - feed id of the subject
 * @param {function} cb - cb(err, state): true (following), false
 *   (flagged/blocking), or null when no message decided either way
 */
App.prototype.getContact = function (source, dest, cb) {
  var query = {source: source, dest: dest, rel: 'contact',
    values: true, meta: false, keys: false}

  function isContact(value) {
    var content = value && value.content
    return content && content.type === 'contact'
  }

  // trinary logic from ssb-friends
  function nextState(acc, value) {
    var content = value.content
    if (content.following) return true
    if (content.flagged || content.blocking) return false
    return acc
  }

  pull(
    this.sbot.links(query),
    pull.filter(isContact),
    pull.reduce(nextState, null, cb)
  )
}
415 | |
/**
 * Pull through-stream that runs each message through unboxMsg via
 * paramap, with 16 as its second (width) argument.
 */
App.prototype.unboxMessages = function () {
  var width = 16
  return paramap(this.unboxMsg, width)
}
419 | |
/**
 * Stream the distinct channel names from `channel` messages whose
 * content has a truthy `subscribed`, newest messages first.
 *
 * @param {object} [opts] - currently unused
 * @returns a pull source of channel names
 */
App.prototype.streamChannels = function (opts) {
  function isSubscription(msg) {
    return msg.value.content.subscribed
  }

  function toChannelName(msg) {
    return msg.value.content.channel
  }

  var channelMsgs = this.sbot.messagesByType({type: 'channel', reverse: true})
  return pull(
    channelMsgs,
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
433 | |
/**
 * Stream the distinct channel names that feed `id` has subscribed to,
 * newest messages first.
 *
 * @param {string} id - feed id
 * @param {object} [opts] - currently unused
 * @returns a pull source of channel names
 */
App.prototype.streamMyChannels = function (id, opts) {
  var sbot = this.sbot

  // use ssb-query plugin if it is available, since it has an index for
  // author + type
  if (sbot.query) {
    var subscriptions = {
      value: {
        author: id,
        content: {type: 'channel', subscribed: true}
      }
    }
    return pull(
      sbot.query.read({
        reverse: true,
        query: [
          {$filter: subscriptions},
          {$map: ['value', 'content', 'channel']}
        ]
      }),
      pull.unique()
    )
  }

  // fallback: scan the user's own feed for channel messages
  function isSubscription(msg) {
    var content = msg.value.content
    if (content.type == 'channel') return content.subscribed
  }

  function toChannelName(msg) {
    return msg.value.content.channel
  }

  return pull(
    sbot.createUserStream({id: id, reverse: true}),
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
467 | |
/**
 * Sort comparator ordering items by `value`, highest first.
 */
function compareVoted(a, b) {
  var difference = b.value - a.value
  return difference
}
471 | |
/**
 * Tally vote messages per target and return the targets sorted by score.
 *
 * Reads up to `limit * 100` vote messages and stops once `limit` distinct
 * targets have been seen. Each author's vote is normalized to +1, -1 or 0.
 * When an author votes on a target more than once, a forward read keeps
 * the first vote seen and a reverse read replaces it with the later one.
 *
 * @param {object} _opts - {limit, reverse, gt, lt}; `limit` is required
 * @param {function} cb - cb(err, {items, firstTimestamp, lastTimestamp})
 *   where each item is {id, value, feedsObj, feeds}
 */
App.prototype.getVoted = function (_opts, cb) {
  // Results are delivered through cb, so the error must be too. This
  // used to return an error stream that no caller consumed, leaving the
  // callback uncalled.
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}     // target id -> tally
  var votedArray = []   // tallies in order of first appearance
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbot.messagesByType(opts),
    self.unboxMessages(),
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      // `true` only signals the end of the stream
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
527 | |
/**
 * Delegate to the About helper's createAboutStreams for the given id.
 */
App.prototype.createAboutStreams = function (id) {
  var about = this.about
  return about.createAboutStreams(id)
}
531 | |
/**
 * Stream the distinct emoji mentions found in messages that mention
 * blobs. Links authored by this sbot's own id are read first, followed
 * by links from everyone; duplicates are removed by `link`.
 *
 * @returns a pull source of mention objects with a truthy `emoji`
 */
App.prototype.streamEmojis = function () {
  var sbot = this.sbot

  var ownMentions = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var allMentions = sbot.links({rel: 'mentions', dest: '&', values: true})

  function toMentions(msg) {
    return msg.value.content.mentions
  }

  return pull(
    cat([ownMentions, allMentions]),
    this.unboxMessages(),
    pull.map(toMentions),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
550 | |
/**
 * Read from a query-style plugin with a timestamp range and limit.
 *
 * The range is queried inclusively ($gte/$lte) and messages whose
 * timestamp equals `opts.lt` or `opts.gt` are then dropped; one extra
 * message is requested before the stream is cut to `opts.limit`.
 *
 * @param {object} plugin - object with read({index, reverse, limit, query})
 * @param {object} opts - {limit, reverse, gt, lt}
 * @param {object} filter - base $filter merged with the timestamp range
 * @returns a pull source of messages
 */
App.prototype.filter = function (plugin, opts, filter) {
  var limit = Number(opts.limit)

  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var index
  if (plugin === this.sbot.backlinks) {
    var content = filter && filter.value && filter.value.content
    if (!(content && content.type)) index = 'DTS'
  }

  // work around flumeview-query not supporting $lt/$gt.
  // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
  var timestampRange = {
    timestamp: {
      $gte: opts.gt,
      $lte: opts.lt,
    }
  }

  function notOnBoundary(msg) {
    return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
  }

  var source = plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: limit ? (limit + 1) : undefined,
    query: [{$filter: u.mergeOpts(filter, timestampRange)}]
  })

  return pull(
    source,
    pull.filter(notOnBoundary),
    limit && pull.take(limit)
  )
}
581 | |
/**
 * Stream the messages of a channel/hashtag, using ssb-backlinks when
 * available and ssb-query otherwise.
 *
 * @param {object} opts - {channel, limit, reverse, gt, lt}
 * @returns a pull source; an error source if neither plugin is present
 */
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    return this.filter(sbot.backlinks, opts, {
      dest: '#' + opts.channel,
    })
  }

  if (sbot.query) {
    return this.filter(sbot.query, opts, {
      value: {content: {channel: opts.channel}},
    })
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
595 | |
/**
 * Stream messages that link to this sbot's own id (requires the
 * ssb-backlinks plugin).
 *
 * @param {object} opts - {limit, reverse, gt, lt}
 * @returns a pull source; an error source if the plugin is missing
 */
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks

  if (backlinks) {
    return this.filter(backlinks, opts, {
      dest: this.sbot.id,
    })
  }

  return pull.error(new Error(
    'Viewing mentions requires the ssb-backlinks plugin'))
}
604 | |
/**
 * Stream private messages. Uses sbot.private.read when available;
 * otherwise scans the log for encrypted messages and keeps the ones
 * that are readable after unboxing.
 *
 * @param {object} opts - stream options
 * @returns a pull source of messages
 */
App.prototype.streamPrivate = function (opts) {
  var privatePlugin = this.sbot.private
  if (privatePlugin.read) {
    return this.filter(privatePlugin, opts, {})
  }

  var log = this.createLogStream(u.mergeOpts(opts))
  return pull(
    log,
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable)
  )
}
615 | |
/**
 * Query the links2 plugin for blob mentions with a given name.
 *
 * @param {object} opts - {name, author}; `author` optionally restricts
 *   the source feed
 * @returns a pull source of {name, size, link, author, time} records,
 *   or an error source if the plugin is missing
 */
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) {
    return pull.error(new Error(
      'missing ssb-links plugin'))
  }

  var mentionFilter = {rel: ['mentions', opts.name]}
  if (opts.author) mentionFilter.source = opts.author

  var blobsOnly = {dest: {$prefix: '&'}}
  var shape = {
    name: ['rel', 1],
    size: ['rel', 2],
    link: 'dest',
    author: 'source',
    time: 'ts'
  }

  return links2.read({
    query: [
      {$filter: mentionFilter},
      {$filter: blobsOnly},
      {$map: shape}
    ]
  })
}
635 | |
/**
 * Track blob wants in `this.blobWants`: ids reported with a negative
 * value are marked as wanted, all others are cleared. The cached size of
 * every reported blob is dropped either way.
 */
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function onWants(wants) {
    for (var id in wants) {
      if (wants[id] < 0) app.blobWants[id] = true
      else delete app.blobWants[id]
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    app.sbot.blobs.createWants(),
    pull.drain(onWants, onEnd)
  )
}
652 | |
/**
 * Report the state of a blob.
 *
 * @param {string} id - blob id
 * @param {function} cb - cb(err, state): 'wanted' if the blob is marked
 *   as wanted, otherwise true/false for whether a size is known
 */
App.prototype.getBlobState = function (id, cb) {
  if (this.blobWants[id]) return cb(null, 'wanted')

  function onSize(err, size) {
    if (err) return cb(err)
    var haveIt = size != null
    cb(null, haveIt)
  }

  this.getBlobSize(id, onSize)
}
661 | |
/**
 * Extract the readme from an npm tarball blob by piping it through the
 * system `tar` command.
 *
 * @param {string} tarballId - blob id of the gzipped tarball
 * @param {function} cb - cb(err, text, true) with the readme as UTF-8
 */
App.prototype.getNpmReadme = function (tarballId, cb) {
  var app = this
  // TODO: make this portable, and handle plaintext readmes
  var tarArgs = ['--ignore-case', '-Oxz',
    'package/README.md', 'package/readme.markdown', 'package/readme.mkd']
  var tar = proc.spawn('tar', tarArgs)
  var done = multicb({pluck: 1, spread: true})

  // feed the tarball into tar's stdin
  pull(
    app.sbot.blobs.get(tarballId),
    toPull.sink(tar.stdin, done())
  )

  // collect tar's stdout
  pull(
    toPull.source(tar.stdout),
    pull.collect(done())
  )

  done(function (err, _, chunks) {
    if (err) return cb(err)
    var readme = Buffer.concat(chunks).toString('utf8')
    cb(null, readme, true)
  })
}
682 | |
/**
 * Decide whether a message should be shown under the active filter.
 *
 * A message passes when the filter is 'all', when it is by this user or
 * by `opts.feed`, when it is `opts.msgId`, when its author is followed,
 * or when a followed feed has voted positively on it. The 'invert'
 * filter flips the result.
 *
 * @param {object} msg
 * @param {object} opts - {filter, feed, msgId}
 * @param {function} cb - cb(err, show)
 */
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var filter = opts.filter || app.msgFilter
  var show = (filter !== 'invert')

  var alwaysPasses = filter === 'all'
    || author === myId
    || author === opts.feed
    || msg.key === opts.msgId
  if (alwaysPasses) return cb(null, show)

  app.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)

    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        if (follows[voter] && votes[voter] > 0) {
          return cb(null, show)
        }
      }
      return cb(null, !show)
    })
  })
}
707 | |
/**
 * Look up whether `src` follows `dest`.
 *
 * @param {string} src - feed id
 * @param {string} dest - feed id
 * @param {function} cb - cb(err, following) with the value stored for
 *   dest in src's follows map
 */
App.prototype.isFollowing = function (src, dest, cb) {
  function onFollows(err, follows) {
    if (err) return cb(err)
    return cb(null, follows[dest])
  }

  this.getFollows(src, onFollows)
}
715 | |
/**
 * Build the follows map for a feed from a live links2 query.
 *
 * The callback fires once, on the stream's `sync` marker or when the
 * stream ends. The query stays live, so the same `follows` object keeps
 * being updated after the callback has been called.
 *
 * @param {string} id - feed id
 * @param {function} cb - cb(err, follows) where follows maps feed id to
 *   the `following` value from the link
 */
App.prototype._getFollows = function (id, cb) {
  var follows = {}
  var pending = cb

  // calls back at most once
  function ready(err) {
    if (!pending) return
    var callback = pending
    pending = null
    callback(err, follows)
  }

  var contactsQuery = [
    {$filter: {
      source: id,
      rel: [{$prefix: 'contact'}]
    }},
    {$map: {
      following: ['rel', 1],
      feed: 'dest'
    }}
  ]

  function onLink(link) {
    if (link.sync) return ready()
    follows[link.feed] = link.following
  }

  pull(
    this.sbot.links2.read({
      live: true,
      query: contactsQuery
    }),
    pull.drain(onLink, ready)
  )
}
744 | |
/**
 * Collect the votes on a message from the links2 plugin. When an author
 * appears more than once, the last value read wins.
 *
 * @param {string} id - message id
 * @param {function} cb - cb(err, votes) where votes maps author id to
 *   vote value
 */
App.prototype._getVotes = function (id, cb) {
  var votes = {}

  var votesQuery = [
    {$filter: {
      dest: id,
      rel: [{$prefix: 'vote'}]
    }},
    {$map: {
      value: ['rel', 1],
      author: 'source'
    }}
  ]

  function onVote(vote) {
    votes[vote.author] = vote.value
  }

  function onEnd(err) {
    cb(err, votes)
  }

  pull(
    this.sbot.links2.read({query: votesQuery}),
    pull.drain(onVote, onEnd)
  )
}
767 |
Built with git-ssb-web