git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: 0b68762e851dab73abc45fa60add0d96d3b5c7ac

Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / backlinks / obs-cache.js

2505 bytesRaw
1var Value = require('mutant/value')
2var computed = require('mutant/computed')
3var Abortable = require('pull-abortable')
4var resolve = require('mutant/resolve')
5var pull = require('pull-stream')
6var onceIdle = require('mutant/once-idle')
7var nest = require('depnest')
8
9exports.gives = nest('backlinks.obs.cache', true)
10
11exports.create = function (api) {
12
13 return nest({
14 'backlinks.obs.cache': (cacheForMilliSeconds) => createCache(cacheForMilliSeconds)
15 })
16}
17
18/**
19 * Creates a cache for backlinks observables by thread ID. The cache entry is an obserable
20 * list of messages built from a backlinks stream. The cache is evicted if there are no
21 * listeners for the given backlinks observable for the configured amount of time,
22 * or a default of 5 seconds
23 */
24function createCache(cacheForMilliSeconds) {
25
26 var cache = {}
27
28 var newRemove = new Set()
29 var oldRemove = new Set()
30
31 function use (id) {
32 newRemove.delete(id)
33 oldRemove.delete(id)
34 }
35
36 function release (id) {
37 newRemove.add(id)
38 }
39
40 var timer = setInterval(() => {
41 oldRemove.forEach(id => {
42 if (cache[id]) {
43 cache[id].destroy()
44 delete cache[id]
45 }
46 })
47 oldRemove.clear()
48
49 // cycle
50 var hold = oldRemove
51 oldRemove = newRemove
52 newRemove = hold
53 }, cacheForMilliSeconds || 5e3)
54
55 if (timer.unref) timer.unref()
56
57 /**
58 * Takes a thread ID (for the cache ID) and a pull stream to populate the
59 * backlinks observable with. After the backlinks obserable is unsubscribed
60 * from it is cached for the configured amount of time before the pull stream
61 * is aborted unless there is a new incoming listener
62 */
63 function cachedBacklinks (id, backlinksPullStream) {
64
65 if (!cache[id]) {
66 var sync = Value(false)
67 var aborter = Abortable()
68 var collection = Value([])
69
70 // try not to saturate the thread
71 onceIdle(() => {
72 pull(
73 backlinksPullStream,
74 aborter,
75 pull.drain((msg) => {
76 if (msg.sync) {
77 sync.set(true)
78 } else {
79 var value = resolve(collection)
80 value.push(msg)
81 collection.set(value)
82 }
83 })
84 )
85 })
86
87 cache[id] = computed([collection], x => x, {
88 onListen: () => use(id),
89 onUnlisten: () => release(id)
90 })
91
92 cache[id].destroy = aborter.abort
93 cache[id].sync = sync
94 }
95 return cache[id]
96 }
97
98 return {
99 cachedBacklinks: cachedBacklinks
100 }
101}
102

Built with git-ssb-web