git ssb

10+

Matt McKegg / patchwork



Tree: 5624a137f19bc508201cff6468ac5b3fffef93cf

Files: 5624a137f19bc508201cff6468ac5b3fffef93cf / overrides / patchcore / backlinks / obs.js

3443 bytesRaw
1var nest = require('depnest')
2var Value = require('mutant/value')
3var onceTrue = require('mutant/once-true')
4var computed = require('mutant/computed')
5var resolve = require('mutant/resolve')
6var pull = require('pull-stream')
7var onceIdle = require('mutant/once-idle')
8var sorted = require('sorted-array-functions')
9
10exports.needs = nest({
11 'sbot.pull.backlinks': 'first',
12 'sbot.obs.connection': 'first',
13 'sbot.pull.stream': 'first',
14 'message.sync.timestamp': 'first'
15})
16
17exports.gives = nest('backlinks.obs.for', true)
18
19exports.create = function (api) {
20 var cache = {}
21 var collections = {}
22
23 var loaded = false
24
25 // cycle remove sets for fast cleanup
26 var newRemove = new Set()
27 var oldRemove = new Set()
28
29 // run cache cleanup every 5 seconds
30 // an item will be removed from cache between 5 - 10 seconds after release
31 // this ensures that the data is still available for a page reload
32 var timer = setInterval(() => {
33 oldRemove.forEach(id => {
34 if (cache[id]) {
35 unsubscribe(id)
36 delete collections[id]
37 delete cache[id]
38 }
39 })
40 oldRemove.clear()
41
42 // cycle
43 var hold = oldRemove
44 oldRemove = newRemove
45 newRemove = hold
46 }, 5e3)
47
48 if (timer.unref) timer.unref()
49
50 return nest({
51 'backlinks.obs.for': (id) => backlinks(id)
52 })
53
54 function backlinks (id) {
55 load()
56 if (!cache[id]) {
57 var sync = Value(false)
58 var collection = Value([])
59 subscribe(id)
60
61 // try not to saturate the thread
62 onceIdle(() => {
63 pull(
64 api.sbot.pull.backlinks({
65 query: [ {$filter: { dest: id }} ],
66 index: 'DTA' // use asserted timestamps
67 }),
68 pull.drain((msg) => {
69 var value = resolve(collection)
70 sorted.add(value, msg, compareAsserted)
71 collection.set(value)
72 }, () => {
73 sync.set(true)
74 })
75 )
76 })
77
78 collections[id] = collection
79 cache[id] = computed([collection], x => x, {
80 onListen: () => use(id),
81 onUnlisten: () => release(id)
82 })
83
84 cache[id].sync = sync
85 }
86 return cache[id]
87 }
88
89 function load () {
90 if (!loaded) {
91 pull(
92 api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()),
93 pull.drain(msg => {
94 var collection = collections[msg.dest]
95 if (collection) {
96 var value = resolve(collection)
97 sorted.add(value, msg, compareAsserted)
98 collection.set(value)
99 }
100 })
101 )
102 loaded = true
103 }
104 }
105
106 function use (id) {
107 newRemove.delete(id)
108 oldRemove.delete(id)
109 }
110
111 function release (id) {
112 newRemove.add(id)
113 }
114
115 function subscribe (id) {
116 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id))
117 }
118
119 function unsubscribe (id) {
120 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id))
121 }
122
123 function compareAsserted (a, b) {
124 if (isReplyTo(a, b)) {
125 return -1
126 } else if (isReplyTo(b, a)) {
127 return 1
128 } else {
129 return api.message.sync.timestamp(a) - api.message.sync.timestamp(b)
130 }
131 }
132}
133
134function isReplyTo (maybeReply, msg) {
135 return (includesOrEquals(maybeReply.branch, msg.key))
136}
137
138function includesOrEquals (array, value) {
139 if (Array.isArray(array)) {
140 return array.includes(value)
141 } else {
142 return array === value
143 }
144}
145

Built with git-ssb-web