Files: ebe8d562f226c534e1f12af64c355eb6ba6dba13 / backlinks / obs.js
2048 bytesRaw
1 | var nest = require('depnest') |
2 | var Value = require('mutant/value') |
3 | var computed = require('mutant/computed') |
4 | var Abortable = require('pull-abortable') |
5 | var resolve = require('mutant/resolve') |
6 | var pull = require('pull-stream') |
7 | var onceIdle = require('mutant/once-idle') |
8 | |
9 | exports.needs = nest({ |
10 | 'sbot.pull.backlinks': 'first' |
11 | }) |
12 | |
13 | exports.gives = nest('backlinks.obs.for', true) |
14 | |
15 | exports.create = function (api) { |
16 | var cache = {} |
17 | |
18 | // cycle remove sets for fast cleanup |
19 | var newRemove = new Set() |
20 | var oldRemove = new Set() |
21 | |
22 | // run cache cleanup every 5 seconds |
23 | // an item will be removed from cache between 5 - 10 seconds after release |
24 | // this ensures that the data is still available for a page reload |
25 | var timer = setInterval(() => { |
26 | oldRemove.forEach(id => { |
27 | cache[id].destroy() |
28 | delete cache[id] |
29 | }) |
30 | oldRemove.clear() |
31 | |
32 | // cycle |
33 | var hold = oldRemove |
34 | oldRemove = newRemove |
35 | newRemove = hold |
36 | }, 5e3) |
37 | |
38 | if (timer.unref) timer.unref() |
39 | |
40 | return nest({ |
41 | 'backlinks.obs.for': (id) => backlinks(id) |
42 | }) |
43 | |
44 | function backlinks (id) { |
45 | if (!cache[id]) { |
46 | var sync = Value(false) |
47 | var aborter = Abortable() |
48 | var collection = Value([]) |
49 | |
50 | // try not to saturate the thread |
51 | onceIdle(() => { |
52 | pull( |
53 | api.sbot.pull.backlinks({ |
54 | query: [ {$filter: { dest: id }} ], |
55 | live: true |
56 | }), |
57 | aborter, |
58 | pull.drain((msg) => { |
59 | if (msg.sync) { |
60 | sync.set(true) |
61 | } else { |
62 | var value = resolve(collection) |
63 | value.push(msg) |
64 | collection.set(value) |
65 | } |
66 | }) |
67 | ) |
68 | }) |
69 | |
70 | cache[id] = computed([collection], x => x, { |
71 | onListen: () => use(id), |
72 | onUnlisten: () => release(id) |
73 | }) |
74 | |
75 | cache[id].destroy = aborter.abort |
76 | cache[id].sync = sync |
77 | } |
78 | return cache[id] |
79 | } |
80 | |
81 | function use (id) { |
82 | newRemove.delete(id) |
83 | oldRemove.delete(id) |
84 | } |
85 | |
86 | function release (id) { |
87 | newRemove.add(id) |
88 | } |
89 | } |
90 |
Built with git-ssb-web