feed/pull/rollup.jsView |
---|
4 | 4 … | |
5 | 5 … | var pull = require('pull-stream') |
6 | 6 … | var nest = require('depnest') |
7 | 7 … | var extend = require('xtend') |
| 8 … | +var HLRU = require('hashlru') |
8 | 9 … | |
9 | 10 … | exports.needs = nest({ |
10 | 11 … | 'sbot.pull.backlinks': 'first', |
|
11 | 12 … | 'sbot.async.get': 'first', |
15 | 16 … | |
16 | 17 … | exports.gives = nest('feed.pull.rollup', true) |
17 | 18 … | |
18 | 19 … | exports.create = function (api) { |
| 20 … | + |
| 21 … | + |
| 22 … | + var cache = HLRU(100) |
| 23 … | + |
19 | 24 … | return nest('feed.pull.rollup', function (rootFilter) { |
| 25 … | + var seen = new Set() |
20 | 26 … | return pull( |
21 | | - pull.map(msg => api.message.sync.root(msg) || msg.key), |
22 | | - pull.unique(), |
23 | | - Lookup(), |
| 27 … | + pull.map(msg => { |
| 28 … | + if (msg.value) { |
| 29 … | + var root = api.message.sync.root(msg) |
| 30 … | + if (!root) { |
| 31 … | + |
| 32 … | + return msg |
| 33 … | + } else { |
| 34 … | + return root |
| 35 … | + } |
| 36 … | + } |
| 37 … | + }), |
| 38 … | + |
| 39 … | + |
| 40 … | + pull.filter(idOrMsg => { |
| 41 … | + if (idOrMsg) { |
| 42 … | + if (idOrMsg.key) idOrMsg = idOrMsg.key |
| 43 … | + if (typeof idOrMsg === 'string') { |
| 44 … | + var key = idOrMsg |
| 45 … | + if (!seen.has(key)) { |
| 46 … | + seen.add(key) |
| 47 … | + return true |
| 48 … | + } |
| 49 … | + } |
| 50 … | + } |
| 51 … | + }), |
| 52 … | + |
| 53 … | + |
| 54 … | + pull.asyncMap((keyOrMsg, cb) => { |
| 55 … | + if (keyOrMsg.value) { |
| 56 … | + cb(null, keyOrMsg) |
| 57 … | + } else { |
| 58 … | + var key = keyOrMsg |
| 59 … | + if (cache.has(key)) { |
| 60 … | + cb(null, cache.get(key)) |
| 61 … | + } else { |
| 62 … | + api.sbot.async.get(key, (_, value) => { |
| 63 … | + var msg = {key, value} |
| 64 … | + if (msg.value) { |
| 65 … | + cache.set(key, msg) |
| 66 … | + } |
| 67 … | + cb(null, msg) |
| 68 … | + }) |
| 69 … | + } |
| 70 … | + } |
| 71 … | + }), |
| 72 … | + |
| 73 … | + |
| 74 … | + pull.map(msg => { |
| 75 … | + if (msg.value && typeof msg.value.content === 'string') { |
| 76 … | + var unboxed = api.message.sync.unbox(msg) |
| 77 … | + if (unboxed) return unboxed |
| 78 … | + } |
| 79 … | + return msg |
| 80 … | + }), |
| 81 … | + |
| 82 … | + |
24 | 83 … | pull.filter(msg => msg && msg.value && !api.message.sync.root(msg)), |
25 | 84 … | pull.filter(rootFilter || (() => true)), |
26 | | - AddReplies() |
| 85 … | + |
| 86 … | + |
| 87 … | + pull.asyncMap((rootMessage, cb) => { |
| 88 … | + pull( |
| 89 … | + api.sbot.pull.backlinks({ |
| 90 … | + query: [{$filter: { dest: rootMessage.key }}] |
| 91 … | + }), |
| 92 … | + pull.filter(msg => (api.message.sync.root(msg) || rootMessage.key) === rootMessage.key), |
| 93 … | + pull.collect((err, replies) => { |
| 94 … | + if (err) return cb(err) |
| 95 … | + cb(null, extend(rootMessage, { replies })) |
| 96 … | + }) |
| 97 … | + ) |
| 98 … | + }) |
27 | 99 … | ) |
28 | 100 … | }) |
29 | | - |
30 | | - |
31 | | - function Lookup () { |
32 | | - return pull.asyncMap((key, cb) => { |
33 | | - api.sbot.async.get(key, (_, value) => { |
34 | | - if (value && typeof value.content === 'string') { |
35 | | - value = api.message.sync.unbox(value) |
36 | | - } |
37 | | - cb(null, {key, value}) |
38 | | - }) |
39 | | - }) |
40 | | - } |
41 | | - |
42 | | - function AddReplies () { |
43 | | - return pull.asyncMap((rootMessage, cb) => { |
44 | | - pull( |
45 | | - api.sbot.pull.backlinks({ |
46 | | - query: [{$filter: { dest: rootMessage.key }}] |
47 | | - }), |
48 | | - pull.filter(msg => (api.message.sync.root(msg) || rootMessage.key) === rootMessage.key), |
49 | | - pull.collect((err, replies) => { |
50 | | - if (err) return cb(err) |
51 | | - cb(null, extend(rootMessage, { replies })) |
52 | | - }) |
53 | | - ) |
54 | | - }) |
55 | | - } |
56 | 101 … | } |