Files: b231307c8a7f2cac38250a347620038c0742abac / overrides / patchcore / backlinks / obs.js
3443 bytes · Raw
1 | var nest = require('depnest') |
2 | var Value = require('mutant/value') |
3 | var onceTrue = require('mutant/once-true') |
4 | var computed = require('mutant/computed') |
5 | var resolve = require('mutant/resolve') |
6 | var pull = require('pull-stream') |
7 | var onceIdle = require('mutant/once-idle') |
8 | var sorted = require('sorted-array-functions') |
9 | |
10 | exports.needs = nest({ |
11 | 'sbot.pull.backlinks': 'first', |
12 | 'sbot.obs.connection': 'first', |
13 | 'sbot.pull.stream': 'first', |
14 | 'message.sync.timestamp': 'first' |
15 | }) |
16 | |
17 | exports.gives = nest('backlinks.obs.for', true) |
18 | |
19 | exports.create = function (api) { |
20 | var cache = {} |
21 | var collections = {} |
22 | |
23 | var loaded = false |
24 | |
25 | // cycle remove sets for fast cleanup |
26 | var newRemove = new Set() |
27 | var oldRemove = new Set() |
28 | |
29 | // run cache cleanup every 5 seconds |
30 | // an item will be removed from cache between 5 - 10 seconds after release |
31 | // this ensures that the data is still available for a page reload |
32 | var timer = setInterval(() => { |
33 | oldRemove.forEach(id => { |
34 | if (cache[id]) { |
35 | unsubscribe(id) |
36 | delete collections[id] |
37 | delete cache[id] |
38 | } |
39 | }) |
40 | oldRemove.clear() |
41 | |
42 | // cycle |
43 | var hold = oldRemove |
44 | oldRemove = newRemove |
45 | newRemove = hold |
46 | }, 5e3) |
47 | |
48 | if (timer.unref) timer.unref() |
49 | |
50 | return nest({ |
51 | 'backlinks.obs.for': (id) => backlinks(id) |
52 | }) |
53 | |
54 | function backlinks (id) { |
55 | load() |
56 | if (!cache[id]) { |
57 | var sync = Value(false) |
58 | var collection = Value([]) |
59 | subscribe(id) |
60 | |
61 | // try not to saturate the thread |
62 | onceIdle(() => { |
63 | pull( |
64 | api.sbot.pull.backlinks({ |
65 | query: [ {$filter: { dest: id }} ], |
66 | index: 'DTA' // use asserted timestamps |
67 | }), |
68 | pull.drain((msg) => { |
69 | var value = resolve(collection) |
70 | sorted.add(value, msg, compareAsserted) |
71 | collection.set(value) |
72 | }, () => { |
73 | sync.set(true) |
74 | }) |
75 | ) |
76 | }) |
77 | |
78 | collections[id] = collection |
79 | cache[id] = computed([collection], x => x, { |
80 | onListen: () => use(id), |
81 | onUnlisten: () => release(id) |
82 | }) |
83 | |
84 | cache[id].sync = sync |
85 | } |
86 | return cache[id] |
87 | } |
88 | |
89 | function load () { |
90 | if (!loaded) { |
91 | pull( |
92 | api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()), |
93 | pull.drain(msg => { |
94 | var collection = collections[msg.dest] |
95 | if (collection) { |
96 | var value = resolve(collection) |
97 | sorted.add(value, msg, compareAsserted) |
98 | collection.set(value) |
99 | } |
100 | }) |
101 | ) |
102 | loaded = true |
103 | } |
104 | } |
105 | |
106 | function use (id) { |
107 | newRemove.delete(id) |
108 | oldRemove.delete(id) |
109 | } |
110 | |
111 | function release (id) { |
112 | newRemove.add(id) |
113 | } |
114 | |
115 | function subscribe (id) { |
116 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id)) |
117 | } |
118 | |
119 | function unsubscribe (id) { |
120 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id)) |
121 | } |
122 | |
123 | function compareAsserted (a, b) { |
124 | if (isReplyTo(a, b)) { |
125 | return -1 |
126 | } else if (isReplyTo(b, a)) { |
127 | return 1 |
128 | } else { |
129 | return api.message.sync.timestamp(a) - api.message.sync.timestamp(b) |
130 | } |
131 | } |
132 | } |
133 | |
/**
 * Whether `maybeReply.branch` references `msg.key`.
 *
 * `branch` may be a single value or an array of values.
 * NOTE(review): `branch` is read directly off the message object, not from
 * any nested content - confirm this matches the shape of the messages the
 * backlinks streams emit.
 *
 * @param {Object} maybeReply - message whose `branch` is inspected
 * @param {Object} msg - message whose `key` is looked for
 * @returns {boolean}
 */
function isReplyTo (maybeReply, msg) {
  return includesOrEquals(maybeReply.branch, msg.key)
}
137 | |
/**
 * Match `value` against something that is either a list or a single item.
 *
 * @param {*} array - an array to search, or any other value to compare
 * @param {*} value - the value to look for
 * @returns {boolean} `array.includes(value)` when `array` is an array,
 *   otherwise the result of `array === value`
 */
function includesOrEquals (array, value) {
  return Array.isArray(array) ? array.includes(value) : array === value
}
145 |
Built with git-ssb-web