Files: 531b55a224c7f5a8f794c64f2d60803fdfba8df9 / backlinks / obs-filter.js
2241 bytesRaw
1 | var nest = require('depnest') |
2 | var pull = require('pull-stream') |
3 | var MutantPullReduce = require('mutant-pull-reduce') |
4 | |
5 | exports.needs = nest({ |
6 | 'sbot.pull.backlinks': 'first' |
7 | }) |
8 | |
9 | exports.gives = nest('backlinks.obs.filter', true) |
10 | |
11 | /** |
12 | * sbot.obs.filter returns an observable list of messages that link |
13 | * back to the message with the given message ID (@id). Only messages that |
14 | * pass the filter are added to the list. |
15 | * |
16 | * When a message arrives, if a filter function is given in the options (opts.filter) |
17 | * and passing it to the filter function does not result in it returning |
18 | * 'true' the message is not added to the observable list. |
19 | * |
20 | * A 'sync' observable property is also added to the returned observable |
21 | * which is 'true' when all previously seen messages are caught up with. |
22 | * |
23 | * Note: Unlike backlinks.obs.for this does not cache the observable for |
24 | * callers that supply the same arguments. |
25 | */ |
26 | exports.create = function (api) { |
27 | function pullFilterReduceObs (id, opts) { |
28 | if (!id || typeof (id) !== 'string') { |
29 | throw new Error('id must be a string.') |
30 | } |
31 | |
32 | var sbotFilter = { |
33 | $filter: { |
34 | dest: id |
35 | } |
36 | } |
37 | |
38 | var msgBacklinks = api.sbot.pull.backlinks({ |
39 | query: [sbotFilter], |
40 | index: 'DTA', // use asserted timestamps |
41 | live: true |
42 | }) |
43 | |
44 | // If a filter function is supplied in the options, we use it to filter |
45 | // the links stream, otherwise we use all the messages from the stream |
46 | var filterFunction = opts && opts.filter ? opts.filter : () => true |
47 | |
48 | var filteredBacklinks = pull( |
49 | msgBacklinks, |
50 | // We need to allow 'msg.sync' even if the supplied filter function does not |
51 | // match it to allow mutant-pull-reduce to handle it for us and set the |
52 | // 'sync' observable to indicate that the list is up to date with the messages |
53 | // received so far. |
54 | pull.filter(msg => msg.sync || filterFunction(msg)) |
55 | ) |
56 | |
57 | var backlinksObs = MutantPullReduce(filteredBacklinks, (state, msg) => { |
58 | state.push(msg) |
59 | return state |
60 | }, { |
61 | startValue: [], |
62 | nextTick: true, |
63 | sync: true |
64 | }) |
65 | |
66 | return backlinksObs |
67 | } |
68 | |
69 | return nest({ |
70 | 'backlinks.obs.filter': (id, opts) => pullFilterReduceObs(id, opts) |
71 | }) |
72 | } |
73 |
Built with git-ssb-web