git ssb

16+

cel / patchfoo



Tree: 5b924278be5b8a04c95c2e112fbf5827b0244697

Files: 5b924278be5b8a04c95c2e112fbf5827b0244697 / lib / app.js

20265 bytes | Raw
1var http = require('http')
2var memo = require('asyncmemo')
3var lru = require('hashlru')
4var pkg = require('../package')
5var u = require('./util')
6var pull = require('pull-stream')
7var multicb = require('multicb')
8var paramap = require('pull-paramap')
9var Contacts = require('ssb-contact')
10var About = require('./about')
11var Serve = require('./serve')
12var Render = require('./render')
13var Git = require('./git')
14var cat = require('pull-cat')
15var proc = require('child_process')
16var toPull = require('stream-to-pull-stream')
17var BoxStream = require('pull-box-stream')
18var crypto = require('crypto')
19
// 24 zero bytes, passed as the second argument to BoxStream.createBoxStream /
// createUnboxStream when encrypting and decrypting private blobs.
// Buffer.alloc zero-fills, so no separate fill(0) is needed.
var zeros = Buffer.alloc(24)
21
22module.exports = App
23
/**
 * Application state shared by all requests: keeps the sbot client and
 * config, and sets up memoized lookups, LRU caches, the renderer and the
 * git helper.
 *
 * @param {object} sbot - ssb client; `get`, `id`, `private.unbox` and
 *   `blobs` are used here
 * @param {object} config - configuration; settings are read from
 *   `config.patchfoo` when present
 */
function App(sbot, config) {
  this.sbot = sbot
  this.config = config

  var settings = config.patchfoo || {}
  this.port = settings.port || 8027
  this.host = settings.host || 'localhost'
  this.msgFilter = settings.filter

  var baseUrl = settings.base || '/'
  // encode_msgids defaults to true when it is not configured
  var encodeMsgIds = true
  if (settings.encode_msgids != null) {
    encodeMsgIds = Boolean(settings.encode_msgids)
  }
  this.opts = {
    base: baseUrl,
    blob_base: settings.blob_base || settings.img_base || baseUrl,
    img_base: settings.img_base || baseUrl,
    emoji_base: settings.emoji_base || (baseUrl + 'emoji/'),
    encode_msgids: encodeMsgIds,
  }

  // note: this replaces `get` on the sbot object that was passed in
  sbot.get = memo({cache: lru(100)}, sbot.get)
  this.about = new About(this, sbot.id)
  this.getMsg = memo({cache: lru(100)}, getMsgWithValue, sbot)

  // the cache is kept on the instance so entries can be removed later
  this.aboutCache = lru(500)
  this.getAbout = memo({cache: this.aboutCache}, this._getAbout.bind(this))

  this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
  this.reverseNameCache = lru(500)
  this.reverseEmojiNameCache = lru(500)

  this.blobSizeCache = lru(100)
  this.getBlobSize = memo({cache: this.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))

  this.getFollows = memo(this._getFollows.bind(this))
  this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))

  // bound so it can be handed around as a bare function
  this.unboxMsg = this.unboxMsg.bind(this)

  this.render = new Render(this, this.opts)
  this.git = new Git(this)

  this.monitorBlobWants()
}
62
/**
 * Start the HTTP server, watch for new about links so cached About info
 * can be dropped, and call sbot.whoami on a timer.
 */
App.prototype.go = function () {
  var app = this

  function handleRequest(req, res) {
    new Serve(app, req, res).go()
  }

  function announce() {
    // a host containing ':' is wrapped in brackets for the printed URL
    var shownHost = /:/.test(app.host) ? '[' + app.host + ']' : app.host
    app.log('Listening on http://' + shownHost + ':' + app.port)
  }

  var server = http.createServer(handleRequest)
  if (app.host === 'localhost') {
    // no host argument is passed in the 'localhost' case
    server.listen(app.port, announce)
  } else {
    server.listen(app.port, app.host, announce)
  }

  // invalidate cached About info when new About messages come in
  var newAboutLinks = app.sbot.links({rel: 'about', old: false, values: true})
  pull(
    newAboutLinks,
    pull.drain(function (link) {
      app.aboutCache.remove(link.dest)
    }, function (err) {
      if (err) throw err
    })
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
88
// Every log line starts with the package name in square brackets.
var logPrefix = '[' + pkg.name + ']'

// app.log(...) writes to stdout and app.error(...) to stderr, both with
// the prefix as the first argument.
App.prototype.log = console.log.bind(console, logPrefix)
App.prototype.error = console.error.bind(console, logPrefix)
92
/**
 * Decrypt a message whose content is a string.
 *
 * Calls back with the original message when the content is not a string,
 * when unboxing fails (the error is logged, not passed on) or when
 * unboxing yields nothing. Otherwise calls back with a copy whose
 * `value.content` is the unboxed content and `value.private` is true.
 *
 * @param {object} msg - message; `msg.value.content` is inspected
 * @param {function} cb - called as `cb(null, msg)`
 */
App.prototype.unboxMsg = function (msg, cb) {
  var app = this
  var boxed = msg.value && msg.value.content
  if (typeof boxed !== 'string') {
    cb(null, msg)
    return
  }
  app.unboxContent(boxed, function (err, content) {
    if (err) {
      app.error('unbox:', err)
      return cb(null, msg)
    }
    if (!content) return cb(null, msg)

    // copy the message and its value one level deep so the original
    // objects are left untouched
    var copy = {}
    var key
    for (key in msg) copy[key] = msg[key]
    copy.value = {}
    for (key in msg.value) copy.value[key] = msg.value[key]
    copy.value.content = content
    copy.value.private = true
    cb(null, copy)
  })
}
113
/**
 * Run a search through `sbot.fulltext.search`.
 *
 * @param {object} opts - passed unchanged to the plugin's search function
 * @returns the plugin's result, or an error stream when the plugin is absent
 */
App.prototype.search = function (opts) {
  var fulltext = this.sbot.fulltext
  var search = fulltext && fulltext.search
  if (search) return search(opts)
  return pull.error(new Error('Missing fulltext search plugin'))
}
119
/**
 * Build a reverse-ordered message stream for an advanced search.
 *
 * The source is chosen by the first option that is set: `channel`
 * (backlinks to '#channel'), `dest` (links, optionally narrowed by
 * `source`), `source` (that feed's user stream), otherwise the whole feed
 * stream. When `opts.text` is set the stream is also filtered by text.
 *
 * @param {object} opts - `channel`, `dest`, `source` and `text` are read
 */
App.prototype.advancedSearch = function (opts) {
  var sbot = this.sbot
  var source
  if (opts.channel) {
    source = sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    source = sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    source = sbot.createUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    source = sbot.createFeedStream({
      reverse: true,
    })
  }
  return pull(
    source,
    opts.text && pull.filter(filterByText(opts.text))
  )
}
146
/**
 * Build a predicate that walks a value recursively (arrays and object
 * properties) and reports whether `each` is truthy for any string in it.
 *
 * @param {function(string)} each - test applied to every string found
 * @returns {function(*)} predicate over arbitrary values
 */
function forSome(each) {
  return function some(obj) {
    if (obj == null) return false
    if (typeof obj === 'string') return each(obj)
    if (Array.isArray(obj)) return obj.some(some)
    if (typeof obj === 'object')
      for (var k in obj) if (some(obj[k])) return true
    return false
  }
}

/**
 * Build a message predicate that matches `str` case-insensitively against
 * any string inside `msg.value.content`.
 *
 * `str` is used as a regular expression. When it is not a valid pattern
 * (for example "foo(" or "*") it is matched literally instead of throwing.
 *
 * @param {string} str - search text; falsy means "match everything"
 * @returns {function(object)} predicate over messages
 */
function filterByText(str) {
  if (!str) return function () { return true }
  var search
  try {
    search = new RegExp(str, 'i')
  } catch (err) {
    // invalid pattern: escape regex metacharacters and match the raw text
    var literal = String(str).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    search = new RegExp(literal, 'i')
  }
  var matches = forSome(search.test.bind(search))
  return function (msg) {
    var c = msg.value.content
    return c && matches(c)
  }
}
167
/**
 * Fetch a message with getMsg and pass it through unboxMsg.
 *
 * @param {string} key - message id
 * @param {function} cb - `cb(err)` if the lookup fails, otherwise whatever
 *   unboxMsg calls back with
 */
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this
  app.getMsg(key, function onMsg(err, msg) {
    if (err) {
      return cb(err)
    }
    app.unboxMsg(msg, cb)
  })
}
175
/**
 * Publish a message, privately when `content.recps` is an array.
 *
 * A failure whose message starts with "expected previous:" is retried, up
 * to two more times; any other error is passed to `cb` immediately.
 *
 * @param {object} content - message content
 * @param {function} cb - `cb(err, msg)`
 */
App.prototype.publish = function (content, cb) {
  var self = this
  function tryPublish(triesLeft) {
    if (Array.isArray(content.recps)) {
      // declared locally: previously this assignment created a global
      var recps = content.recps.map(u.linkDest)
      self.sbot.private.publish(content, recps, next)
    } else {
      self.sbot.publish(content, next)
    }
    function next(err, msg) {
      if (err && triesLeft > 0 && /^expected previous:/.test(err.message)) {
        return tryPublish(triesLeft - 1)
      }
      return cb(err, msg)
    }
  }
  tryPublish(2)
}
198
/**
 * Get a blob's size, requesting the blob first if no size is known.
 *
 * @param {string} id - blob id
 * @param {function} cb - `cb(err, size)`
 */
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobs = app.sbot.blobs

  function afterWant(wantErr) {
    if (wantErr) return cb(wantErr)
    blobs.size(id, cb)
  }

  // only want() the blob if we don't already have it
  blobs.size(id, function (err, size) {
    // an error from this first lookup is not inspected; only size decides
    if (size != null) return cb(null, size)
    app.blobWants[id] = true
    blobs.want(id, afterWant)
  })
}
212
/**
 * Create a sink that stores the incoming data as a blob.
 *
 * @param {function} cb - `cb(err, {link, size})`, where `link` is the value
 *   reported by `sbot.blobs.add` and `size` the one from `u.pullLength`
 * @returns the sink to pipe the blob data into
 */
App.prototype.addBlobRaw = function (cb) {
  var done = multicb({pluck: 1, spread: true})
  // result slots are positional: length first, then the add result
  var onLength = done()
  var onAdded = done()
  var sink = pull(
    u.pullLength(onLength),
    this.sbot.blobs.add(onAdded)
  )
  done(function (err, size, hash) {
    if (err) return cb(err)
    var link = {link: hash, size: size}
    cb(null, link)
  })
  return sink
}
225
/**
 * Create a blob sink, using addBlobPrivate when `isPrivate` is truthy and
 * addBlobRaw otherwise.
 *
 * @param {boolean} isPrivate - whether to use the private variant
 * @param {function} cb - passed through to the chosen method
 */
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate ? this.addBlobPrivate(cb) : this.addBlobRaw(cb)
}
230
/**
 * Create a sink that buffers the whole input, then stores it through a
 * box stream keyed by the SHA-256 of the input.
 *
 * @param {function} cb - `cb(err, link)`; `link` is the addBlobRaw result
 *   with `key` set to the base64 form of the hash
 * @returns a drain sink to pipe the cleartext into
 */
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function onChunk(chunk) {
    chunks.push(chunk)
    hasher.update(chunk)
  }

  function onEnd(err) {
    if (err) return cb(err)
    var secret = hasher.digest()
    var cleartext = pull.values(chunks)
    var boxer = BoxStream.createBoxStream(secret, zeros)
    var store = app.addBlobRaw(function (err, link) {
      if (err) return cb(err)
      link.key = secret.toString('base64')
      cb(null, link)
    })
    pull(cleartext, boxer, store)
  }

  return pull.drain(onChunk, onEnd)
}
253
/**
 * Read a blob, passing it through an unbox stream when a key is given.
 *
 * @param {string} id - blob id
 * @param {string|Buffer} [key] - key; a string is decoded as base64
 * @returns a pull-stream source of the blob data
 */
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
262
/**
 * Log the blob id to stderr and call `sbot.blobs.push` for it.
 *
 * @param {string} id - blob id
 * @param {function} cb - passed to `sbot.blobs.push`
 */
App.prototype.pushBlob = function (id, cb) {
  console.error('pushing blob', id)
  var blobs = this.sbot.blobs
  blobs.push(id, cb)
}
267
/**
 * Read a blob described by a link.
 *
 * @param {string|object} link - normalised with `u.toLink`; its `link` and
 *   `size` are passed to `sbot.blobs.get` as `hash` and `size`
 * @returns the result of `sbot.blobs.get`
 */
App.prototype.readBlob = function (link) {
  var blobLink = u.toLink(link)
  var request = {
    hash: blobLink.link,
    size: blobLink.size,
  }
  return this.sbot.blobs.get(request)
}
275
/**
 * Read part of a blob.
 *
 * Uses `sbot.blobs.getSlice` when it exists; otherwise reads the blob with
 * readBlob and slices it with `u.pullSlice`.
 *
 * @param {object} link - `link.link` and `link.size` are read directly here
 * @param {object} opts - `start` and `end` of the slice
 */
App.prototype.readBlobSlice = function (link, opts) {
  var blobs = this.sbot.blobs
  if (!blobs.getSlice) {
    return pull(
      this.readBlob(link),
      u.pullSlice(opts.start, opts.end)
    )
  }
  return blobs.getSlice({
    hash: link.link,
    size: link.size,
    start: opts.start,
    end: opts.end,
  })
}
288
/**
 * Check that every linked blob has a known size.
 *
 * @param {Array<object>} links - blob links; `link.link` is the blob id
 * @param {function} cb - called with no arguments when nothing is missing,
 *   with `{name: 'BlobNotFoundError', links: [...]}` listing the missing
 *   links, or with the underlying error if a size lookup fails
 */
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})
  links.forEach(function (link) {
    // named differently from the outer cb so it is not shadowed
    var linkDone = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) linkDone(err)
      else if (size == null) linkDone(null, link)
      else linkDone()
    })
  })
  done(function (err, results) {
    if (err) {
      // results may be absent when an error is reported, so stop here
      // rather than trying to filter them
      console.trace(err)
      return cb(err)
    }
    var missingLinks = results.filter(Boolean)
    if (missingLinks.length == 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
307
/**
 * Look up the id stored for a name in reverseNameCache.
 *
 * @param {string} name - name as stored by _getAbout
 * @returns the cached id, or whatever the cache returns on a miss
 */
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
312
/**
 * Look up the value stored for a name in reverseEmojiNameCache.
 *
 * @param {string} name - cache key
 * @returns the cached value, or whatever the cache returns on a miss
 */
App.prototype.getReverseEmojiNameSync = function (name) {
  var cache = this.reverseEmojiNameCache
  return cache.get(name)
}
316
/**
 * Return the `name` of an entry already present in aboutCache.
 *
 * @param {string} name - key into aboutCache (the cache behind getAbout)
 * @returns the cached entry's `name`, or the falsy cache result on a miss
 */
App.prototype.getNameSync = function (name) {
  var cached = this.aboutCache.get(name)
  if (!cached) return cached
  return cached.name
}
321
/**
 * Fetch a message value with `sbot.get` and wrap it as `{key, value}`.
 *
 * @param {object} sbot - client exposing `get(id, cb)`
 * @param {string} id - message id; when falsy, `cb` is called with no
 *   arguments
 * @param {function} cb - `cb(err)` or `cb(null, {key, value})`
 */
function getMsgWithValue(sbot, id, cb) {
  if (!id) return cb()
  sbot.get(id, function onValue(err, value) {
    if (err) {
      return cb(err)
    }
    var msg = {key: id, value: value}
    cb(null, msg)
  })
}
329
/**
 * Load About info for an id and record its name in reverseNameCache.
 *
 * The name is prefixed with the id's first character (or '@') unless it
 * already starts with it.
 *
 * @param {string} id - an id; anything `u.isRef` rejects yields `{}`
 * @param {function} cb - `cb(err, about)`
 */
App.prototype._getAbout = function (id, cb) {
  var app = this
  if (!u.isRef(id)) return cb(null, {})
  app.about.get(id, function (err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    var name = about.name
    if (name && name[0] !== sigil) {
      about.name = sigil + name
    }
    app.reverseNameCache.set(about.name, id)
    cb(null, about)
  })
}
343
/**
 * Build a source made of `pull.once(id)` mapped through getMsg.
 *
 * @param {string} id - message id
 */
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  return lookup(pull.once(id))
}
347
/**
 * Create a message stream: this app's createFeedStream when
 * `opts.sortByTimestamp` is set, `sbot.createLogStream` otherwise.
 *
 * @param {object} [opts] - stream options; defaults to `{}`
 */
App.prototype.createLogStream = function (opts) {
  opts = opts || {}
  if (opts.sortByTimestamp) {
    return this.createFeedStream(opts)
  }
  return this.sbot.createLogStream(opts)
}
354
/**
 * Wrap `sbot.createFeedStream`.
 *
 * For forward queries that set both `gt` and `limit`, one extra message is
 * requested, any message whose timestamp equals `gt` is dropped, and the
 * result is capped at `limit`.
 *
 * @param {object} opts - feed stream options
 */
App.prototype.createFeedStream = function (opts) {
  var needsWorkaround = opts.gt && opts.limit && !opts.reverse
  if (!needsWorkaround) return this.sbot.createFeedStream(opts)

  // work around opts.gt being treated as opts.gte sometimes
  var widened = u.mergeOpts(opts, {limit: opts.limit + 1})
  return pull(
    this.sbot.createFeedStream(widened),
    pull.filter(function (msg) {
      return msg && msg.value.timestamp !== opts.gt
    }),
    opts.limit && pull.take(opts.limit)
  )
}
366
// Sort weight per peer state; states not listed here weigh 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

/**
 * Sort comparator for peers: higher-weight state first, then larger
 * `stateChange` first.
 *
 * The tie-break used to be written `b.stateChange|0 - a.stateChange|0`,
 * which parses as `b.stateChange | (0 - a.stateChange) | 0` and does not
 * compare the two values. It is now a plain difference, with missing
 * values treated as 0.
 * NOTE(review): assumes stateChange is numeric when present — confirm.
 *
 * @param {object} a - peer; `state` and `stateChange` are read
 * @param {object} b - peer; `state` and `stateChange` are read
 * @returns {number} negative if a sorts first, positive if b sorts first
 */
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  return (bState - aState)
    || ((b.stateChange || 0) - (a.stateChange || 0))
}
379
/**
 * Stream the gossip peers, sorted with comparePeers.
 *
 * @param {object} [opts] - when given, only peers whose properties strictly
 *   equal every property of `opts` are kept
 */
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var k in opts) if (opts[k] !== peer[k]) return false
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
394
/**
 * Find the first 'contact' message from `source` about `dest` in a
 * reverse-ordered links query and report its `following` flag.
 *
 * @param {string} source - feed id of the author
 * @param {string} dest - feed id of the target
 * @param {function} cb - `cb(err, following)`; `following` is a boolean, or
 *   undefined when no contact message is found
 */
App.prototype.getFollow = function (source, dest, cb) {
  var query = {source: source, dest: dest, rel: 'contact', reverse: true,
    values: true, meta: false, keys: false}

  function isContact(value) {
    var c = value && value.content
    return c && c.type === 'contact'
  }

  pull(
    this.sbot.links(query),
    pull.filter(isContact),
    pull.take(1),
    pull.collect(function (err, found) {
      if (err) return cb(err)
      var first = found[0]
      cb(null, first && !!first.content.following)
    })
  )
}
411
/**
 * Create a through-stream that runs each message through unboxMsg.
 * 16 is passed as paramap's second argument.
 */
App.prototype.unboxMessages = function () {
  var width = 16
  return paramap(this.unboxMsg, width)
}
415
/**
 * Stream the unique channel names taken from 'channel' messages whose
 * `subscribed` is truthy, read in reverse order.
 *
 * @param {object} [opts] - accepted but not used here
 */
App.prototype.streamChannels = function (opts) {
  function isSubscribed(msg) {
    return msg.value.content.subscribed
  }

  function channelOf(msg) {
    return msg.value.content.channel
  }

  return pull(
    this.sbot.messagesByType({type: 'channel', reverse: true}),
    this.unboxMessages(),
    pull.filter(isSubscribed),
    pull.map(channelOf),
    pull.unique()
  )
}
429
/**
 * Stream the unique channel names a feed has 'channel' messages for with
 * `subscribed` set.
 *
 * @param {string} id - feed id
 * @param {object} [opts] - accepted but not used here
 */
App.prototype.streamMyChannels = function (id, opts) {
  var sbot = this.sbot

  // use ssb-query plugin if it is available, since it has an index for
  // author + type
  if (sbot.query) {
    var query = [
      {$filter: {
        value: {
          author: id,
          content: {type: 'channel', subscribed: true}
        }
      }},
      {$map: ['value', 'content', 'channel']}
    ]
    return pull(
      sbot.query.read({reverse: true, query: query}),
      pull.unique()
    )
  }

  // otherwise scan the feed's own messages
  return pull(
    sbot.createUserStream({id: id, reverse: true}),
    this.unboxMessages(),
    pull.filter(function (msg) {
      var content = msg.value.content
      if (content.type == 'channel') {
        return content.subscribed
      }
    }),
    pull.map(function (msg) {
      return msg.value.content.channel
    }),
    pull.unique()
  )
}
463
/**
 * Create contact streams for a feed through a new Contacts helper built on
 * this app's sbot.
 *
 * @param {string} id - feed id
 */
App.prototype.createContactStreams = function (id) {
  var contacts = new Contacts(this.sbot)
  return contacts.createContactStreams(id)
}
467
/**
 * Sort comparator that orders items by `value`, highest first.
 *
 * @param {object} a - item with a numeric `value`
 * @param {object} b - item with a numeric `value`
 * @returns {number} `b.value - a.value`
 */
function compareVoted(a, b) {
  var difference = b.value - a.value
  return difference
}
471
/**
 * Tally votes per target from 'vote' messages.
 *
 * Messages are read until `_opts.limit` distinct targets have been seen.
 * Each feed counts once per target, as +1, -1 or 0.
 *
 * @param {object} _opts - `limit` (required, numeric), `reverse`, `gt`, `lt`
 * @param {function} cb - `cb(err, {items, firstTimestamp, lastTimestamp})`;
 *   `items` are `{id, value, feedsObj, feeds}` sorted by value, highest
 *   first. A missing limit is reported through `cb` (it used to return an
 *   error stream that nothing consumed, so `cb` was never called).
 */
App.prototype.getVoted = function (_opts, cb) {
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    // read up to 100 messages per requested item
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}
  var votedArray = []
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbot.messagesByType(opts),
    self.unboxMessages(),
    // stop once enough distinct targets have been collected
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        // NOTE(review): whether this branch keeps the newest or the oldest
        // vote depends on the order messagesByType yields for `reverse` —
        // confirm it matches the intent of the comment below.
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      // clamp the vote to -1, 0 or 1
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      // `true` is not treated as a failure here
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
527
/**
 * Create about streams for an id by delegating to the About helper.
 *
 * @param {string} id - id to look up
 */
App.prototype.createAboutStreams = function (id) {
  var about = this.about
  return about.createAboutStreams(id)
}
531
/**
 * Stream the mentions that have a truthy `emoji` property, unique by
 * `link`. Links from this sbot's own id are read first, then links from
 * everyone.
 */
App.prototype.streamEmojis = function () {
  var sbot = this.sbot
  var ownMentions = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var allMentions = sbot.links({rel: 'mentions', dest: '&', values: true})

  return pull(
    cat([ownMentions, allMentions]),
    this.unboxMessages(),
    pull.map(function (msg) { return msg.value.content.mentions }),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
550
/**
 * Read from a query plugin with timestamp bounds and a limit.
 *
 * @param {object} plugin - object exposing `read({index, reverse, limit, query})`
 * @param {object} opts - `reverse`, `limit`, `gt` and `lt` are read
 * @param {object} filter - query filter, merged with the timestamp range
 */
App.prototype.filter = function (plugin, opts, filter) {
  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var index
  if (plugin === this.sbot.backlinks) {
    var content = filter && filter.value && filter.value.content
    var filteringByType = content && content.type
    if (!filteringByType) index = 'DTS'
  }

  // work around flumeview-query not supporting $lt/$gt.
  // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
  // The query uses inclusive bounds and one extra item; messages whose
  // timestamp equals a bound are then dropped and the limit re-applied.
  var bounded = u.mergeOpts(filter, {
    timestamp: {
      $gte: opts.gt,
      $lte: opts.lt,
    }
  })
  var source = plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: opts.limit && (opts.limit + 1),
    query: [{$filter: bounded}]
  })

  return pull(
    source,
    pull.filter(function (msg) {
      return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
    }),
    opts.limit && pull.take(opts.limit)
  )
}
580
/**
 * Stream the messages of a channel.
 *
 * @param {object} opts - `channel` plus the options read by filter
 * @returns a message stream, or an error stream when neither plugin exists
 */
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    return this.filter(sbot.backlinks, opts, {
      dest: '#' + opts.channel,
    })
  }

  if (sbot.query) {
    return this.filter(sbot.query, opts, {
      value: {content: {channel: opts.channel}},
    })
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
594
/**
 * Stream the backlinks whose destination is this sbot's own id.
 *
 * @param {object} opts - options read by filter
 * @returns a message stream, or an error stream without the plugin
 */
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks
  if (backlinks) {
    return this.filter(backlinks, opts, {
      dest: this.sbot.id,
    })
  }
  return pull.error(new Error(
    'Viewing mentions requires the ssb-backlinks plugin'))
}
603
/**
 * Stream private messages.
 *
 * Uses `sbot.private.read` through filter when it exists. Otherwise scans
 * the log without a limit, keeps encrypted messages, unboxes them, keeps
 * the readable ones and takes `opts.limit` of those.
 *
 * @param {object} opts - stream options; `limit` is applied after unboxing
 */
App.prototype.streamPrivate = function (opts) {
  var priv = this.sbot.private
  if (priv.read) return this.filter(priv, opts, {})

  var unlimited = u.mergeOpts(opts, {limit: null})
  return pull(
    this.createLogStream(unlimited),
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable),
    pull.take(opts.limit)
  )
}
615
/**
 * Query `sbot.links2` for blob mentions with a given name.
 *
 * @param {object} opts - `name` of the mention, optional `author`
 * @returns a stream of `{name, size, link, author, time}`, or an error
 *   stream when `sbot.links2` is absent
 */
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) {
    return pull.error(new Error(
      'missing ssb-links plugin'))
  }

  var filter = {rel: ['mentions', opts.name]}
  if (opts.author) filter.source = opts.author

  var shape = {
    name: ['rel', 1],
    size: ['rel', 2],
    link: 'dest',
    author: 'source',
    time: 'ts'
  }
  return links2.read({
    query: [
      {$filter: filter},
      {$filter: {dest: {$prefix: '&'}}},
      {$map: shape}
    ]
  })
}
635
/**
 * Reset `blobWants` and keep it in sync with `sbot.blobs.createWants()`.
 * Every id that appears in an update is also removed from blobSizeCache.
 */
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function onWants(wants) {
    for (var id in wants) {
      // a negative value marks the blob as wanted; anything else clears it
      if (wants[id] < 0) app.blobWants[id] = true
      else delete app.blobWants[id]
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    app.sbot.blobs.createWants(),
    pull.drain(onWants, onEnd)
  )
}
652
/**
 * Report the state of a blob.
 *
 * @param {string} id - blob id
 * @param {function} cb - `cb(err, state)`; `state` is 'wanted' when the id
 *   is in blobWants, otherwise a boolean telling whether a size is known
 */
App.prototype.getBlobState = function (id, cb) {
  if (this.blobWants[id]) return cb(null, 'wanted')
  this.getBlobSize(id, function (err, size) {
    if (err) return cb(err)
    var haveSize = size != null
    cb(null, haveSize)
  })
}
661
/**
 * Extract the readme from an npm tarball blob by piping the blob through
 * an external `tar` process.
 *
 * @param {string} tarballId - blob id of the tarball
 * @param {function} cb - `cb(err)` or `cb(null, text, true)`, where `text`
 *   is tar's stdout decoded as UTF-8
 */
App.prototype.getNpmReadme = function (tarballId, cb) {
  var app = this
  // TODO: make this portable, and handle plaintext readmes
  var tarArgs = ['--ignore-case', '-Oxz',
    'package/README.md', 'package/readme.markdown', 'package/readme.mkd']
  var tar = proc.spawn('tar', tarArgs)
  var done = multicb({pluck: 1, spread: true})

  // result slots are positional: stdin completion first, stdout data second
  var onStdinDone = done()
  pull(
    app.sbot.blobs.get(tarballId),
    toPull.sink(tar.stdin, onStdinDone)
  )

  var onStdoutCollected = done()
  pull(
    toPull.source(tar.stdout),
    pull.collect(onStdoutCollected)
  )

  done(function (err, _, bufs) {
    if (err) return cb(err)
    var text = Buffer.concat(bufs).toString('utf8')
    cb(null, text, true)
  })
}
682
/**
 * Decide whether a message should be shown.
 *
 * A message passes when the filter is 'all', when it is by this sbot's id
 * or by `opts.feed`, when it is `opts.msgId`, when its author is followed,
 * or when a followed feed has a positive vote on it. With the 'invert'
 * filter the result is flipped.
 *
 * @param {object} msg - message with `key` and `value.author`
 * @param {object} opts - `filter`, `feed` and `msgId` are read
 * @param {function} cb - `cb(err, show)`
 */
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var mode = opts.filter || app.msgFilter
  var show = (mode !== 'invert')

  var alwaysPasses = mode === 'all'
    || author === myId
    || author === opts.feed
    || msg.key === opts.msgId
  if (alwaysPasses) return cb(null, show)

  app.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)
    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        if (follows[voter] && votes[voter] > 0) {
          return cb(null, show)
        }
      }
      return cb(null, !show)
    })
  })
}
707
/**
 * Look up whether `src` follows `dest` in the result of getFollows.
 *
 * @param {string} src - feed id whose follows are loaded
 * @param {string} dest - feed id to look up
 * @param {function} cb - `cb(err, following)`; `following` is the stored
 *   entry for `dest`, undefined when there is none
 */
App.prototype.isFollowing = function (src, dest, cb) {
  this.getFollows(src, function onFollows(err, follows) {
    if (err) {
      return cb(err)
    }
    return cb(null, follows[dest])
  })
}
715
/**
 * Build a map of feed id to following flag from a live `links2` query.
 *
 * `cb` fires once, on the first `sync` marker or when the stream ends. The
 * map it receives keeps being updated while the stream stays open.
 *
 * @param {string} id - feed id whose contact links are read
 * @param {function} cb - `cb(err, follows)`
 */
App.prototype._getFollows = function (id, cb) {
  var follows = {}
  var pending = cb

  // call the callback at most once
  function ready(err) {
    if (!pending) return
    var fire = pending
    pending = null
    fire(err, follows)
  }

  var query = [
    {$filter: {
      source: id,
      rel: [{$prefix: 'contact'}]
    }},
    {$map: {
      following: ['rel', 1],
      feed: 'dest'
    }}
  ]

  pull(
    this.sbot.links2.read({
      live: true,
      query: query
    }),
    pull.drain(function (link) {
      if (link.sync) return ready()
      follows[link.feed] = link.following
    }, ready)
  )
}
744
/**
 * Collect the vote value per author for a message from `links2`.
 *
 * @param {string} id - id the votes point to
 * @param {function} cb - `cb(err, votes)`, where `votes` maps author to
 *   value; when an author has several links the last one read wins
 */
App.prototype._getVotes = function (id, cb) {
  var tally = {}
  var query = [
    {$filter: {
      dest: id,
      rel: [{$prefix: 'vote'}]
    }},
    {$map: {
      value: ['rel', 1],
      author: 'source'
    }}
  ]

  function record(vote) {
    tally[vote.author] = vote.value
  }

  function finish(err) {
    cb(err, tally)
  }

  pull(
    this.sbot.links2.read({query: query}),
    pull.drain(record, finish)
  )
}
767

Built with git-ssb-web