Files: 1e0d83b096ae1e434e0decc1cb3af6cec68ec72a / lib / depject / backlinks / obs.js
4253 bytesRaw
1 | const nest = require('depnest') |
2 | const Value = require('mutant/value') |
3 | const onceTrue = require('mutant/once-true') |
4 | const computed = require('mutant/computed') |
5 | const resolve = require('mutant/resolve') |
6 | const pull = require('pull-stream') |
7 | const sorted = require('sorted-array-functions') |
8 | const MutantPullCollection = require('../../mutant-pull-collection') |
9 | const getTimestamp = require('../../get-timestamp') |
10 | const getRoot = require('../../message/sync/root') |
11 | |
// APIs this module asks depject to inject; each one is requested with the
// 'first' resolution mode.
const requiredApis = {
  'sbot.pull.backlinks': 'first',
  'sbot.obs.connection': 'first',
  'sbot.pull.stream': 'first'
}

exports.needs = nest(requiredApis)
17 | |
// APIs this module provides; `exports.create` returns an implementation
// for each of these keys.
const providedApis = {
  'backlinks.obs.for': true,
  'backlinks.obs.references': true,
  'backlinks.obs.forks': true
}

exports.gives = nest(providedApis)
23 | |
24 | exports.create = function (api) { |
25 | const cache = {} |
26 | const collections = {} |
27 | |
28 | let loaded = false |
29 | |
30 | // cycle remove sets for fast cleanup |
31 | let newRemove = new Set() |
32 | let oldRemove = new Set() |
33 | |
34 | // run cache cleanup every 5 seconds |
35 | // an item will be removed from cache between 5 - 10 seconds after release |
36 | // this ensures that the data is still available for a page reload |
37 | const timer = setInterval(() => { |
38 | oldRemove.forEach(id => { |
39 | if (cache[id]) { |
40 | unsubscribe(id) |
41 | delete collections[id] |
42 | delete cache[id] |
43 | } |
44 | }) |
45 | oldRemove.clear() |
46 | |
47 | // cycle |
48 | const hold = oldRemove |
49 | oldRemove = newRemove |
50 | newRemove = hold |
51 | }, 5e3) |
52 | |
53 | if (timer.unref) timer.unref() |
54 | |
55 | return nest({ |
56 | 'backlinks.obs.for': (id) => backlinks(id), |
57 | 'backlinks.obs.references': references, |
58 | 'backlinks.obs.forks': forks |
59 | }) |
60 | |
61 | function references (msg) { |
62 | const id = msg.key |
63 | return MutantPullCollection((lastMessage) => { |
64 | return api.sbot.pull.stream((sbot) => sbot.patchwork.backlinks.referencesStream({ id, since: lastMessage && lastMessage.timestamp })) |
65 | }) |
66 | } |
67 | |
68 | function forks (msg) { |
69 | const id = msg.key |
70 | const rooted = !!getRoot(msg) |
71 | if (rooted) { |
72 | return MutantPullCollection((lastMessage) => { |
73 | return api.sbot.pull.stream((sbot) => sbot.patchwork.backlinks.forksStream({ id, since: lastMessage && lastMessage.timestamp })) |
74 | }) |
75 | } else { |
76 | return [] |
77 | } |
78 | } |
79 | |
80 | function backlinks (id) { |
81 | load() |
82 | if (!cache[id]) { |
83 | const sync = Value(false) |
84 | const collection = Value([]) |
85 | subscribe(id) |
86 | |
87 | process.nextTick(() => { |
88 | pull( |
89 | api.sbot.pull.backlinks({ |
90 | query: [{ $filter: { dest: id } }], |
91 | index: 'DTA' // use asserted timestamps |
92 | }), |
93 | pull.drain((msg) => { |
94 | const value = resolve(collection) |
95 | sorted.add(value, msg, compareAsserted) |
96 | collection.set(value) |
97 | }, () => { |
98 | sync.set(true) |
99 | }) |
100 | ) |
101 | }) |
102 | |
103 | collections[id] = collection |
104 | cache[id] = computed([collection], x => x, { |
105 | onListen: () => use(id), |
106 | onUnlisten: () => release(id) |
107 | }) |
108 | |
109 | cache[id].sync = sync |
110 | } |
111 | return cache[id] |
112 | } |
113 | |
114 | function load () { |
115 | if (!loaded) { |
116 | pull( |
117 | api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()), |
118 | pull.drain(msg => { |
119 | const collection = collections[msg.dest] |
120 | if (collection) { |
121 | const value = resolve(collection) |
122 | sorted.add(value, msg, compareAsserted) |
123 | collection.set(value) |
124 | } |
125 | }) |
126 | ) |
127 | loaded = true |
128 | } |
129 | } |
130 | |
131 | function use (id) { |
132 | newRemove.delete(id) |
133 | oldRemove.delete(id) |
134 | } |
135 | |
136 | function release (id) { |
137 | newRemove.add(id) |
138 | } |
139 | |
140 | function subscribe (id) { |
141 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id)) |
142 | } |
143 | |
144 | function unsubscribe (id) { |
145 | onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id)) |
146 | } |
147 | |
148 | function compareAsserted (a, b) { |
149 | if (isReplyTo(a, b)) { |
150 | return -1 |
151 | } else if (isReplyTo(b, a)) { |
152 | return 1 |
153 | } else { |
154 | return getTimestamp(a) - getTimestamp(b) |
155 | } |
156 | } |
157 | } |
158 | |
/**
 * Tells whether `maybeReply.branch` points at `msg.key`.
 *
 * NOTE(review): `branch` is read from the top level of the message object;
 * confirm that the messages passed in actually carry it there.
 *
 * @param {object} maybeReply - message whose `branch` is inspected
 * @param {object} msg - message whose `key` is looked for
 * @returns {boolean} true when `branch` is an array containing `msg.key`,
 *   or is strictly equal to `msg.key`
 */
function isReplyTo (maybeReply, msg) {
  const { branch } = maybeReply
  return includesOrEquals(branch, msg.key)
}
162 | |
/**
 * Membership test that also accepts a single value in place of a list.
 *
 * @param {*} array - an array to search, or a single value to compare
 * @param {*} value - the value to look for
 * @returns {boolean} `array.includes(value)` when `array` is an array,
 *   otherwise `array === value`
 */
function includesOrEquals (array, value) {
  return Array.isArray(array) ? array.includes(value) : array === value
}
170 |
Built with git-ssb-web