Files: fe907a3f25ec3a3633a139910a155b8e0a1afa16 / sbot / roots.js
6045 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var FlumeViewLevel = require('flumeview-level') |
4 | var pullCat = require('pull-cat') |
5 | var HLRU = require('hashlru') |
6 | var extend = require('xtend') |
7 | |
8 | // HACK: pull it out of patchcore |
9 | var getRoot = require('patchcore/message/sync/root').create().message.sync.root |
10 | |
11 | module.exports = function (ssb, config) { |
12 | var create = FlumeViewLevel(1, function (msg, seq) { |
13 | var result = [ |
14 | [msg.value.timestamp, getRoot(msg) || msg.key] |
15 | ] |
16 | return result |
17 | }) |
18 | |
19 | var index = ssb._flumeUse('patchwork-roots', create) |
20 | |
21 | // cache mostly just to avoid reading the same roots over and over again |
22 | // not really big enough for multiple refresh cycles |
23 | var cache = HLRU(100) |
24 | |
25 | return { |
26 | latest: function ({ids = [ssb.id]}) { |
27 | var filter = null |
28 | return pull( |
29 | // READ INDEX |
30 | index.read({old: false}), |
31 | |
32 | // LOAD FILTERS |
33 | pull.asyncMap((item, cb) => { |
34 | if (!filter) { |
35 | // pause stream until filters have loaded |
36 | getFilter((err, result) => { |
37 | if (err) return cb(err) |
38 | filter = result |
39 | cb(null, item) |
40 | }) |
41 | } else { |
42 | cb(null, item) |
43 | } |
44 | }), |
45 | |
46 | // BUMP FILTER |
47 | pull.filter(item => { |
48 | if (filter && item.value && item.value) { |
49 | return filter(ids, item.value) |
50 | } |
51 | }), |
52 | |
53 | // LOOKUP ROOTS |
54 | pull.asyncMap((item, cb) => { |
55 | var msg = item.value |
56 | var key = item.key[1] |
57 | if (key === msg.key) { |
58 | // already a root |
59 | cb(null, msg) |
60 | } |
61 | getThruCache(key, (_, value) => { |
62 | cb(null, extend(msg, { |
63 | root: value |
64 | })) |
65 | }) |
66 | }), |
67 | |
68 | // FILTER |
69 | pull.filter(item => { |
70 | var root = item.root || item |
71 | if (filter && root && root.value && !getRoot(root)) { |
72 | return filter(ids, root) |
73 | } |
74 | }) |
75 | ) |
76 | }, |
77 | read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) { |
78 | var opts = {reverse, live, old} |
79 | |
80 | // handle markers passed in to lt / gt |
81 | if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp |
82 | if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp |
83 | if (typeof lt === 'number') opts.lt = [lt] |
84 | if (typeof gt === 'number') opts.gt = [gt] |
85 | |
86 | var seen = new Set() |
87 | var marker = {marker: true, timestamp: null} |
88 | var filter = null |
89 | |
90 | var stream = pull( |
91 | |
92 | // READ ROOTS |
93 | index.read(opts), |
94 | |
95 | // LOAD FILTERS |
96 | pull.asyncMap((item, cb) => { |
97 | if (!filter) { |
98 | // pause stream until filters have loaded |
99 | getFilter((err, result) => { |
100 | if (err) return cb(err) |
101 | filter = result |
102 | cb(null, item) |
103 | }) |
104 | } else { |
105 | cb(null, item) |
106 | } |
107 | }), |
108 | |
109 | // BUMP FILTER |
110 | pull.filter(item => { |
111 | if (filter && item.value && item.value.value) { |
112 | return filter(ids, item.value) |
113 | } |
114 | }), |
115 | |
116 | // MAP ROOTS |
117 | pull.map(item => { |
118 | if (item.sync) return item |
119 | marker.timestamp = item.key[0] |
120 | return item.key[1] |
121 | }), |
122 | |
123 | // UNIQUE |
124 | pull.filter(item => { |
125 | if (old === false) return true // don't filter live stream |
126 | if (item && item.sync) { |
127 | return true |
128 | } else if (typeof item === 'string') { |
129 | if (!seen.has(item)) { |
130 | seen.add(item) |
131 | return true |
132 | } |
133 | } |
134 | }), |
135 | |
136 | // LOOKUP (with cache) |
137 | pull.asyncMap((item, cb) => { |
138 | if (item.sync) return cb(null, item) |
139 | var key = item |
140 | getThruCache(key, cb) |
141 | }), |
142 | |
143 | // ROOT FILTER |
144 | pull.filter(msg => { |
145 | if (filter && msg.value && !getRoot(msg)) { |
146 | return filter(ids, msg) |
147 | } |
148 | }) |
149 | ) |
150 | |
151 | // TRUNCATE |
152 | if (typeof limit === 'number') { |
153 | var count = 0 |
154 | return pullCat([ |
155 | pull( |
156 | stream, |
157 | pull.take(limit), |
158 | pull.through(() => { |
159 | count += 1 |
160 | }) |
161 | ), |
162 | |
163 | // send truncated marker for resuming search |
164 | pull( |
165 | pull.values([marker]), |
166 | pull.filter(() => count === limit) |
167 | ) |
168 | ]) |
169 | } else { |
170 | return stream |
171 | } |
172 | } |
173 | } |
174 | |
175 | function getThruCache (key, cb) { |
176 | if (cache.has(key)) { |
177 | cb(null, cache.get(key)) |
178 | } else { |
179 | ssb.get(key, (_, value) => { |
180 | var msg = {key, value} |
181 | if (msg.value) { |
182 | cache.set(key, msg) |
183 | } |
184 | cb(null, msg) |
185 | }) |
186 | } |
187 | } |
188 | |
189 | function getFilter (cb) { |
190 | // TODO: rewrite contacts stream |
191 | ssb.contacts.get((err, contacts) => { |
192 | if (err) return cb(err) |
193 | ssb.patchwork.getSubscriptions((err, subscriptions) => { |
194 | if (err) return cb(err) |
195 | cb(null, function (ids, msg) { |
196 | return ( |
197 | ids.includes(msg.value.author) || |
198 | checkFollowing(contacts, ids, msg.value.author) || |
199 | checkChannel(subscriptions, ids, msg.value.content.channel) |
200 | ) |
201 | }) |
202 | }) |
203 | }) |
204 | } |
205 | } |
206 | |
/**
 * Reports whether any of `ids` currently follows `target`, taking the entry
 * with the greatest value at index 1 across all ids.
 *
 * Each `lookup[id].following[target]` entry is read as an array with the
 * follow state at index 0 and a comparable timestamp at index 1
 * (NOTE(review): shape inferred from how it is indexed here - confirm against
 * the contacts index).
 *
 * @param {Object} lookup - contacts lookup keyed by feed id.
 * @param {string[]} ids - feed ids whose follow lists are consulted.
 * @param {string} target - feed id being checked.
 * @returns {*} index 0 of the most recent entry, or a falsy value when none of
 *   the ids has an entry for `target`.
 */
function checkFollowing (lookup, ids, target) {
  // TODO: rewrite contacts index (for some reason the order is different)
  const candidates = ids.map(id => {
    // An id may have no record in the lookup at all; treat that as "no entry"
    // instead of throwing on `undefined.following`.
    const entry = lookup[id]
    return entry && entry.following && entry.following[target]
  })
  const value = mostRecentValue(candidates, 1)
  return value && value[0]
}
212 | |
/**
 * Reports whether any of `ids` is subscribed to `channel`, taking the entry
 * with the greatest value at index 0 across all ids.
 *
 * All whitespace is stripped from the channel name before the lookup.
 * Entries are read as arrays with a comparable timestamp at index 0 and the
 * subscription state at index 1.
 *
 * NOTE(review): assumes the subscriptions lookup is keyed by
 * `${id}:${channel}` - confirm against `ssb.patchwork.getSubscriptions`.
 * Previously the key was the fixed text `${id}:channel`, which ignored the
 * channel that had just been normalised.
 *
 * @param {Object} lookup - subscriptions lookup.
 * @param {string[]} ids - feed ids whose subscriptions are consulted.
 * @param {*} channel - channel name; anything but a non-empty string yields
 *   `undefined`.
 * @returns {*} index 1 of the most recent entry, `null` when no id has an
 *   entry for the channel, or `undefined` when there is no usable channel.
 */
function checkChannel (lookup, ids, channel) {
  channel = typeof channel === 'string' ? channel.replace(/\s/g, '') : null
  if (channel) {
    const value = mostRecentValue(ids.map(id => lookup[`${id}:${channel}`]))
    return value && value[1]
  }
}
220 | |
/**
 * Picks the entry with the greatest value at `timestampIndex`.
 *
 * Falsy entries are skipped. On a tie the earliest entry wins, because a
 * later entry only replaces the current pick when it is strictly greater.
 *
 * @param {Array} values - candidate entries (indexable, e.g. arrays).
 * @param {number} [timestampIndex=0] - position of the comparable value.
 * @returns {*} the winning entry, or `null` when there is no truthy entry.
 */
function mostRecentValue (values, timestampIndex = 0) {
  return values.reduce((newest, candidate) => {
    if (!candidate) return newest
    if (newest === null) return candidate
    return newest[timestampIndex] < candidate[timestampIndex] ? candidate : newest
  }, null)
}
230 |
Built with git-ssb-web