git ssb

7+

dinoworm 🐛 / patchcore



Tree: 531b55a224c7f5a8f794c64f2d60803fdfba8df9

Files: 531b55a224c7f5a8f794c64f2d60803fdfba8df9 / backlinks / obs-filter.js

2241 bytesRaw
1var nest = require('depnest')
2var pull = require('pull-stream')
3var MutantPullReduce = require('mutant-pull-reduce')
4
// What this module needs injected: the 'sbot.pull.backlinks' entry,
// requested with the 'first' setting.
exports.needs = nest({ 'sbot.pull.backlinks': 'first' })

// What this module provides in return: 'backlinks.obs.filter'.
exports.gives = nest('backlinks.obs.filter', true)
10
11/**
12 * sbot.obs.filter returns an observable list of messages that link
13 * back to the message with the given message ID (@id). Only messages that
14 * pass the filter are added to the list.
15 *
16 * When a message arrives, if a filter function is given in the options (opts.filter)
17 * and passing it to the filter function does not result in it returning
18 * 'true' the message is not added to the observable list.
19 *
20 * A 'sync' observable property is also added to the returned observable
21 * which is 'true' when all previously seen messages are caught up with.
22 *
23 * Note: Unlike backlinks.obs.for this does not cache the observable for
24 * callers that supply the same arguments.
25 */
26exports.create = function (api) {
27 function pullFilterReduceObs (id, opts) {
28 if (!id || typeof (id) !== 'string') {
29 throw new Error('id must be a string.')
30 }
31
32 var sbotFilter = {
33 $filter: {
34 dest: id
35 }
36 }
37
38 var msgBacklinks = api.sbot.pull.backlinks({
39 query: [sbotFilter],
40 index: 'DTA', // use asserted timestamps
41 live: true
42 })
43
44 // If a filter function is supplied in the options, we use it to filter
45 // the links stream, otherwise we use all the messages from the stream
46 var filterFunction = opts && opts.filter ? opts.filter : () => true
47
48 var filteredBacklinks = pull(
49 msgBacklinks,
50 // We need to allow 'msg.sync' even if the supplied filter function does not
51 // match it to allow mutant-pull-reduce to handle it for us and set the
52 // 'sync' observable to indicate that the list is up to date with the messages
53 // received so far.
54 pull.filter(msg => msg.sync || filterFunction(msg))
55 )
56
57 var backlinksObs = MutantPullReduce(filteredBacklinks, (state, msg) => {
58 state.push(msg)
59 return state
60 }, {
61 startValue: [],
62 nextTick: true,
63 sync: true
64 })
65
66 return backlinksObs
67 }
68
69 return nest({
70 'backlinks.obs.filter': (id, opts) => pullFilterReduceObs(id, opts)
71 })
72}
73

Built with git-ssb-web