git ssb

1+

Daan Patchwork / patchwork



Tree: 06ded70b47c2b7b6dbd436a59f532f6caff0e67c

Files: 06ded70b47c2b7b6dbd436a59f532f6caff0e67c / lib / depject / backlinks / obs.js

4253 bytesRaw
1const nest = require('depnest')
2const Value = require('mutant/value')
3const onceTrue = require('mutant/once-true')
4const computed = require('mutant/computed')
5const resolve = require('mutant/resolve')
6const pull = require('pull-stream')
7const sorted = require('sorted-array-functions')
8const MutantPullCollection = require('../../mutant-pull-collection')
9const getTimestamp = require('../../get-timestamp')
10const getRoot = require('../../message/sync/root')
11
12exports.needs = nest({
13 'sbot.pull.backlinks': 'first',
14 'sbot.obs.connection': 'first',
15 'sbot.pull.stream': 'first'
16})
17
18exports.gives = nest({
19 'backlinks.obs.for': true,
20 'backlinks.obs.references': true,
21 'backlinks.obs.forks': true
22})
23
24exports.create = function (api) {
25 const cache = {}
26 const collections = {}
27
28 let loaded = false
29
30 // cycle remove sets for fast cleanup
31 let newRemove = new Set()
32 let oldRemove = new Set()
33
34 // run cache cleanup every 5 seconds
35 // an item will be removed from cache between 5 - 10 seconds after release
36 // this ensures that the data is still available for a page reload
37 const timer = setInterval(() => {
38 oldRemove.forEach(id => {
39 if (cache[id]) {
40 unsubscribe(id)
41 delete collections[id]
42 delete cache[id]
43 }
44 })
45 oldRemove.clear()
46
47 // cycle
48 const hold = oldRemove
49 oldRemove = newRemove
50 newRemove = hold
51 }, 5e3)
52
53 if (timer.unref) timer.unref()
54
55 return nest({
56 'backlinks.obs.for': (id) => backlinks(id),
57 'backlinks.obs.references': references,
58 'backlinks.obs.forks': forks
59 })
60
61 function references (msg) {
62 const id = msg.key
63 return MutantPullCollection((lastMessage) => {
64 return api.sbot.pull.stream((sbot) => sbot.patchwork.backlinks.referencesStream({ id, since: lastMessage && lastMessage.timestamp }))
65 })
66 }
67
68 function forks (msg) {
69 const id = msg.key
70 const rooted = !!getRoot(msg)
71 if (rooted) {
72 return MutantPullCollection((lastMessage) => {
73 return api.sbot.pull.stream((sbot) => sbot.patchwork.backlinks.forksStream({ id, since: lastMessage && lastMessage.timestamp }))
74 })
75 } else {
76 return []
77 }
78 }
79
80 function backlinks (id) {
81 load()
82 if (!cache[id]) {
83 const sync = Value(false)
84 const collection = Value([])
85 subscribe(id)
86
87 process.nextTick(() => {
88 pull(
89 api.sbot.pull.backlinks({
90 query: [{ $filter: { dest: id } }],
91 index: 'DTA' // use asserted timestamps
92 }),
93 pull.drain((msg) => {
94 const value = resolve(collection)
95 sorted.add(value, msg, compareAsserted)
96 collection.set(value)
97 }, () => {
98 sync.set(true)
99 })
100 )
101 })
102
103 collections[id] = collection
104 cache[id] = computed([collection], x => x, {
105 onListen: () => use(id),
106 onUnlisten: () => release(id)
107 })
108
109 cache[id].sync = sync
110 }
111 return cache[id]
112 }
113
114 function load () {
115 if (!loaded) {
116 pull(
117 api.sbot.pull.stream(sbot => sbot.patchwork.liveBacklinks.stream()),
118 pull.drain(msg => {
119 const collection = collections[msg.dest]
120 if (collection) {
121 const value = resolve(collection)
122 sorted.add(value, msg, compareAsserted)
123 collection.set(value)
124 }
125 })
126 )
127 loaded = true
128 }
129 }
130
131 function use (id) {
132 newRemove.delete(id)
133 oldRemove.delete(id)
134 }
135
136 function release (id) {
137 newRemove.add(id)
138 }
139
140 function subscribe (id) {
141 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.subscribe(id))
142 }
143
144 function unsubscribe (id) {
145 onceTrue(api.sbot.obs.connection(), (sbot) => sbot.patchwork.liveBacklinks.unsubscribe(id))
146 }
147
148 function compareAsserted (a, b) {
149 if (isReplyTo(a, b)) {
150 return -1
151 } else if (isReplyTo(b, a)) {
152 return 1
153 } else {
154 return getTimestamp(a) - getTimestamp(b)
155 }
156 }
157}
158
159function isReplyTo (maybeReply, msg) {
160 return (includesOrEquals(maybeReply.branch, msg.key))
161}
162
163function includesOrEquals (array, value) {
164 if (Array.isArray(array)) {
165 return array.includes(value)
166 } else {
167 return array === value
168 }
169}
170

Built with git-ssb-web