Files: 418ac35b065b1ea7acf2d01ec7f1de61a5a721df / feed / pull / rollup.js
2849 bytesRaw
1 | // read stream to get events |
2 | // for each item, check to see if already rendered root |
3 | // accept prioritized list (render these first) |
4 | |
5 | var pull = require('pull-stream') |
6 | var nest = require('depnest') |
7 | var extend = require('xtend') |
8 | var HLRU = require('hashlru') |
9 | var resolve = require('mutant/resolve') |
10 | var onceTrue = require('mutant/once-true') |
11 | |
12 | exports.needs = nest({ |
13 | 'backlinks.obs.for': 'first', |
14 | 'sbot.async.get': 'first', |
15 | 'message.sync.isBlocked': 'first', |
16 | 'message.sync.root': 'first', |
17 | 'message.sync.unbox': 'first', |
18 | }) |
19 | |
20 | exports.gives = nest('feed.pull.rollup', true) |
21 | |
22 | exports.create = function (api) { |
23 | // cache mostly just to avoid reading the same roots over and over again |
24 | // not really big enough for multiple refresh cycles |
25 | var cache = HLRU(100) |
26 | |
27 | return nest('feed.pull.rollup', function (rootFilter) { |
28 | var seen = new Set() |
29 | return pull( |
30 | pull.map(msg => { |
31 | if (msg.value) { |
32 | var root = api.message.sync.root(msg) |
33 | if (!root) { |
34 | // already a root, pass thru! |
35 | return msg |
36 | } else { |
37 | return root |
38 | } |
39 | } |
40 | }), |
41 | |
42 | // UNIQUE |
43 | pull.filter(idOrMsg => { |
44 | if (idOrMsg) { |
45 | if (idOrMsg.key) idOrMsg = idOrMsg.key |
46 | if (typeof idOrMsg === 'string') { |
47 | var key = idOrMsg |
48 | if (!seen.has(key)) { |
49 | seen.add(key) |
50 | return true |
51 | } |
52 | } |
53 | } |
54 | }), |
55 | |
56 | // LOOKUP (if needed) |
57 | pull.asyncMap((keyOrMsg, cb) => { |
58 | if (keyOrMsg.value) { |
59 | cb(null, keyOrMsg) |
60 | } else { |
61 | var key = keyOrMsg |
62 | if (cache.has(key)) { |
63 | cb(null, cache.get(key)) |
64 | } else { |
65 | api.sbot.async.get(key, (_, value) => { |
66 | var msg = {key, value} |
67 | if (msg.value) { |
68 | cache.set(key, msg) |
69 | } |
70 | cb(null, msg) |
71 | }) |
72 | } |
73 | } |
74 | }), |
75 | |
76 | // UNBOX (if needed) |
77 | pull.map(msg => { |
78 | if (msg.value && typeof msg.value.content === 'string') { |
79 | var unboxed = api.message.sync.unbox(msg) |
80 | if (unboxed) return unboxed |
81 | } |
82 | return msg |
83 | }), |
84 | |
85 | // FILTER |
86 | pull.filter(msg => msg && msg.value), |
87 | pull.filter(rootFilter || (() => true)), |
88 | pull.filter(msg => !api.message.sync.isBlocked(msg)), |
89 | |
90 | // ADD REPLIES |
91 | pull.asyncMap((rootMessage, cb) => { |
92 | // use global backlinks cache |
93 | var backlinks = api.backlinks.obs.for(rootMessage.key) |
94 | onceTrue(backlinks.sync, () => { |
95 | var replies = resolve(backlinks).filter(msg => { |
96 | return api.message.sync.root(msg) === rootMessage.key && !api.message.sync.isBlocked(msg) |
97 | }) |
98 | cb(null, extend(rootMessage, { replies })) |
99 | }) |
100 | }) |
101 | ) |
102 | }) |
103 | } |
104 |
Built with git-ssb-web