git ssb

10+

Matt McKegg / patchwork



Tree: 4f2f50a77ef55ab8e505ad02c47746b3cab34157

Files: 4f2f50a77ef55ab8e505ad02c47746b3cab34157 / overrides / patchcore / backlinks / obs.js

3340 bytes · Raw
1var nest = require('depnest')
2var Value = require('mutant/value')
3var onceTrue = require('mutant/once-true')
4var computed = require('mutant/computed')
5var resolve = require('mutant/resolve')
6var pull = require('pull-stream')
7var onceIdle = require('mutant/once-idle')
8var sorted = require('sorted-array-functions')
9
10exports.needs = nest({
11 'sbot.pull.backlinks': 'first',
12 'sbot.obs.connection': 'first',
13 'sbot.pull.stream': 'first'
14})
15
16exports.gives = nest('backlinks.obs.for', true)
17
18exports.create = function (api) {
19 var cache = {}
20 var collections = {}
21
22 var loaded = false
23
24 // cycle remove sets for fast cleanup
25 var newRemove = new Set()
26 var oldRemove = new Set()
27
28 // run cache cleanup every 5 seconds
29 // an item will be removed from cache between 5 - 10 seconds after release
30 // this ensures that the data is still available for a page reload
31 var timer = setInterval(() => {
32 oldRemove.forEach(id => {
33 if (cache[id]) {
34 unsubscribe(id)
35 delete collections[id]
36 delete cache[id]
37 }
38 })
39 oldRemove.clear()
40
41 // cycle
42 var hold = oldRemove
43 oldRemove = newRemove
44 newRemove = hold
45 }, 5e3)
46
47 if (timer.unref) timer.unref()
48
49 return nest({
50 'backlinks.obs.for': (id) => backlinks(id)
51 })
52
53 function backlinks (id) {
54 load()
55 if (!cache[id]) {
56 var sync = Value(false)
57 var collection = Value([])
58 subscribe(id)
59
60 // try not to saturate the thread
61 onceIdle(() => {
62 pull(
63 api.sbot.pull.backlinks({
64 query: [ {$filter: { dest: id }} ],
65 index: 'DTA' // use asserted timestamps
66 }),
67 pull.drain((msg) => {
68 var value = resolve(collection)
69 sorted.add(value, msg, compareAsserted)
70 collection.set(value)
71 }, () => {
72 sync.set(true)
73 })
74 )
75 })
76
77 collections[id] = collection
78 cache[id] = computed([collection], x => x, {
79 onListen: () => use(id),
80 onUnlisten: () => release(id)
81 })
82
83 cache[id].sync = sync
84 }
85 return cache[id]
86 }
87
88 function load () {
89 if (!loaded) {
90 pull(
91 api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()),
92 pull.drain(msg => {
93 var collection = collections[msg.dest]
94 if (collection) {
95 var value = resolve(collection)
96 value.push(msg)
97 collection.set(value)
98 }
99 })
100 )
101 loaded = true
102 }
103 }
104
105 function use (id) {
106 newRemove.delete(id)
107 oldRemove.delete(id)
108 }
109
110 function release (id) {
111 newRemove.add(id)
112 }
113
114 function subscribe (id) {
115 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id))
116 }
117
118 function unsubscribe (id) {
119 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id))
120 }
121}
122
// Comparator for backlink messages.
// Returns -1 when `a` is a reply to `b`, 1 when `b` is a reply to `a`, and
// otherwise the difference of their `value.timestamp` fields (ascending).
// NOTE(review): a reply is ordered *before* the message it replies to here -
// confirm that this direction is intended.
function compareAsserted (a, b) {
  if (isReplyTo(a, b)) return -1
  if (isReplyTo(b, a)) return 1
  return a.value.timestamp - b.value.timestamp
}
132
// True when `maybeReply.branch` is, or is an array containing, `msg.key`.
// NOTE(review): `branch` is read from the top level of the message while the
// timestamp is read from `.value` elsewhere - confirm where branch lives.
function isReplyTo (maybeReply, msg) {
  var branch = maybeReply.branch
  return includesOrEquals(branch, msg.key)
}
136
// If `array` is an array, report whether it contains `value`; otherwise
// report whether `array` itself is strictly equal to `value`.
function includesOrEquals (array, value) {
  return Array.isArray(array) ? array.includes(value) : array === value
}
144

Built with git-ssb-web