git ssb

16+

cel / patchfoo



Tree: c329eae0e6ff3d36d7f21349980d51d9dd587faa

Files: c329eae0e6ff3d36d7f21349980d51d9dd587faa / lib / app.js

20562 bytesRaw
1var http = require('http')
2var memo = require('asyncmemo')
3var lru = require('hashlru')
4var pkg = require('../package')
5var u = require('./util')
6var pull = require('pull-stream')
7var multicb = require('multicb')
8var paramap = require('pull-paramap')
9var Contacts = require('ssb-contact')
10var About = require('./about')
11var Follows = require('./follows')
12var Serve = require('./serve')
13var Render = require('./render')
14var Git = require('./git')
15var cat = require('pull-cat')
16var proc = require('child_process')
17var toPull = require('stream-to-pull-stream')
18var BoxStream = require('pull-box-stream')
19var crypto = require('crypto')
20
21var zeros = new Buffer(24); zeros.fill(0)
22
23module.exports = App
24
25function App(sbot, config) {
26 this.sbot = sbot
27 this.config = config
28
29 var conf = config.patchfoo || {}
30 this.port = conf.port || 8027
31 this.host = conf.host || 'localhost'
32 this.msgFilter = conf.filter
33
34 var base = conf.base || '/'
35 this.opts = {
36 base: base,
37 blob_base: conf.blob_base || conf.img_base || base,
38 img_base: conf.img_base || (base + 'image/'),
39 emoji_base: conf.emoji_base || (base + 'emoji/'),
40 encode_msgids: conf.encode_msgids == null ? true : Boolean(conf.encode_msgids),
41 }
42
43 sbot.get = memo({cache: lru(100)}, sbot.get)
44 this.about = new About(this, sbot.id)
45 this.getMsg = memo({cache: lru(100)}, getMsgWithValue, sbot)
46 this.getAbout = memo({cache: this.aboutCache = lru(500)},
47 this._getAbout.bind(this))
48 this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
49 this.reverseNameCache = lru(500)
50 this.reverseEmojiNameCache = lru(500)
51 this.getBlobSize = memo({cache: this.blobSizeCache = lru(100)},
52 sbot.blobs.size.bind(sbot.blobs))
53 this.getVotes = memo({cache: lru(100)}, this._getVotes.bind(this))
54
55 this.unboxMsg = this.unboxMsg.bind(this)
56
57 this.render = new Render(this, this.opts)
58 this.git = new Git(this)
59 this.contacts = new Contacts(this.sbot)
60 this.follows = new Follows(this.sbot, this.contacts)
61
62 this.monitorBlobWants()
63}
64
65App.prototype.go = function () {
66 var self = this
67 var server = http.createServer(function (req, res) {
68 new Serve(self, req, res).go()
69 })
70 if (self.host === 'localhost') server.listen(self.port, onListening)
71 else server.listen(self.port, self.host, onListening)
72 function onListening() {
73 var host = /:/.test(self.host) ? '[' + self.host + ']' : self.host
74 self.log('Listening on http://' + host + ':' + self.port)
75 }
76
77 // invalidate cached About info when new About messages come in
78 pull(
79 self.sbot.links({rel: 'about', old: false, values: true}),
80 pull.drain(function (link) {
81 self.aboutCache.remove(link.dest)
82 }, function (err) {
83 if (err) throw err
84 })
85 )
86
87 // keep alive ssb client connection
88 setInterval(self.sbot.whoami, 10e3)
89}
90
91var logPrefix = '[' + pkg.name + ']'
92App.prototype.log = console.log.bind(console, logPrefix)
93App.prototype.error = console.error.bind(console, logPrefix)
94
95App.prototype.unboxMsg = function (msg, cb) {
96 var self = this
97 var c = msg.value && msg.value.content
98 if (typeof c !== 'string') cb(null, msg)
99 else self.unboxContent(c, function (err, content) {
100 if (err) {
101 self.error('unbox:', err)
102 return cb(null, msg)
103 } else if (!content) {
104 return cb(null, msg)
105 }
106 var m = {}
107 for (var k in msg) m[k] = msg[k]
108 m.value = {}
109 for (var k in msg.value) m.value[k] = msg.value[k]
110 m.value.content = content
111 m.value.private = true
112 cb(null, m)
113 })
114}
115
116App.prototype.search = function (opts) {
117 var search = this.sbot.fulltext && this.sbot.fulltext.search
118 if (!search) return pull.error(new Error('Missing fulltext search plugin'))
119 return search(opts)
120}
121
122App.prototype.advancedSearch = function (opts) {
123 return pull(
124 opts.channel ?
125 this.sbot.backlinks.read({
126 dest: '#' + opts.channel,
127 reverse: true,
128 })
129 : opts.dest ?
130 this.sbot.links({
131 values: true,
132 dest: opts.dest,
133 source: opts.source || undefined,
134 reverse: true,
135 })
136 : opts.source ?
137 this.sbot.createUserStream({
138 reverse: true,
139 id: opts.source
140 })
141 :
142 this.sbot.createFeedStream({
143 reverse: true,
144 }),
145 this.unboxMessages(),
146 opts.text && pull.filter(filterByText(opts.text))
147 )
148}
149
150function forSome(each) {
151 return function some(obj) {
152 if (obj == null) return false
153 if (typeof obj === 'string') return each(obj)
154 if (Array.isArray(obj)) return obj.some(some)
155 if (typeof obj === 'object')
156 for (var k in obj) if (some(obj[k])) return true
157 return false
158 }
159}
160
161function filterByText(str) {
162 if (!str) return function () { return true }
163 var search = new RegExp(str, 'i')
164 var matches = forSome(search.test.bind(search))
165 return function (msg) {
166 var c = msg.value.content
167 return c && matches(c)
168 }
169}
170
171App.prototype.getMsgDecrypted = function (key, cb) {
172 var self = this
173 this.getMsg(key, function (err, msg) {
174 if (err) return cb(err)
175 self.unboxMsg(msg, cb)
176 })
177}
178
179App.prototype.publish = function (content, cb) {
180 var self = this
181 function tryPublish(triesLeft) {
182 if (Array.isArray(content.recps)) {
183 recps = content.recps.map(u.linkDest)
184 self.sbot.private.publish(content, recps, next)
185 } else {
186 self.sbot.publish(content, next)
187 }
188 function next(err, msg) {
189 if (err) {
190 if (triesLeft > 0) {
191 if (/^expected previous:/.test(err.message)) {
192 return tryPublish(triesLeft-1)
193 }
194 }
195 }
196 return cb(err, msg)
197 }
198 }
199 tryPublish(2)
200}
201
202App.prototype.wantSizeBlob = function (id, cb) {
203 // only want() the blob if we don't already have it
204 var self = this
205 var blobs = this.sbot.blobs
206 blobs.size(id, function (err, size) {
207 if (size != null) return cb(null, size)
208 self.blobWants[id] = true
209 blobs.want(id, function (err) {
210 if (err) return cb(err)
211 blobs.size(id, cb)
212 })
213 })
214}
215
216App.prototype.addBlobRaw = function (cb) {
217 var done = multicb({pluck: 1, spread: true})
218 var sink = pull(
219 u.pullLength(done()),
220 this.sbot.blobs.add(done())
221 )
222 done(function (err, size, hash) {
223 if (err) return cb(err)
224 cb(null, {link: hash, size: size})
225 })
226 return sink
227}
228
229App.prototype.addBlob = function (isPrivate, cb) {
230 if (!isPrivate) return this.addBlobRaw(cb)
231 else return this.addBlobPrivate(cb)
232}
233
234App.prototype.addBlobPrivate = function (cb) {
235 var bufs = []
236 var self = this
237 // use the hash of the cleartext as the key to encrypt the blob
238 var hash = crypto.createHash('sha256')
239 return pull.drain(function (buf) {
240 bufs.push(buf)
241 hash.update(buf)
242 }, function (err) {
243 if (err) return cb(err)
244 var secret = hash.digest()
245 pull(
246 pull.values(bufs),
247 BoxStream.createBoxStream(secret, zeros),
248 self.addBlobRaw(function (err, link) {
249 if (err) return cb(err)
250 link.key = secret.toString('base64')
251 cb(null, link)
252 })
253 )
254 })
255}
256
257App.prototype.getBlob = function (id, key) {
258 if (!key) return this.sbot.blobs.get(id)
259 if (typeof key === 'string') key = new Buffer(key, 'base64')
260 return pull(
261 this.sbot.blobs.get(id),
262 BoxStream.createUnboxStream(key, zeros)
263 )
264}
265
266App.prototype.pushBlob = function (id, cb) {
267 console.error('pushing blob', id)
268 this.sbot.blobs.push(id, cb)
269}
270
271App.prototype.readBlob = function (link) {
272 link = u.toLink(link)
273 return this.sbot.blobs.get({
274 hash: link.link,
275 size: link.size,
276 })
277}
278
279App.prototype.readBlobSlice = function (link, opts) {
280 if (this.sbot.blobs.getSlice) return this.sbot.blobs.getSlice({
281 hash: link.link,
282 size: link.size,
283 start: opts.start,
284 end: opts.end,
285 })
286 return pull(
287 this.readBlob(link),
288 u.pullSlice(opts.start, opts.end)
289 )
290}
291
292App.prototype.ensureHasBlobs = function (links, cb) {
293 var self = this
294 var done = multicb({pluck: 1})
295 links.forEach(function (link) {
296 var cb = done()
297 self.sbot.blobs.size(link.link, function (err, size) {
298 if (err) cb(err)
299 else if (size == null) cb(null, link)
300 else cb()
301 })
302 })
303 done(function (err, missingLinks) {
304 if (err) console.trace(err)
305 missingLinks = missingLinks.filter(Boolean)
306 if (missingLinks.length == 0) return cb()
307 return cb({name: 'BlobNotFoundError', links: missingLinks})
308 })
309}
310
311App.prototype.getReverseNameSync = function (name) {
312 var id = this.reverseNameCache.get(name)
313 return id
314}
315
316App.prototype.getReverseEmojiNameSync = function (name) {
317 return this.reverseEmojiNameCache.get(name)
318}
319
320App.prototype.getNameSync = function (name) {
321 var about = this.aboutCache.get(name)
322 return about && about.name
323}
324
325function getMsgWithValue(sbot, id, cb) {
326 if (!id) return cb()
327 sbot.get(id, function (err, value) {
328 if (err) return cb(err)
329 cb(null, {key: id, value: value})
330 })
331}
332
333App.prototype._getAbout = function (id, cb) {
334 var self = this
335 if (!u.isRef(id)) return cb(null, {})
336 self.about.get(id, function (err, about) {
337 if (err) return cb(err)
338 var sigil = id[0] || '@'
339 if (about.name && about.name[0] !== sigil) {
340 about.name = sigil + about.name
341 }
342 self.reverseNameCache.set(about.name, id)
343 cb(null, about)
344 })
345}
346
347App.prototype.pullGetMsg = function (id) {
348 return pull.asyncMap(this.getMsg)(pull.once(id))
349}
350
351App.prototype.createLogStream = function (opts) {
352 opts = opts || {}
353 return opts.sortByTimestamp
354 ? this.createFeedStream(opts)
355 : this.sbot.createLogStream(opts)
356}
357
358App.prototype.createFeedStream = function (opts) {
359 // work around opts.gt being treated as opts.gte sometimes
360 var limit = Number(opts.limit)
361 if (opts.gt && limit && !opts.reverse) return pull(
362 this.sbot.createFeedStream(u.mergeOpts(opts, {limit: opts.limit + 1})),
363 pull.filter(function (msg) {
364 return msg && msg.value.timestamp !== opts.gt
365 }),
366 limit && pull.take(limit)
367 )
368 return this.sbot.createFeedStream(opts)
369}
370
371var stateVals = {
372 connected: 3,
373 connecting: 2,
374 disconnecting: 1,
375}
376
377function comparePeers(a, b) {
378 var aState = stateVals[a.state] || 0
379 var bState = stateVals[b.state] || 0
380 return (bState - aState)
381 || (b.stateChange|0 - a.stateChange|0)
382}
383
384App.prototype.streamPeers = function (opts) {
385 var gossip = this.sbot.gossip
386 return u.readNext(function (cb) {
387 gossip.peers(function (err, peers) {
388 if (err) return cb(err)
389 if (opts) peers = peers.filter(function (peer) {
390 for (var k in opts) if (opts[k] !== peer[k]) return false
391 return true
392 })
393 peers.sort(comparePeers)
394 cb(null, pull.values(peers))
395 })
396 })
397}
398
399App.prototype.getContact = function (source, dest, cb) {
400 var self = this
401 pull(
402 self.sbot.links({source: source, dest: dest, rel: 'contact',
403 values: true, meta: false, keys: false}),
404 pull.filter(function (value) {
405 var c = value && value.content
406 return c && c.type === 'contact'
407 }),
408 pull.reduce(function (acc, value) {
409 // trinary logic from ssb-friends
410 return value.content.following ? true
411 : value.content.flagged || value.content.blocking ? false
412 : acc
413 }, null, cb)
414 )
415}
416
417App.prototype.unboxMessages = function () {
418 return paramap(this.unboxMsg, 16)
419}
420
421App.prototype.streamChannels = function (opts) {
422 return pull(
423 this.sbot.messagesByType({type: 'channel', reverse: true}),
424 this.unboxMessages(),
425 pull.filter(function (msg) {
426 return msg.value.content.subscribed
427 }),
428 pull.map(function (msg) {
429 return msg.value.content.channel
430 }),
431 pull.unique()
432 )
433}
434
435App.prototype.streamMyChannels = function (id, opts) {
436 // use ssb-query plugin if it is available, since it has an index for
437 // author + type
438 if (this.sbot.query) return pull(
439 this.sbot.query.read({
440 reverse: true,
441 query: [
442 {$filter: {
443 value: {
444 author: id,
445 content: {type: 'channel', subscribed: true}
446 }
447 }},
448 {$map: ['value', 'content', 'channel']}
449 ]
450 }),
451 pull.unique()
452 )
453
454 return pull(
455 this.sbot.createUserStream({id: id, reverse: true}),
456 this.unboxMessages(),
457 pull.filter(function (msg) {
458 if (msg.value.content.type == 'channel') {
459 return msg.value.content.subscribed
460 }
461 }),
462 pull.map(function (msg) {
463 return msg.value.content.channel
464 }),
465 pull.unique()
466 )
467}
468
469function compareVoted(a, b) {
470 return b.value - a.value
471}
472
473App.prototype.getVoted = function (_opts, cb) {
474 if (isNaN(_opts.limit)) return pull.error(new Error('missing limit'))
475 var self = this
476 var opts = {
477 type: 'vote',
478 limit: _opts.limit * 100,
479 reverse: !!_opts.reverse,
480 gt: _opts.gt || undefined,
481 lt: _opts.lt || undefined,
482 }
483
484 var votedObj = {}
485 var votedArray = []
486 var numItems = 0
487 var firstTimestamp, lastTimestamp
488 pull(
489 self.sbot.messagesByType(opts),
490 self.unboxMessages(),
491 pull.take(function () {
492 return numItems < _opts.limit
493 }),
494 pull.drain(function (msg) {
495 if (!firstTimestamp) firstTimestamp = msg.timestamp
496 lastTimestamp = msg.timestamp
497 var vote = msg.value.content.vote
498 if (!vote) return
499 var target = u.linkDest(vote)
500 var votes = votedObj[target]
501 if (!votes) {
502 numItems++
503 votes = {id: target, value: 0, feedsObj: {}, feeds: []}
504 votedObj[target] = votes
505 votedArray.push(votes)
506 }
507 if (msg.value.author in votes.feedsObj) {
508 if (!opts.reverse) return // leave latest vote value as-is
509 // remove old vote value
510 votes.value -= votes.feedsObj[msg.value.author]
511 } else {
512 votes.feeds.push(msg.value.author)
513 }
514 var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
515 votes.feedsObj[msg.value.author] = value
516 votes.value += value
517 }, function (err) {
518 if (err && err !== true) return cb(err)
519 var items = votedArray
520 if (opts.reverse) items.reverse()
521 items.sort(compareVoted)
522 cb(null, {items: items,
523 firstTimestamp: firstTimestamp,
524 lastTimestamp: lastTimestamp})
525 })
526 )
527}
528
529App.prototype.createAboutStreams = function (id) {
530 return this.about.createAboutStreams(id)
531}
532
533App.prototype.streamEmojis = function () {
534 return pull(
535 cat([
536 this.sbot.links({
537 rel: 'mentions',
538 source: this.sbot.id,
539 dest: '&',
540 values: true
541 }),
542 this.sbot.links({rel: 'mentions', dest: '&', values: true})
543 ]),
544 this.unboxMessages(),
545 pull.map(function (msg) { return msg.value.content.mentions }),
546 pull.flatten(),
547 pull.filter('emoji'),
548 pull.unique('link')
549 )
550}
551
552App.prototype.filter = function (plugin, opts, filter) {
553 // work around flumeview-query not picking the best index.
554 // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
555 var limit = Number(opts.limit)
556 var index
557 if (plugin === this.sbot.backlinks) {
558 var c = filter && filter.value && filter.value.content
559 var filteringByType = c && c.type
560 if (!filteringByType) index = 'DTS'
561 }
562 // work around flumeview-query not supporting $lt/$gt.
563 // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
564 return pull(
565 plugin.read({
566 index: index,
567 reverse: opts.reverse,
568 limit: limit ? (limit + 1) : undefined,
569 query: [{$filter: u.mergeOpts(filter, {
570 timestamp: {
571 $gte: opts.gt,
572 $lte: opts.lt,
573 }
574 })}]
575 }),
576 pull.filter(function (msg) {
577 return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
578 }),
579 limit && pull.take(limit)
580 )
581}
582
583App.prototype.streamChannel = function (opts) {
584 // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
585 if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, {
586 dest: '#' + opts.channel,
587 })
588
589 if (this.sbot.query) return this.filter(this.sbot.query, opts, {
590 value: {content: {channel: opts.channel}},
591 })
592
593 return pull.error(new Error(
594 'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
595}
596
597App.prototype.streamMentions = function (opts) {
598 if (!this.sbot.backlinks) return pull.error(new Error(
599 'Viewing mentions requires the ssb-backlinks plugin'))
600
601 if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, {
602 dest: this.sbot.id,
603 })
604}
605
606App.prototype.streamPrivate = function (opts) {
607 if (this.sbot.private.read) return this.filter(this.sbot.private, opts, {})
608
609 return pull(
610 this.createLogStream(u.mergeOpts(opts)),
611 pull.filter(u.isMsgEncrypted),
612 this.unboxMessages(),
613 pull.filter(u.isMsgReadable)
614 )
615}
616
617App.prototype.blobMentions = function (opts) {
618 if (!this.sbot.links2) return pull.error(new Error(
619 'missing ssb-links plugin'))
620 var filter = {rel: ['mentions', opts.name]}
621 if (opts.author) filter.source = opts.author
622 return this.sbot.links2.read({
623 query: [
624 {$filter: filter},
625 {$filter: {dest: {$prefix: '&'}}},
626 {$map: {
627 name: ['rel', 1],
628 size: ['rel', 2],
629 link: 'dest',
630 author: 'source',
631 time: 'ts'
632 }}
633 ]
634 })
635}
636
637App.prototype.monitorBlobWants = function () {
638 var self = this
639 self.blobWants = {}
640 pull(
641 this.sbot.blobs.createWants(),
642 pull.drain(function (wants) {
643 for (var id in wants) {
644 if (wants[id] < 0) self.blobWants[id] = true
645 else delete self.blobWants[id]
646 self.blobSizeCache.remove(id)
647 }
648 }, function (err) {
649 if (err) console.trace(err)
650 })
651 )
652}
653
654App.prototype.getBlobState = function (id, cb) {
655 var self = this
656 if (self.blobWants[id]) return cb(null, 'wanted')
657 self.getBlobSize(id, function (err, size) {
658 if (err) return cb(err)
659 cb(null, size != null)
660 })
661}
662
663App.prototype.getNpmReadme = function (tarballId, cb) {
664 var self = this
665 // TODO: make this portable, and handle plaintext readmes
666 var tar = proc.spawn('tar', ['--ignore-case', '-Oxz',
667 'package/README.md', 'package/readme.markdown', 'package/readme.mkd'])
668 var done = multicb({pluck: 1, spread: true})
669 pull(
670 self.sbot.blobs.get(tarballId),
671 toPull.sink(tar.stdin, done())
672 )
673 pull(
674 toPull.source(tar.stdout),
675 pull.collect(done())
676 )
677 done(function (err, _, bufs) {
678 if (err) return cb(err)
679 var text = Buffer.concat(bufs).toString('utf8')
680 cb(null, text, true)
681 })
682}
683
684App.prototype.filterMsg = function (msg, opts, cb) {
685 var self = this
686 var myId = self.sbot.id
687 var author = msg.value && msg.value.author
688 var filter = opts.filter || self.msgFilter
689 var show = (filter !== 'invert')
690 if (filter === 'all'
691 || author === myId
692 || author === opts.feed
693 || msg.key === opts.msgId) return cb(null, show)
694 self.follows.getFollows(myId, function (err, follows) {
695 if (err) return cb(err)
696 if (follows[author]) return cb(null, show)
697 self.getVotes(msg.key, function (err, votes) {
698 if (err) return cb(err)
699 for (var author in votes) {
700 if (follows[author] && votes[author] > 0) {
701 return cb(null, show)
702 }
703 }
704 return cb(null, !show)
705 })
706 })
707}
708
709App.prototype.isFollowing = function (src, dest, cb) {
710 var self = this
711 self.follows.getFollows(src, function (err, follows) {
712 if (err) return cb(err)
713 return cb(null, follows[dest])
714 })
715}
716
717App.prototype._getVotes = function (id, cb) {
718 var votes = {}
719 pull(
720 this.sbot.links2.read({
721 query: [
722 {$filter: {
723 dest: id,
724 rel: [{$prefix: 'vote'}]
725 }},
726 {$map: {
727 value: ['rel', 1],
728 author: 'source'
729 }}
730 ]
731 }),
732 pull.drain(function (vote) {
733 votes[vote.author] = vote.value
734 }, function (err) {
735 cb(err, votes)
736 })
737 )
738}
739
740App.prototype.getAddresses = function (id) {
741 if (!this.sbot.backlinks) {
742 if (!this.warned1) {
743 this.warned1 = true
744 console.trace('Getting peer addresses requires the ssb-backlinks plugin')
745 }
746 return pull.empty()
747 }
748 return pull(
749 this.sbot.backlinks.read({
750 reverse: true,
751 query: [
752 {$filter: {
753 dest: id,
754 value: {
755 content: {
756 type: 'pub',
757 address: {
758 key: id,
759 host: {$truthy: true},
760 port: {$truthy: true},
761 }
762 }
763 }
764 }},
765 {$map: ['value', 'content', 'address']}
766 ]
767 }),
768 pull.map(function (addr) {
769 return addr.host + ':' + addr.port
770 }),
771 pull.unique()
772 )
773}
774

Built with git-ssb-web