Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / backlinks / obs-filter.js
3473 bytesRaw
1 | var nest = require('depnest') |
2 | var pull = require('pull-stream') |
3 | var Value = require('mutant/value') |
4 | var computed = require('mutant/computed') |
5 | var Abortable = require('pull-abortable') |
6 | var onceIdle = require('mutant/once-idle') |
7 | var resolve = require('mutant/resolve') |
8 | |
9 | exports.needs = nest({ |
10 | 'sbot.pull.backlinks': 'first', |
11 | 'backlinks.obs.cache': 'first' |
12 | }) |
13 | |
14 | exports.gives = nest('backlinks.obs.filter', true) |
15 | |
16 | /** |
17 | * sbot.obs.filter returns an observable list of messages that link |
18 | * back to the message with the given message ID (@id). Only messages that |
19 | * pass the filter are added to the list. |
20 | * |
21 | * When a message arrives, if a filter function is given in the options (opts.filter) |
22 | * and passing it to the filter function does not result in it returning |
23 | * 'true' the message is not added to the observable list. |
24 | * |
25 | * An optional backlinks cache (which should be constructed from backlinks.obs.cache) can be |
26 | * supplied with opts.cache. A caller constructed cache is required because different |
27 | * pull stream filters might be used for the same thread ID. If no cache is supplied, |
28 | * the backlinks observables will not be cached. |
29 | * |
30 | * A 'sync' observable property is also added to the returned observable |
31 | * which is 'true' when all previously seen messages are caught up with. |
32 | * |
33 | */ |
34 | exports.create = function (api) { |
35 | |
36 | function pullFilterReduceObs (id, filterFunction, backlinksCache) { |
37 | if (!id || typeof (id) !== 'string') { |
38 | throw new Error('id must be a string.') |
39 | } |
40 | |
41 | var sbotFilter = { |
42 | $filter: { |
43 | dest: id |
44 | } |
45 | } |
46 | |
47 | var msgBacklinks = api.sbot.pull.backlinks({ |
48 | query: [sbotFilter], |
49 | index: 'DTA', // use asserted timestamps |
50 | live: true |
51 | }) |
52 | |
53 | var filteredBacklinks = pull( |
54 | msgBacklinks, |
55 | // We need to allow 'msg.sync' even if the supplied filter function does not |
56 | // match it to allow mutant-pull-reduce to handle it for us and set the |
57 | // 'sync' observable to indicate that the list is up to date with the messages |
58 | // received so far. |
59 | pull.filter(msg => msg.sync || filterFunction(msg)) |
60 | ) |
61 | |
62 | if (backlinksCache) { |
63 | return backlinksCache.cachedBacklinks(id, filteredBacklinks) |
64 | } else { |
65 | return backlinksObs(id, filteredBacklinks); |
66 | } |
67 | } |
68 | |
69 | function backlinksObs(id, backlinksPullStream) { |
70 | var sync = Value(false) |
71 | var aborter = Abortable() |
72 | var collection = Value([]) |
73 | |
74 | var obs = computed([collection], x => x, { |
75 | onListen: () => { |
76 | // try not to saturate the thread |
77 | onceIdle(() => { |
78 | pull( |
79 | backlinksPullStream, |
80 | aborter, |
81 | pull.drain((msg) => { |
82 | if (msg.sync) { |
83 | sync.set(true) |
84 | } else { |
85 | var value = resolve(collection) |
86 | value.push(msg) |
87 | collection.set(value) |
88 | } |
89 | }) |
90 | ) |
91 | }) |
92 | }, |
93 | onUnlisten: () => aborter.abort |
94 | }) |
95 | |
96 | obs.sync = sync; |
97 | |
98 | return obs; |
99 | } |
100 | |
101 | return nest({ |
102 | 'backlinks.obs.filter': (id, opts) => { |
103 | |
104 | // If a filter function is supplied in the options, we use it to filter |
105 | // the links stream, otherwise we use all the messages from the stream |
106 | var filterFunction = opts && opts.filter ? opts.filter : () => true |
107 | var cache = opts ? opts.cache : null; |
108 | |
109 | return pullFilterReduceObs(id, filterFunction, cache) |
110 | } |
111 | }) |
112 | } |
113 |
Built with git-ssb-web