Files: 8e90c8e8cb8f0b22e2d4f3c0ff91bb80b4880378 / backlinks / obs.js
2136 bytesRaw
1 | var nest = require('depnest') |
2 | var Value = require('mutant/value') |
3 | var computed = require('mutant/computed') |
4 | var Abortable = require('pull-abortable') |
5 | var resolve = require('mutant/resolve') |
6 | var pull = require('pull-stream') |
7 | var onceIdle = require('mutant/once-idle') |
8 | |
9 | exports.needs = nest({ |
10 | 'sbot.pull.backlinks': 'first' |
11 | }) |
12 | |
13 | exports.gives = nest('backlinks.obs.for', true) |
14 | |
15 | exports.create = function (api) { |
16 | var cache = {} |
17 | |
18 | // cycle remove sets for fast cleanup |
19 | var newRemove = new Set() |
20 | var oldRemove = new Set() |
21 | |
22 | // run cache cleanup every 5 seconds |
23 | // an item will be removed from cache between 5 - 10 seconds after release |
24 | // this ensures that the data is still available for a page reload |
25 | var timer = setInterval(() => { |
26 | oldRemove.forEach(id => { |
27 | if (cache[id]) { |
28 | cache[id].destroy() |
29 | delete cache[id] |
30 | } |
31 | }) |
32 | oldRemove.clear() |
33 | |
34 | // cycle |
35 | var hold = oldRemove |
36 | oldRemove = newRemove |
37 | newRemove = hold |
38 | }, 5e3) |
39 | |
40 | if (timer.unref) timer.unref() |
41 | |
42 | return nest({ |
43 | 'backlinks.obs.for': (id) => backlinks(id) |
44 | }) |
45 | |
46 | function backlinks (id) { |
47 | if (!cache[id]) { |
48 | var sync = Value(false) |
49 | var aborter = Abortable() |
50 | var collection = Value([]) |
51 | |
52 | // try not to saturate the thread |
53 | onceIdle(() => { |
54 | pull( |
55 | api.sbot.pull.backlinks({ |
56 | query: [ {$filter: { dest: id }} ], |
57 | index: 'DTA', // use asserted timestamps |
58 | live: true |
59 | }), |
60 | aborter, |
61 | pull.drain((msg) => { |
62 | if (msg.sync) { |
63 | sync.set(true) |
64 | } else { |
65 | var value = resolve(collection) |
66 | value.push(msg) |
67 | collection.set(value) |
68 | } |
69 | }) |
70 | ) |
71 | }) |
72 | |
73 | cache[id] = computed([collection], x => x, { |
74 | onListen: () => use(id), |
75 | onUnlisten: () => release(id) |
76 | }) |
77 | |
78 | cache[id].destroy = aborter.abort |
79 | cache[id].sync = sync |
80 | } |
81 | return cache[id] |
82 | } |
83 | |
84 | function use (id) { |
85 | newRemove.delete(id) |
86 | oldRemove.delete(id) |
87 | } |
88 | |
89 | function release (id) { |
90 | newRemove.add(id) |
91 | } |
92 | } |
93 |
Built with git-ssb-web