git ssb

7+

dinoworm 🐛 / patchcore



Tree: 0b68762e851dab73abc45fa60add0d96d3b5c7ac

Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / backlinks / obs-filter.js

3473 bytesRaw
1var nest = require('depnest')
2var pull = require('pull-stream')
3var Value = require('mutant/value')
4var computed = require('mutant/computed')
5var Abortable = require('pull-abortable')
6var onceIdle = require('mutant/once-idle')
7var resolve = require('mutant/resolve')
8
// Dependencies this module requests. exports.create reaches them through its
// `api` argument (for example api.sbot.pull.backlinks). 'backlinks.obs.cache'
// is requested here even though the code in this file only ever receives a
// cache through opts.cache.
var requested = {
  'sbot.pull.backlinks': 'first',
  'backlinks.obs.cache': 'first'
}
exports.needs = nest(requested)

// The single entry point this module provides.
var provided = 'backlinks.obs.filter'
exports.gives = nest(provided, true)
15
16/**
17 * sbot.obs.filter returns an observable list of messages that link
18 * back to the message with the given message ID (@id). Only messages that
19 * pass the filter are added to the list.
20 *
21 * When a message arrives, if a filter function is given in the options (opts.filter)
22 * and passing it to the filter function does not result in it returning
23 * 'true' the message is not added to the observable list.
24 *
25 * An optional backlinks cache (which should be constructed from backlinks.obs.cache) can be
26 * supplied with opts.cache. A caller constructed cache is required because different
27 * pull stream filters might be used for the same thread ID. If no cache is supplied,
28 * the backlinks observables will not be cached.
29 *
30 * A 'sync' observable property is also added to the returned observable
31 * which is 'true' when all previously seen messages are caught up with.
32 *
33 */
34exports.create = function (api) {
35
36 function pullFilterReduceObs (id, filterFunction, backlinksCache) {
37 if (!id || typeof (id) !== 'string') {
38 throw new Error('id must be a string.')
39 }
40
41 var sbotFilter = {
42 $filter: {
43 dest: id
44 }
45 }
46
47 var msgBacklinks = api.sbot.pull.backlinks({
48 query: [sbotFilter],
49 index: 'DTA', // use asserted timestamps
50 live: true
51 })
52
53 var filteredBacklinks = pull(
54 msgBacklinks,
55 // We need to allow 'msg.sync' even if the supplied filter function does not
56 // match it to allow mutant-pull-reduce to handle it for us and set the
57 // 'sync' observable to indicate that the list is up to date with the messages
58 // received so far.
59 pull.filter(msg => msg.sync || filterFunction(msg))
60 )
61
62 if (backlinksCache) {
63 return backlinksCache.cachedBacklinks(id, filteredBacklinks)
64 } else {
65 return backlinksObs(id, filteredBacklinks);
66 }
67 }
68
69 function backlinksObs(id, backlinksPullStream) {
70 var sync = Value(false)
71 var aborter = Abortable()
72 var collection = Value([])
73
74 var obs = computed([collection], x => x, {
75 onListen: () => {
76 // try not to saturate the thread
77 onceIdle(() => {
78 pull(
79 backlinksPullStream,
80 aborter,
81 pull.drain((msg) => {
82 if (msg.sync) {
83 sync.set(true)
84 } else {
85 var value = resolve(collection)
86 value.push(msg)
87 collection.set(value)
88 }
89 })
90 )
91 })
92 },
93 onUnlisten: () => aborter.abort
94 })
95
96 obs.sync = sync;
97
98 return obs;
99 }
100
101 return nest({
102 'backlinks.obs.filter': (id, opts) => {
103
104 // If a filter function is supplied in the options, we use it to filter
105 // the links stream, otherwise we use all the messages from the stream
106 var filterFunction = opts && opts.filter ? opts.filter : () => true
107 var cache = opts ? opts.cache : null;
108
109 return pullFilterReduceObs(id, filterFunction, cache)
110 }
111 })
112}
113

Built with git-ssb-web