Files: f6a49ebd9fb233e15e04deecd49ddcbc3f6a4857 / feed / pull / rollup.js
2998 bytesRaw
1 | // read stream to get events |
2 | // for each item, check to see if already rendered root |
3 | // accept prioritized list (render these first) |
4 | |
5 | var pull = require('pull-stream') |
6 | var nest = require('depnest') |
7 | var extend = require('xtend') |
8 | var HLRU = require('hashlru') |
9 | var resolve = require('mutant/resolve') |
10 | var onceTrue = require('mutant/once-true') |
11 | |
12 | exports.needs = nest({ |
13 | 'backlinks.obs.for': 'first', |
14 | 'sbot.async.get': 'first', |
15 | 'message.sync.root': 'first', |
16 | 'message.sync.unbox': 'first', |
17 | 'contact.obs.blocking': 'first', |
18 | 'keys.sync.id': 'first' |
19 | }) |
20 | |
21 | exports.gives = nest('feed.pull.rollup', true) |
22 | |
23 | exports.create = function (api) { |
24 | // cache mostly just to avoid reading the same roots over and over again |
25 | // not really big enough for multiple refresh cycles |
26 | var cache = HLRU(100) |
27 | |
28 | const blocking = api.contact.obs.blocking(api.keys.sync.id()) |
29 | |
30 | return nest('feed.pull.rollup', function (rootFilter) { |
31 | var seen = new Set() |
32 | return pull( |
33 | pull.map(msg => { |
34 | if (msg.value) { |
35 | var root = api.message.sync.root(msg) |
36 | if (!root) { |
37 | // already a root, pass thru! |
38 | return msg |
39 | } else { |
40 | return root |
41 | } |
42 | } |
43 | }), |
44 | |
45 | // UNIQUE |
46 | pull.filter(idOrMsg => { |
47 | if (idOrMsg) { |
48 | if (idOrMsg.key) idOrMsg = idOrMsg.key |
49 | if (typeof idOrMsg === 'string') { |
50 | var key = idOrMsg |
51 | if (!seen.has(key)) { |
52 | seen.add(key) |
53 | return true |
54 | } |
55 | } |
56 | } |
57 | }), |
58 | |
59 | // LOOKUP (if needed) |
60 | pull.asyncMap((keyOrMsg, cb) => { |
61 | if (keyOrMsg.value) { |
62 | cb(null, keyOrMsg) |
63 | } else { |
64 | var key = keyOrMsg |
65 | if (cache.has(key)) { |
66 | cb(null, cache.get(key)) |
67 | } else { |
68 | api.sbot.async.get(key, (_, value) => { |
69 | var msg = {key, value} |
70 | if (msg.value) { |
71 | cache.set(key, msg) |
72 | } |
73 | cb(null, msg) |
74 | }) |
75 | } |
76 | } |
77 | }), |
78 | |
79 | // UNBOX (if needed) |
80 | pull.map(msg => { |
81 | if (msg.value && typeof msg.value.content === 'string') { |
82 | var unboxed = api.message.sync.unbox(msg) |
83 | if (unboxed) return unboxed |
84 | } |
85 | return msg |
86 | }), |
87 | |
88 | // FILTER |
89 | pull.filter(msg => msg && msg.value && !api.message.sync.root(msg)), |
90 | pull.filter(rootFilter || (() => true)), |
91 | |
92 | pull.filter(msg => !blocking().includes(msg.value.author)), |
93 | |
94 | // ADD REPLIES |
95 | pull.asyncMap((rootMessage, cb) => { |
96 | // use global backlinks cache |
97 | var backlinks = api.backlinks.obs.for(rootMessage.key) |
98 | onceTrue(backlinks.sync, () => { |
99 | var replies = resolve(backlinks).filter((msg) => { |
100 | return !blocking().includes(msg.value.author) && |
101 | api.message.sync.root(msg) === rootMessage.key |
102 | }) |
103 | cb(null, extend(rootMessage, { replies })) |
104 | }) |
105 | }) |
106 | ) |
107 | }) |
108 | } |
109 |
Built with git-ssb-web