Files: 42ce7079c3fb1c0fed6c733b7f8cef04102d9267 / overrides / patchcore / backlinks / obs.js
3340 bytes · Raw
1 | var nest = require('depnest') |
2 | var Value = require('mutant/value') |
3 | var onceTrue = require('mutant/once-true') |
4 | var computed = require('mutant/computed') |
5 | var resolve = require('mutant/resolve') |
6 | var pull = require('pull-stream') |
7 | var onceIdle = require('mutant/once-idle') |
8 | var sorted = require('sorted-array-functions') |
9 | |
// APIs this module consumes from the depject graph; every entry is requested
// with the 'first' strategy.
const requiredApis = {
  'sbot.pull.backlinks': 'first',
  'sbot.obs.connection': 'first',
  'sbot.pull.stream': 'first'
}
exports.needs = nest(requiredApis)

// The single API this module provides.
exports.gives = nest('backlinks.obs.for', true)
17 | |
18 | exports.create = function (api) { |
19 | var cache = {} |
20 | var collections = {} |
21 | |
22 | var loaded = false |
23 | |
24 | // cycle remove sets for fast cleanup |
25 | var newRemove = new Set() |
26 | var oldRemove = new Set() |
27 | |
28 | // run cache cleanup every 5 seconds |
29 | // an item will be removed from cache between 5 - 10 seconds after release |
30 | // this ensures that the data is still available for a page reload |
31 | var timer = setInterval(() => { |
32 | oldRemove.forEach(id => { |
33 | if (cache[id]) { |
34 | unsubscribe(id) |
35 | delete collections[id] |
36 | delete cache[id] |
37 | } |
38 | }) |
39 | oldRemove.clear() |
40 | |
41 | // cycle |
42 | var hold = oldRemove |
43 | oldRemove = newRemove |
44 | newRemove = hold |
45 | }, 5e3) |
46 | |
47 | if (timer.unref) timer.unref() |
48 | |
49 | return nest({ |
50 | 'backlinks.obs.for': (id) => backlinks(id) |
51 | }) |
52 | |
53 | function backlinks (id) { |
54 | load() |
55 | if (!cache[id]) { |
56 | var sync = Value(false) |
57 | var collection = Value([]) |
58 | subscribe(id) |
59 | |
60 | // try not to saturate the thread |
61 | onceIdle(() => { |
62 | pull( |
63 | api.sbot.pull.backlinks({ |
64 | query: [ {$filter: { dest: id }} ], |
65 | index: 'DTA' // use asserted timestamps |
66 | }), |
67 | pull.drain((msg) => { |
68 | var value = resolve(collection) |
69 | sorted.add(value, msg, compareAsserted) |
70 | collection.set(value) |
71 | }, () => { |
72 | sync.set(true) |
73 | }) |
74 | ) |
75 | }) |
76 | |
77 | collections[id] = collection |
78 | cache[id] = computed([collection], x => x, { |
79 | onListen: () => use(id), |
80 | onUnlisten: () => release(id) |
81 | }) |
82 | |
83 | cache[id].sync = sync |
84 | } |
85 | return cache[id] |
86 | } |
87 | |
88 | function load () { |
89 | if (!loaded) { |
90 | pull( |
91 | api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()), |
92 | pull.drain(msg => { |
93 | var collection = collections[msg.dest] |
94 | if (collection) { |
95 | var value = resolve(collection) |
96 | value.push(msg) |
97 | collection.set(value) |
98 | } |
99 | }) |
100 | ) |
101 | loaded = true |
102 | } |
103 | } |
104 | |
105 | function use (id) { |
106 | newRemove.delete(id) |
107 | oldRemove.delete(id) |
108 | } |
109 | |
110 | function release (id) { |
111 | newRemove.add(id) |
112 | } |
113 | |
114 | function subscribe (id) { |
115 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id)) |
116 | } |
117 | |
118 | function unsubscribe (id) { |
119 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id)) |
120 | } |
121 | } |
122 | |
/**
 * Comparator used to keep backlink arrays sorted.
 *
 * A reply relationship between the two messages takes precedence; otherwise
 * messages are ordered by ascending `value.timestamp`.
 *
 * NOTE(review): when `a` is a reply to `b` this returns -1, which sorts the
 * reply *before* the message it branches from - confirm this is intended.
 *
 * @param {Object} a - message with `key` and `value.timestamp`
 * @param {Object} b - message with `key` and `value.timestamp`
 * @returns {number} negative if `a` sorts first, positive if `b` sorts first
 */
function compareAsserted (a, b) {
  if (isReplyTo(a, b)) return -1
  if (isReplyTo(b, a)) return 1
  return a.value.timestamp - b.value.timestamp
}
132 | |
/**
 * Whether `maybeReply.branch` names `msg.key`, either directly or as one
 * entry of a list of branches.
 *
 * NOTE(review): `branch` is read from the top level of the message here,
 * while the comparator reads the timestamp from `msg.value` - confirm the
 * messages really carry `branch` at this level.
 */
function isReplyTo (maybeReply, msg) {
  const branch = maybeReply.branch
  const targetKey = msg.key
  return includesOrEquals(branch, targetKey)
}
136 | |
/**
 * Membership test that also accepts a single value in place of a list.
 *
 * @param {*} array - an array to search, or any single value to compare
 * @param {*} value - the value to look for
 * @returns {boolean} `array.includes(value)` for arrays, strict equality otherwise
 */
function includesOrEquals (array, value) {
  return Array.isArray(array) ? array.includes(value) : array === value
}
144 |
Built with git-ssb-web