Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / backlinks / obs-cache.js
2505 bytesRaw
1 | var Value = require('mutant/value') |
2 | var computed = require('mutant/computed') |
3 | var Abortable = require('pull-abortable') |
4 | var resolve = require('mutant/resolve') |
5 | var pull = require('pull-stream') |
6 | var onceIdle = require('mutant/once-idle') |
7 | var nest = require('depnest') |
8 | |
9 | exports.gives = nest('backlinks.obs.cache', true) |
10 | |
11 | exports.create = function (api) { |
12 | |
13 | return nest({ |
14 | 'backlinks.obs.cache': (cacheForMilliSeconds) => createCache(cacheForMilliSeconds) |
15 | }) |
16 | } |
17 | |
/**
 * Creates a cache of backlinks observables keyed by thread ID. Each cache entry is an
 * observable list of messages built from a backlinks pull stream. An entry is evicted
 * (and its stream aborted) once it has had no listeners for the configured amount of
 * time, or a default of 5 seconds.
 *
 * Eviction uses one interval timer and two generations of IDs: an ID released during
 * one interval is moved to the "old" generation on the next tick and destroyed on the
 * tick after that, so an unlistened entry survives between one and two intervals.
 *
 * @param {number} [cacheForMilliSeconds=5000] - interval length in milliseconds;
 *   any falsy value (including 0) falls back to 5000
 * @returns {{cachedBacklinks: Function}} object exposing `cachedBacklinks(id, stream)`
 */
function createCache (cacheForMilliSeconds) {
  // A Map rather than a plain object: IDs are caller-supplied, and with `{}` an ID
  // such as 'constructor' or 'toString' would resolve through Object.prototype and
  // be mistaken for an existing entry.
  const cache = new Map()

  // IDs released since the last tick / IDs released before the last tick.
  let newRemove = new Set()
  let oldRemove = new Set()

  // A listener (re)attached: cancel any pending eviction for this ID.
  function use (id) {
    newRemove.delete(id)
    oldRemove.delete(id)
  }

  // The last listener detached: schedule the ID for eviction.
  function release (id) {
    newRemove.add(id)
  }

  const timer = setInterval(() => {
    // Everything that stayed released for a whole interval gets destroyed.
    oldRemove.forEach(id => {
      const entry = cache.get(id)
      if (entry) {
        entry.destroy()
        cache.delete(id)
      }
    })
    oldRemove.clear()

    // cycle: recently released IDs become the old generation, and the (now empty)
    // old set is reused to collect new releases
    const hold = oldRemove
    oldRemove = newRemove
    newRemove = hold
  }, cacheForMilliSeconds || 5e3)

  // Where timers support it (e.g. Node), don't let this timer keep the process alive.
  if (timer.unref) timer.unref()

  /**
   * Takes a thread ID (used as the cache key) and a pull stream to populate the
   * backlinks observable with. After the backlinks observable is unsubscribed
   * from, it stays cached for the configured amount of time before the pull stream
   * is aborted, unless a new listener arrives in the meantime.
   *
   * `backlinksPullStream` is only consumed when no entry exists for `id`; on a
   * cache hit it is ignored and the existing observable is returned.
   *
   * NOTE(review): eviction is only scheduled from `onUnlisten`, so an entry that is
   * created but never listened to appears to stay cached indefinitely - confirm
   * whether that is intended.
   *
   * @param {string} id - thread ID identifying the cache entry
   * @param {Function} backlinksPullStream - pull-stream source of backlink messages
   * @returns {Function} observable of the collected messages, with a `sync`
   *   observable (set to true once a `{sync: true}` message arrives) and a
   *   `destroy` function (aborts the stream) attached
   */
  function cachedBacklinks (id, backlinksPullStream) {
    const existing = cache.get(id)
    if (existing) return existing

    const sync = Value(false)
    const aborter = Abortable()
    const collection = Value([])

    // try not to saturate the thread
    onceIdle(() => {
      pull(
        backlinksPullStream,
        aborter,
        pull.drain((msg) => {
          if (msg.sync) {
            sync.set(true)
          } else {
            // Append to the current list and re-set it to notify listeners.
            const value = resolve(collection)
            value.push(msg)
            collection.set(value)
          }
        })
      )
    })

    const entry = computed([collection], x => x, {
      onListen: () => use(id),
      onUnlisten: () => release(id)
    })

    entry.destroy = aborter.abort
    entry.sync = sync

    cache.set(id, entry)
    return entry
  }

  return {
    cachedBacklinks: cachedBacklinks
  }
}
102 |
Built with git-ssb-web