Commit 7843b07831ea4e4665e69676cbf74a669bf57874
add feed.pull.rollup
Matt McKegg committed on 6/20/2017, 4:09:21 PMParent: ef870d56f5d2d95a1f8cbc2bd5d250c5768c5ee6
Files changed
feed/pull/rollup.js | added |
feed/pull/rollup.js | |||
---|---|---|---|
@@ -1,0 +1,70 @@ | |||
1 … | +// read stream to get events | ||
2 … | +// for each item, check to see if already rendered root | ||
3 … | +// accept prioritized list (render these first) | ||
4 … | + | ||
5 … | +var pull = require('pull-stream') | ||
6 … | +var nest = require('depnest') | ||
7 … | +var extend = require('xtend') | ||
8 … | + | ||
9 … | +exports.needs = nest({ | ||
10 … | + 'sbot.pull.backlinks': 'first', | ||
11 … | + 'sbot.async.get': 'first', | ||
12 … | + 'message.sync.root': 'first', | ||
13 … | + 'message.sync.unbox': 'first' | ||
14 … | +}) | ||
15 … | + | ||
16 … | +exports.gives = nest('feed.pull.rollup', true) | ||
17 … | + | ||
18 … | +exports.create = function (api) { | ||
19 … | + return nest('feed.pull.rollup', function (rootFilter) { | ||
20 … | + return pull( | ||
21 … | + Roots(), | ||
22 … | + Lookup(), | ||
23 … | + pull.filter(msg => msg && msg.value && !api.message.sync.root(msg)), | ||
24 … | + pull.filter(rootFilter || (() => true)), | ||
25 … | + AddReplies() | ||
26 … | + ) | ||
27 … | + }) | ||
28 … | + | ||
29 … | + // scoped | ||
30 … | + | ||
31 … | + function Roots () { | ||
32 … | + var alreadyEmitted = new Set() | ||
33 … | + return pull( | ||
34 … | + pull.map((msg) => { | ||
35 … | + var root = api.message.sync.root(msg) || msg.key | ||
36 … | + if (!alreadyEmitted.has(root)) { | ||
37 … | + alreadyEmitted.add(root) | ||
38 … | + return root | ||
39 … | + } | ||
40 … | + }), | ||
41 … | + pull.filter() | ||
42 … | + ) | ||
43 … | + } | ||
44 … | + | ||
45 … | + function Lookup () { | ||
46 … | + return pull.asyncMap((key, cb) => { | ||
47 … | + api.sbot.async.get(key, (_, value) => { | ||
48 … | + if (typeof value.content === 'string') { | ||
49 … | + value = api.message.sync.unbox(value) | ||
50 … | + } | ||
51 … | + cb(null, {key, value}) | ||
52 … | + }) | ||
53 … | + }) | ||
54 … | + } | ||
55 … | + | ||
56 … | + function AddReplies () { | ||
57 … | + return pull.asyncMap((rootMessage, cb) => { | ||
58 … | + pull( | ||
59 … | + api.sbot.pull.backlinks({ | ||
60 … | + query: [{$filter: { dest: rootMessage.key }}] | ||
61 … | + }), | ||
62 … | + pull.filter(msg => api.message.sync.root(msg) || rootMessage.key === rootMessage.key), | ||
63 … | + pull.collect((err, replies) => { | ||
64 … | + if (err) return cb(err) | ||
65 … | + cb(null, extend(rootMessage, { replies })) | ||
66 … | + }) | ||
67 … | + ) | ||
68 … | + }) | ||
69 … | + } | ||
70 … | +} |
Built with git-ssb-web