Files: e69a4bdedf11f26f5a8b25434154fefafbf6ba2a / feed / pull / rollup.js
2699 bytesRaw
1 | // read stream to get events |
2 | // for each item, check to see if already rendered root |
3 | // accept prioritized list (render these first) |
4 | |
5 | var pull = require('pull-stream') |
6 | var nest = require('depnest') |
7 | var extend = require('xtend') |
8 | var HLRU = require('hashlru') |
9 | |
10 | exports.needs = nest({ |
11 | 'sbot.pull.backlinks': 'first', |
12 | 'sbot.async.get': 'first', |
13 | 'message.sync.root': 'first', |
14 | 'message.sync.unbox': 'first' |
15 | }) |
16 | |
17 | exports.gives = nest('feed.pull.rollup', true) |
18 | |
19 | exports.create = function (api) { |
20 | // cache mostly just to avoid reading the same roots over and over again |
21 | // not really big enough for multiple refresh cycles |
22 | var cache = HLRU(100) |
23 | |
24 | return nest('feed.pull.rollup', function (rootFilter) { |
25 | var seen = new Set() |
26 | return pull( |
27 | pull.map(msg => { |
28 | if (msg.value) { |
29 | var root = api.message.sync.root(msg) |
30 | if (!root) { |
31 | // already a root, pass thru! |
32 | return msg |
33 | } else { |
34 | return root |
35 | } |
36 | } |
37 | }), |
38 | |
39 | // UNIQUE |
40 | pull.filter(idOrMsg => { |
41 | if (idOrMsg) { |
42 | if (idOrMsg.key) idOrMsg = idOrMsg.key |
43 | if (typeof idOrMsg === 'string') { |
44 | var key = idOrMsg |
45 | if (!seen.has(key)) { |
46 | seen.add(key) |
47 | return true |
48 | } |
49 | } |
50 | } |
51 | }), |
52 | |
53 | // LOOKUP (if needed) |
54 | pull.asyncMap((keyOrMsg, cb) => { |
55 | if (keyOrMsg.value) { |
56 | cb(null, keyOrMsg) |
57 | } else { |
58 | var key = keyOrMsg |
59 | if (cache.has(key)) { |
60 | cb(null, cache.get(key)) |
61 | } else { |
62 | api.sbot.async.get(key, (_, value) => { |
63 | var msg = {key, value} |
64 | if (msg.value) { |
65 | cache.set(key, msg) |
66 | } |
67 | cb(null, msg) |
68 | }) |
69 | } |
70 | } |
71 | }), |
72 | |
73 | // UNBOX (if needed) |
74 | pull.map(msg => { |
75 | if (msg.value && typeof msg.value.content === 'string') { |
76 | var unboxed = api.message.sync.unbox(msg) |
77 | if (unboxed) return unboxed |
78 | } |
79 | return msg |
80 | }), |
81 | |
82 | // FILTER |
83 | pull.filter(msg => msg && msg.value && !api.message.sync.root(msg)), |
84 | pull.filter(rootFilter || (() => true)), |
85 | |
86 | // ADD REPLIES |
87 | pull.asyncMap((rootMessage, cb) => { |
88 | pull( |
89 | api.sbot.pull.backlinks({ |
90 | query: [{$filter: { dest: rootMessage.key }}] |
91 | }), |
92 | pull.filter(msg => (api.message.sync.root(msg) || rootMessage.key) === rootMessage.key), |
93 | pull.collect((err, replies) => { |
94 | if (err) return cb(err) |
95 | cb(null, extend(rootMessage, { replies })) |
96 | }) |
97 | ) |
98 | }) |
99 | ) |
100 | }) |
101 | } |
102 |
Built with git-ssb-web