git ssb

16+

cel / patchfoo



Tree: 5a8a31c7e4523ef4de4c20e28443f9ea18961ce4

Files: 5a8a31c7e4523ef4de4c20e28443f9ea18961ce4 / lib / app.js

20303 bytesRaw
1var http = require('http')
2var memo = require('asyncmemo')
3var lru = require('hashlru')
4var pkg = require('../package')
5var u = require('./util')
6var pull = require('pull-stream')
7var multicb = require('multicb')
8var paramap = require('pull-paramap')
9var getContacts = require('./contact')
10var About = require('./about')
11var Serve = require('./serve')
12var Render = require('./render')
13var Git = require('./git')
14var cat = require('pull-cat')
15var proc = require('child_process')
16var toPull = require('stream-to-pull-stream')
17var BoxStream = require('pull-box-stream')
18var crypto = require('crypto')
19
// 24 zero bytes, passed as the second argument to the BoxStream
// box/unbox stream constructors used for private blobs.
// Buffer.alloc() zero-fills, replacing the deprecated `new Buffer(n)` + fill(0).
var zeros = Buffer.alloc(24)
21
22module.exports = App
23
// Application state shared by all requests.
//
// sbot:   ssb client; note that sbot.get is replaced with a memoized wrapper.
// config: configuration object; settings are read from config.patchfoo
//         (port, host, filter, base, blob_base, img_base, emoji_base,
//         encode_msgids).
function App(sbot, config) {
  var app = this
  app.sbot = sbot
  app.config = config

  var settings = config.patchfoo || {}
  app.port = settings.port || 8027
  app.host = settings.host || 'localhost'
  app.msgFilter = settings.filter

  // URL prefixes; each one falls back to something derived from `base`
  var base = settings.base || '/'
  var opts = {}
  opts.base = base
  opts.blob_base = settings.blob_base || settings.img_base || base
  opts.img_base = settings.img_base || (base + 'image/')
  opts.emoji_base = settings.emoji_base || (base + 'emoji/')
  // defaults to true when unset, otherwise coerced to a boolean
  opts.encode_msgids = settings.encode_msgids == null
    || Boolean(settings.encode_msgids)
  app.opts = opts

  // memoized lookups, each backed by its own LRU cache
  sbot.get = memo({cache: lru(100)}, sbot.get)
  app.about = new About(app, sbot.id)
  app.getMsg = memo({cache: lru(100)}, getMsgWithValue, sbot)
  app.aboutCache = lru(500)
  app.getAbout = memo({cache: app.aboutCache}, app._getAbout.bind(app))
  app.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
  app.reverseNameCache = lru(500)
  app.reverseEmojiNameCache = lru(500)
  app.blobSizeCache = lru(100)
  app.getBlobSize = memo({cache: app.blobSizeCache},
    sbot.blobs.size.bind(sbot.blobs))
  app.getFollows = memo(app._getFollows.bind(app))
  app.getVotes = memo({cache: lru(100)}, app._getVotes.bind(app))
  app.getContacts = getContacts.bind(null, app.sbot)

  // bound so it can be handed around as a bare callback
  app.unboxMsg = app.unboxMsg.bind(app)

  app.render = new Render(app, app.opts)
  app.git = new Git(app)

  app.monitorBlobWants()
}
63
// Start the HTTP server, watch for new about messages, and begin pinging
// the ssb connection every ten seconds.
App.prototype.go = function () {
  var app = this

  function handleRequest(req, res) {
    new Serve(app, req, res).go()
  }

  function announce() {
    // a host containing ':' is wrapped in brackets for the URL
    var shown = /:/.test(app.host) ? '[' + app.host + ']' : app.host
    app.log('Listening on http://' + shown + ':' + app.port)
  }

  var server = http.createServer(handleRequest)
  if (app.host === 'localhost') {
    server.listen(app.port, announce)
  } else {
    server.listen(app.port, app.host, announce)
  }

  // invalidate cached About info when new About messages come in
  function dropCachedAbout(link) {
    app.aboutCache.remove(link.dest)
  }
  function aboutStreamEnded(err) {
    if (err) throw err
  }
  pull(
    app.sbot.links({rel: 'about', old: false, values: true}),
    pull.drain(dropCachedAbout, aboutStreamEnded)
  )

  // keep alive ssb client connection
  setInterval(app.sbot.whoami, 10e3)
}
89
// Tag placed in front of every line logged through app.log / app.error:
// the `name` field of ../package wrapped in square brackets.
var logPrefix = '[' + pkg.name + ']'

// console.error / console.log with the tag pre-applied as first argument
App.prototype.error = console.error.bind(console, logPrefix)
App.prototype.log = console.log.bind(console, logPrefix)
93
// Try to decrypt a message whose content is a string.
//
// Calls cb(null, msg) with the message untouched when the content is not a
// string, when unboxing fails (the error is logged), or when unboxing yields
// nothing. Otherwise calls cb(null, copy) where copy is a shallow copy of msg
// whose value.content is the decrypted content and value.private is true.
// The callback never receives an error.
App.prototype.unboxMsg = function (msg, cb) {
  var app = this

  function shallowCopy(from) {
    var to = {}
    for (var key in from) to[key] = from[key]
    return to
  }

  var boxed = msg.value && msg.value.content
  if (typeof boxed !== 'string') {
    cb(null, msg)
    return
  }

  app.unboxContent(boxed, function (err, content) {
    if (err) app.error('unbox:', err)
    if (err || !content) return cb(null, msg)
    var copy = shallowCopy(msg)
    copy.value = shallowCopy(msg.value)
    copy.value.content = content
    copy.value.private = true
    cb(null, copy)
  })
}
114
// Run a full-text search with the sbot's fulltext plugin.
// Returns whatever the plugin's search(opts) returns, or an error stream
// when the plugin is not present.
App.prototype.search = function (opts) {
  var fulltext = this.sbot.fulltext
  var search = fulltext && fulltext.search
  if (search) return search(opts)
  return pull.error(new Error('Missing fulltext search plugin'))
}
120
// Build a newest-first message stream for the advanced search form.
//
// The source is picked from the first option present:
//   opts.channel -> backlinks to '#<channel>'
//   opts.dest    -> links to dest (optionally restricted to opts.source)
//   opts.source  -> that feed's user stream
//   otherwise    -> the whole feed stream
// Messages are unboxed, then filtered by opts.text when it is given.
App.prototype.advancedSearch = function (opts) {
  var sbot = this.sbot
  var source

  if (opts.channel) {
    source = sbot.backlinks.read({
      dest: '#' + opts.channel,
      reverse: true,
    })
  } else if (opts.dest) {
    source = sbot.links({
      values: true,
      dest: opts.dest,
      source: opts.source || undefined,
      reverse: true,
    })
  } else if (opts.source) {
    source = sbot.createUserStream({
      reverse: true,
      id: opts.source
    })
  } else {
    source = sbot.createFeedStream({
      reverse: true,
    })
  }

  var unboxer = this.unboxMessages()
  // stays falsy (and is handed to pull as-is) when no text was given
  var textFilter = opts.text && pull.filter(filterByText(opts.text))
  return pull(source, unboxer, textFilter)
}
148
// Build a deep "any string matches" predicate.
//
// The returned function walks a value recursively: strings are passed to
// `each` (whose result is returned as-is), arrays and objects are searched
// element by element, and everything else (including null/undefined) is
// treated as no match.
function forSome(each) {
  function visit(node) {
    if (node === null || node === undefined) return false
    switch (typeof node) {
      case 'string':
        return each(node)
      case 'object':
        if (Array.isArray(node)) return node.some(visit)
        for (var key in node) {
          if (visit(node[key])) return true
        }
        return false
      default:
        return false
    }
  }
  return visit
}
159
// Build a message predicate that does a case-insensitive search of every
// string found anywhere inside msg.value.content.
//
// str: search text, interpreted as a regular expression. If it is not a
//      valid pattern (e.g. an unbalanced parenthesis) it is matched literally
//      instead of throwing. An empty/missing str matches every message.
// Returns a function (msg) -> truthy when the message matches.
function filterByText(str) {
  if (!str) return function () { return true }
  var search
  try {
    search = new RegExp(str, 'i')
  } catch (e) {
    // not a valid pattern: escape regex metacharacters and match as plain text
    var literal = String(str).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    search = new RegExp(literal, 'i')
  }
  var matches = forSome(search.test.bind(search))
  return function (msg) {
    var c = msg.value.content
    return c && matches(c)
  }
}
169
// Look up a message by key and pass it through unboxMsg.
// cb(err) if the lookup fails, otherwise whatever unboxMsg reports.
App.prototype.getMsgDecrypted = function (key, cb) {
  var app = this
  function onMsg(err, msg) {
    if (err) return cb(err)
    app.unboxMsg(msg, cb)
  }
  app.getMsg(key, onMsg)
}
177
// Publish a message.
//
// content: message content. When content.recps is an array the message is
//          published privately to those recipients (each entry mapped
//          through u.linkDest); otherwise it is published publicly.
// cb(err, msg): result of the publish call.
//
// A publish that fails with an "expected previous:" error is retried, up to
// two more times, before the error is passed to cb.
App.prototype.publish = function (content, cb) {
  var self = this
  function tryPublish(triesLeft) {
    if (Array.isArray(content.recps)) {
      // declared locally: this used to leak as an implicit global
      var recps = content.recps.map(u.linkDest)
      self.sbot.private.publish(content, recps, next)
    } else {
      self.sbot.publish(content, next)
    }
    function next(err, msg) {
      var retry = err && triesLeft > 0
        && /^expected previous:/.test(err.message)
      if (retry) return tryPublish(triesLeft - 1)
      return cb(err, msg)
    }
  }
  tryPublish(2)
}
200
// Report the size of a blob, requesting it first if it is not held yet.
// The id is recorded in this.blobWants before want() is called.
// cb(null, size) on success, cb(err) if want() fails.
App.prototype.wantSizeBlob = function (id, cb) {
  var app = this
  var blobs = app.sbot.blobs

  function afterWant(err) {
    if (err) return cb(err)
    blobs.size(id, cb)
  }

  function afterFirstSize(err, size) {
    // only want() the blob if we don't already have it
    if (size != null) return cb(null, size)
    app.blobWants[id] = true
    blobs.want(id, afterWant)
  }

  blobs.size(id, afterFirstSize)
}
214
// Create a sink that stores the bytes written to it as a blob.
// Once both the length counter and the blob store have reported,
// cb(null, {link: <hash>, size: <length>}) is called (or cb(err)).
// Returns the sink.
App.prototype.addBlobRaw = function (cb) {
  var gather = multicb({pluck: 1, spread: true})
  // order matters: the results arrive as (size, hash) below
  var onLength = gather()
  var onHash = gather()

  var sink = pull(
    u.pullLength(onLength),
    this.sbot.blobs.add(onHash)
  )

  gather(function (err, size, hash) {
    if (err) return cb(err)
    var link = {link: hash, size: size}
    cb(null, link)
  })

  return sink
}
227
// Create a blob-adding sink: encrypted when isPrivate is truthy,
// plain otherwise. cb is passed through to the chosen implementation.
App.prototype.addBlob = function (isPrivate, cb) {
  return isPrivate
    ? this.addBlobPrivate(cb)
    : this.addBlobRaw(cb)
}
232
// Create a sink that buffers everything written to it, then stores it as
// an encrypted blob.
//
// The SHA-256 digest of the cleartext is used as the encryption key, with
// `zeros` as the second argument to createBoxStream. On success
// cb(null, link) is called with the link from addBlobRaw plus a `key`
// property holding the base64-encoded key.
App.prototype.addBlobPrivate = function (cb) {
  var app = this
  var chunks = []
  // use the hash of the cleartext as the key to encrypt the blob
  var hasher = crypto.createHash('sha256')

  function collect(buf) {
    chunks.push(buf)
    hasher.update(buf)
  }

  function encryptAndStore(err) {
    if (err) return cb(err)
    var secret = hasher.digest()
    var cleartext = pull.values(chunks)
    var boxer = BoxStream.createBoxStream(secret, zeros)
    var store = app.addBlobRaw(function (err, link) {
      if (err) return cb(err)
      link.key = secret.toString('base64')
      cb(null, link)
    })
    pull(cleartext, boxer, store)
  }

  return pull.drain(collect, encryptAndStore)
}
255
// Get a source stream for a blob.
//
// id:  blob id
// key: optional decryption key, either a Buffer or a base64 string.
//      Without a key the raw blob stream is returned; with one, the stream
//      is piped through an unbox stream (second argument `zeros`).
App.prototype.getBlob = function (id, key) {
  if (!key) return this.sbot.blobs.get(id)
  // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
  if (typeof key === 'string') key = Buffer.from(key, 'base64')
  return pull(
    this.sbot.blobs.get(id),
    BoxStream.createUnboxStream(key, zeros)
  )
}
264
// Log the blob id to stderr, then hand it to sbot.blobs.push with cb.
App.prototype.pushBlob = function (id, cb) {
  console.error('pushing blob', id)
  var blobs = this.sbot.blobs
  blobs.push(id, cb)
}
269
// Get a blob source stream for a link. The argument is normalized with
// u.toLink, then its `link` and `size` are passed to sbot.blobs.get as
// `hash` and `size`.
App.prototype.readBlob = function (link) {
  var ref = u.toLink(link)
  var request = {}
  request.hash = ref.link
  request.size = ref.size
  return this.sbot.blobs.get(request)
}
277
// Get a source stream for the byte range [opts.start, opts.end) of a blob.
// Uses sbot.blobs.getSlice when the sbot provides it; otherwise reads the
// whole blob and slices the stream with u.pullSlice.
// NOTE(review): the exact inclusive/exclusive meaning of start/end is decided
// by getSlice / u.pullSlice, which are not visible here.
App.prototype.readBlobSlice = function (link, opts) {
  var blobs = this.sbot.blobs
  if (!blobs.getSlice) {
    return pull(
      this.readBlob(link),
      u.pullSlice(opts.start, opts.end)
    )
  }
  var request = {
    hash: link.link,
    size: link.size,
    start: opts.start,
    end: opts.end,
  }
  return blobs.getSlice(request)
}
290
// Check that every linked blob is available locally.
//
// links: array of link objects; each link's `link` property is the blob id.
// cb():    all blobs are present
// cb(err): a size lookup failed, or err is
//          {name: 'BlobNotFoundError', links: [...missing links]}
//
// A failed size lookup is passed to cb instead of only being traced;
// continuing past it dereferenced a result list that may be absent on error.
App.prototype.ensureHasBlobs = function (links, cb) {
  var self = this
  var done = multicb({pluck: 1})
  links.forEach(function (link) {
    var next = done()
    self.sbot.blobs.size(link.link, function (err, size) {
      if (err) next(err)
      else if (size == null) next(null, link)
      else next()
    })
  })
  done(function (err, missingLinks) {
    if (err) return cb(err)
    // entries for blobs we already have are undefined; drop them
    missingLinks = missingLinks.filter(Boolean)
    if (missingLinks.length == 0) return cb()
    return cb({name: 'BlobNotFoundError', links: missingLinks})
  })
}
309
// Synchronous lookup in reverseNameCache: the id stored under `name`,
// or whatever the cache returns for a miss.
App.prototype.getReverseNameSync = function (name) {
  return this.reverseNameCache.get(name)
}
314
// Synchronous lookup of `name` in reverseEmojiNameCache.
App.prototype.getReverseEmojiNameSync = function (name) {
  var cached = this.reverseEmojiNameCache.get(name)
  return cached
}
318
// Synchronous name lookup: reads the entry stored in aboutCache under the
// given key and returns its `name`. A cache miss is returned as-is.
App.prototype.getNameSync = function (name) {
  var cached = this.aboutCache.get(name)
  if (!cached) return cached
  return cached.name
}
323
// Fetch a message value with sbot.get and wrap it as {key, value}.
// A falsy id short-circuits to cb() with no arguments.
function getMsgWithValue(sbot, id, cb) {
  if (id) {
    sbot.get(id, function onValue(err, value) {
      if (err) return cb(err)
      cb(null, {key: id, value: value})
    })
    return
  }
  return cb()
}
331
// Uncached about lookup (the constructor wraps this in a memoizer).
//
// For anything u.isRef rejects, yields an empty object. Otherwise fetches
// the about record, makes sure a present name starts with the id's sigil
// (first character of the id, '@' as fallback), records name -> id in
// reverseNameCache, and yields the record.
App.prototype._getAbout = function (id, cb) {
  var app = this
  if (!u.isRef(id)) return cb(null, {})

  function onAbout(err, about) {
    if (err) return cb(err)
    var sigil = id[0] || '@'
    var lacksSigil = about.name && about.name[0] !== sigil
    if (lacksSigil) about.name = sigil + about.name
    app.reverseNameCache.set(about.name, id)
    cb(null, about)
  }

  app.about.get(id, onAbout)
}
345
// Source stream that emits the message for a single id, looked up
// through this.getMsg.
App.prototype.pullGetMsg = function (id) {
  var lookup = pull.asyncMap(this.getMsg)
  return lookup(pull.once(id))
}
349
// Message stream in log order, or via this.createFeedStream when
// opts.sortByTimestamp is set. opts defaults to an empty object.
App.prototype.createLogStream = function (opts) {
  var options = opts || {}
  if (options.sortByTimestamp) return this.createFeedStream(options)
  return this.sbot.createLogStream(options)
}
356
// Wrapper around sbot.createFeedStream.
//
// opts: stream options (optional). For a forward query with both `gt` and
//       `limit`, one extra message is requested and any message whose
//       timestamp equals opts.gt is dropped, then the stream is cut to
//       `limit` items.
App.prototype.createFeedStream = function (opts) {
  opts = opts || {}
  // work around opts.gt being treated as opts.gte sometimes
  var limit = Number(opts.limit)
  if (opts.gt && limit && !opts.reverse) return pull(
    // use the numeric limit: opts.limit may be a string, and '10' + 1 is '101'
    this.sbot.createFeedStream(u.mergeOpts(opts, {limit: limit + 1})),
    pull.filter(function (msg) {
      return msg && msg.value.timestamp !== opts.gt
    }),
    pull.take(limit)
  )
  return this.sbot.createFeedStream(opts)
}
369
// Sort rank for each peer connection state; states not listed rank as 0.
var stateVals = {
  connected: 3,
  connecting: 2,
  disconnecting: 1,
}

// Array#sort comparator for peers: higher-ranked state first, then larger
// `stateChange` first (a missing stateChange counts as 0).
//
// The tie-breaker used to read `b.stateChange|0 - a.stateChange|0`, which
// parses as `b.stateChange | (0 - a.stateChange) | 0` because `-` binds
// tighter than `|`. `|| 0` is used for the default so that large values are
// not truncated to 32 bits.
// NOTE(review): stateChange is presumably a millisecond timestamp — confirm.
function comparePeers(a, b) {
  var aState = stateVals[a.state] || 0
  var bState = stateVals[b.state] || 0
  return (bState - aState)
    || ((b.stateChange || 0) - (a.stateChange || 0))
}
382
// Stream of gossip peers, sorted with comparePeers.
// When opts is given, only peers whose properties strictly equal every
// property of opts are included.
App.prototype.streamPeers = function (opts) {
  var gossip = this.sbot.gossip

  function matchesOpts(peer) {
    for (var key in opts) {
      if (opts[key] !== peer[key]) return false
    }
    return true
  }

  return u.readNext(function (cb) {
    gossip.peers(function (err, peers) {
      if (err) return cb(err)
      var selected = opts ? peers.filter(matchesOpts) : peers
      selected.sort(comparePeers)
      cb(null, pull.values(selected))
    })
  })
}
397
// Work out the contact state from `source` towards `dest` by folding over
// the contact messages that link them.
// cb(err, state): true (following), false (flagged or blocking), or null
// when no message set either.
App.prototype.getContact = function (source, dest, cb) {
  var linkOpts = {
    source: source,
    dest: dest,
    rel: 'contact',
    values: true,
    meta: false,
    keys: false
  }

  function isContactMsg(value) {
    var c = value && value.content
    return c && c.type === 'contact'
  }

  // trinary logic from ssb-friends
  function applyContact(state, value) {
    var c = value.content
    if (c.following) return true
    if (c.flagged || c.blocking) return false
    return state
  }

  pull(
    this.sbot.links(linkOpts),
    pull.filter(isContactMsg),
    pull.reduce(applyContact, null, cb)
  )
}
415
// Through-stream that runs each message through this.unboxMsg via paramap,
// with 16 passed as paramap's second argument.
App.prototype.unboxMessages = function () {
  var unbox = this.unboxMsg
  return paramap(unbox, 16)
}
419
// Stream of distinct channel names taken from 'channel' messages, newest
// first, keeping only messages whose content.subscribed is truthy.
App.prototype.streamChannels = function (opts) {
  function isSubscription(msg) {
    return msg.value.content.subscribed
  }
  function toChannelName(msg) {
    return msg.value.content.channel
  }
  var source = this.sbot.messagesByType({type: 'channel', reverse: true})
  return pull(
    source,
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
433
// Stream of distinct channel names that feed `id` has subscription
// messages for (content.subscribed truthy), newest first.
App.prototype.streamMyChannels = function (id, opts) {
  var sbot = this.sbot

  // use ssb-query plugin if it is available, since it has an index for
  // author + type
  if (sbot.query) {
    var query = [
      {$filter: {
        value: {
          author: id,
          content: {type: 'channel', subscribed: true}
        }
      }},
      {$map: ['value', 'content', 'channel']}
    ]
    return pull(
      sbot.query.read({reverse: true, query: query}),
      pull.unique()
    )
  }

  // fallback: scan the whole user feed
  function isSubscription(msg) {
    var c = msg.value.content
    return c.type == 'channel' ? c.subscribed : undefined
  }
  function toChannelName(msg) {
    return msg.value.content.channel
  }
  return pull(
    sbot.createUserStream({id: id, reverse: true}),
    this.unboxMessages(),
    pull.filter(isSubscription),
    pull.map(toChannelName),
    pull.unique()
  )
}
467
// Array#sort comparator: items with a larger `value` come first.
function compareVoted(a, b) {
  var delta = b.value - a.value
  return delta
}
471
// Tally recent votes per target.
//
// _opts.limit:   number of distinct vote targets to collect (required)
// _opts.reverse: read vote messages newest-first
// _opts.gt/lt:   bounds passed to messagesByType
//
// cb(err, {items, firstTimestamp, lastTimestamp}) where each item is
// {id, value, feedsObj, feeds}: `value` is the sum of each voter's counted
// vote (clamped to -1/0/+1) and `feeds` lists the voters. Items are sorted
// by value, highest first. Up to limit * 100 vote messages are requested.
//
// A missing limit is reported through cb; it used to return an error stream
// that nothing consumed, so the callback never fired.
App.prototype.getVoted = function (_opts, cb) {
  if (isNaN(_opts.limit)) return cb(new Error('missing limit'))
  var self = this
  var opts = {
    type: 'vote',
    limit: _opts.limit * 100,
    reverse: !!_opts.reverse,
    gt: _opts.gt || undefined,
    lt: _opts.lt || undefined,
  }

  var votedObj = {}
  var votedArray = []
  var numItems = 0
  var firstTimestamp, lastTimestamp
  pull(
    self.sbot.messagesByType(opts),
    self.unboxMessages(),
    // stop once enough distinct targets have been seen
    pull.take(function () {
      return numItems < _opts.limit
    }),
    pull.drain(function (msg) {
      if (!firstTimestamp) firstTimestamp = msg.timestamp
      lastTimestamp = msg.timestamp
      var vote = msg.value.content.vote
      if (!vote) return
      var target = u.linkDest(vote)
      var votes = votedObj[target]
      if (!votes) {
        numItems++
        votes = {id: target, value: 0, feedsObj: {}, feeds: []}
        votedObj[target] = votes
        votedArray.push(votes)
      }
      if (msg.value.author in votes.feedsObj) {
        if (!opts.reverse) return // leave latest vote value as-is
        // remove old vote value
        votes.value -= votes.feedsObj[msg.value.author]
      } else {
        votes.feeds.push(msg.value.author)
      }
      var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
      votes.feedsObj[msg.value.author] = value
      votes.value += value
    }, function (err) {
      // `true` is not treated as a failure here
      if (err && err !== true) return cb(err)
      var items = votedArray
      if (opts.reverse) items.reverse()
      items.sort(compareVoted)
      cb(null, {items: items,
        firstTimestamp: firstTimestamp,
        lastTimestamp: lastTimestamp})
    })
  )
}
527
// Delegates to this.about.createAboutStreams for the given id.
App.prototype.createAboutStreams = function (id) {
  var about = this.about
  return about.createAboutStreams(id)
}
531
// Stream of emoji mentions (mention objects with a truthy `emoji`
// property), de-duplicated by `link`. Mentions from our own feed are read
// first, then mentions from every feed.
App.prototype.streamEmojis = function () {
  var sbot = this.sbot

  var mine = sbot.links({
    rel: 'mentions',
    source: sbot.id,
    dest: '&',
    values: true
  })
  var everyone = sbot.links({rel: 'mentions', dest: '&', values: true})

  function toMentions(msg) {
    return msg.value.content.mentions
  }

  return pull(
    cat([mine, everyone]),
    this.unboxMessages(),
    pull.map(toMentions),
    pull.flatten(),
    pull.filter('emoji'),
    pull.unique('link')
  )
}
550
// Read from a query-style plugin with exclusive timestamp bounds and an
// optional limit.
//
// plugin: object with a read({index, reverse, limit, query}) method
// opts:   {reverse, limit, gt, lt}
// filter: $filter object, merged with the timestamp range
App.prototype.filter = function (plugin, opts, filter) {
  var limit = Number(opts.limit)

  // work around flumeview-query not picking the best index.
  // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
  var content = filter && filter.value && filter.value.content
  var filteringByType = content && content.type
  var index = (plugin === this.sbot.backlinks && !filteringByType)
    ? 'DTS'
    : undefined

  // work around flumeview-query not supporting $lt/$gt.
  // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
  // Query inclusively, fetch one extra item, then drop the boundary messages.
  var range = {timestamp: {$gte: opts.gt, $lte: opts.lt}}
  var source = plugin.read({
    index: index,
    reverse: opts.reverse,
    limit: limit ? (limit + 1) : undefined,
    query: [{$filter: u.mergeOpts(filter, range)}]
  })

  function insideBounds(msg) {
    return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
  }

  return pull(
    source,
    pull.filter(insideBounds),
    limit && pull.take(limit)
  )
}
581
// Stream messages for opts.channel, using ssb-backlinks when available,
// else ssb-query, else an error stream.
App.prototype.streamChannel = function (opts) {
  var sbot = this.sbot

  // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
  if (sbot.backlinks) {
    var byHashtag = {dest: '#' + opts.channel}
    return this.filter(sbot.backlinks, opts, byHashtag)
  }

  if (sbot.query) {
    var byChannel = {value: {content: {channel: opts.channel}}}
    return this.filter(sbot.query, opts, byChannel)
  }

  return pull.error(new Error(
    'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
}
595
// Stream messages that link to our own id, via the ssb-backlinks plugin.
// Without that plugin an error stream is returned.
App.prototype.streamMentions = function (opts) {
  var backlinks = this.sbot.backlinks
  if (backlinks) {
    return this.filter(backlinks, opts, {dest: this.sbot.id})
  }
  return pull.error(new Error(
    'Viewing mentions requires the ssb-backlinks plugin'))
}
604
// Stream private messages. Uses the private plugin's read() through
// this.filter when it exists; otherwise scans the log for encrypted
// messages, unboxes them and keeps the readable ones.
App.prototype.streamPrivate = function (opts) {
  var priv = this.sbot.private
  if (priv.read) return this.filter(priv, opts, {})

  var log = this.createLogStream(u.mergeOpts(opts))
  return pull(
    log,
    pull.filter(u.isMsgEncrypted),
    this.unboxMessages(),
    pull.filter(u.isMsgReadable)
  )
}
615
// Query the links2 plugin for blob mentions carrying the name opts.name,
// optionally limited to opts.author. Each result is mapped to
// {name, size, link, author, time}. Returns an error stream when the
// plugin is missing.
App.prototype.blobMentions = function (opts) {
  var links2 = this.sbot.links2
  if (!links2) return pull.error(new Error(
    'missing ssb-links plugin'))

  var byName = {rel: ['mentions', opts.name]}
  if (opts.author) byName.source = opts.author

  var onlyBlobs = {dest: {$prefix: '&'}}
  var shape = {
    name: ['rel', 1],
    size: ['rel', 2],
    link: 'dest',
    author: 'source',
    time: 'ts'
  }

  return links2.read({
    query: [
      {$filter: byName},
      {$filter: onlyBlobs},
      {$map: shape}
    ]
  })
}
635
// Keep this.blobWants in sync with the sbot's blob wants stream.
// For every id in an update: a negative value marks the blob as wanted,
// anything else clears the mark; either way the cached size for that id
// is dropped. A stream error is traced to the console.
App.prototype.monitorBlobWants = function () {
  var app = this
  app.blobWants = {}

  function applyUpdate(wants) {
    for (var id in wants) {
      if (wants[id] < 0) {
        app.blobWants[id] = true
      } else {
        delete app.blobWants[id]
      }
      app.blobSizeCache.remove(id)
    }
  }

  function onEnd(err) {
    if (err) console.trace(err)
  }

  pull(
    app.sbot.blobs.createWants(),
    pull.drain(applyUpdate, onEnd)
  )
}
652
// Report the state of a blob: 'wanted' if it is marked in this.blobWants,
// otherwise a boolean saying whether a size is known for it.
App.prototype.getBlobState = function (id, cb) {
  var app = this
  if (app.blobWants[id]) return cb(null, 'wanted')
  app.getBlobSize(id, function (err, size) {
    if (err) return cb(err)
    var have = size != null
    cb(null, have)
  })
}
661
// Extract the readme from an npm tarball blob by piping it through an
// external `tar` process.
// cb(err) on failure, otherwise cb(null, text, true) with the extracted
// bytes decoded as UTF-8.
App.prototype.getNpmReadme = function (tarballId, cb) {
  var app = this
  // TODO: make this portable, and handle plaintext readmes
  var tarArgs = ['--ignore-case', '-Oxz',
    'package/README.md', 'package/readme.markdown', 'package/readme.mkd']
  var tar = proc.spawn('tar', tarArgs)
  var gather = multicb({pluck: 1, spread: true})

  // blob -> tar stdin
  pull(
    app.sbot.blobs.get(tarballId),
    toPull.sink(tar.stdin, gather())
  )
  // tar stdout -> collected buffers
  pull(
    toPull.source(tar.stdout),
    pull.collect(gather())
  )

  // results arrive in the order the callbacks were created above
  gather(function (err, _, bufs) {
    if (err) return cb(err)
    var text = Buffer.concat(bufs).toString('utf8')
    cb(null, text, true)
  })
}
682
// Decide whether a message should be shown.
//
// The filter mode is opts.filter or, failing that, this.msgFilter.
// A message "passes" when the mode is 'all', when it was written by us or
// by opts.feed, when it is the message opts.msgId, when we follow its
// author, or when someone we follow gave it a positive vote.
// cb(null, bool): passing messages yield true and the rest false — the two
// are swapped when the mode is 'invert'.
App.prototype.filterMsg = function (msg, opts, cb) {
  var app = this
  var myId = app.sbot.id
  var author = msg.value && msg.value.author
  var mode = opts.filter || app.msgFilter
  var show = (mode !== 'invert')

  var passesOutright = mode === 'all'
    || author === myId
    || author === opts.feed
    || msg.key === opts.msgId
  if (passesOutright) return cb(null, show)

  app.getFollows(myId, function (err, follows) {
    if (err) return cb(err)
    if (follows[author]) return cb(null, show)

    app.getVotes(msg.key, function (err, votes) {
      if (err) return cb(err)
      for (var voter in votes) {
        var likedByFriend = follows[voter] && votes[voter] > 0
        if (likedByFriend) return cb(null, show)
      }
      return cb(null, !show)
    })
  })
}
707
// Look up whether `src` follows `dest`.
// cb(err) on failure, otherwise cb(null, value) where value is the entry
// for `dest` in src's follows map (undefined when there is none).
App.prototype.isFollowing = function (src, dest, cb) {
  this.getFollows(src, function (err, follows) {
    return err ? cb(err) : cb(null, follows[dest])
  })
}
715
// Uncached follows lookup (the constructor wraps this in a memoizer).
//
// Reads the contact links of feed `id` from the links2 plugin as a live
// stream and builds a map of feed id -> following value. cb(err, follows)
// fires once: when the stream's sync marker arrives, or when the stream
// ends or fails. Because the stream is live, the same map object keeps
// being updated by later links after cb has been called.
App.prototype._getFollows = function (id, cb) {
  var follows = {}
  var pending = cb

  // deliver the map at most once
  function ready(err) {
    if (!pending) return
    var deliver = pending
    pending = null
    deliver(err, follows)
  }

  var query = [
    {$filter: {
      source: id,
      rel: [{$prefix: 'contact'}]
    }},
    {$map: {
      following: ['rel', 1],
      feed: 'dest'
    }}
  ]

  function onLink(link) {
    if (link.sync) return ready()
    follows[link.feed] = link.following
  }

  pull(
    this.sbot.links2.read({live: true, query: query}),
    pull.drain(onLink, ready)
  )
}
744
// Uncached votes lookup (the constructor wraps this in a memoizer).
//
// Reads the vote links pointing at `id` from the links2 plugin and builds
// a map of author -> vote value; a later link from the same author
// overwrites an earlier one. cb(err, votes) fires when the stream ends.
App.prototype._getVotes = function (id, cb) {
  var tally = {}

  var query = [
    {$filter: {
      dest: id,
      rel: [{$prefix: 'vote'}]
    }},
    {$map: {
      value: ['rel', 1],
      author: 'source'
    }}
  ]

  function record(vote) {
    tally[vote.author] = vote.value
  }

  function finish(err) {
    cb(err, tally)
  }

  pull(
    this.sbot.links2.read({query: query}),
    pull.drain(record, finish)
  )
}
767

Built with git-ssb-web