Files: 20cee95636c3adc788f738eff9c79fb6477c66d4 / sbot / roots.js
6133 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var FlumeViewLevel = require('flumeview-level') |
4 | var pullCat = require('pull-cat') |
5 | var HLRU = require('hashlru') |
6 | var extend = require('xtend') |
7 | var normalizeChannel = require('../lib/normalize-channel') |
8 | |
9 | // HACK: pull it out of patchcore |
10 | var getRoot = require('patchcore/message/sync/root').create().message.sync.root |
11 | |
12 | module.exports = function (ssb, config) { |
13 | var create = FlumeViewLevel(1, function (msg, seq) { |
14 | var result = [ |
15 | [msg.value.timestamp, getRoot(msg) || msg.key] |
16 | ] |
17 | return result |
18 | }) |
19 | |
20 | var index = ssb._flumeUse('patchwork-roots', create) |
21 | |
22 | // cache mostly just to avoid reading the same roots over and over again |
23 | // not really big enough for multiple refresh cycles |
24 | var cache = HLRU(100) |
25 | |
26 | return { |
27 | latest: function ({ids = [ssb.id]}) { |
28 | var filter = null |
29 | return pull( |
30 | // READ INDEX |
31 | index.read({old: false}), |
32 | |
33 | // LOAD FILTERS |
34 | pull.asyncMap((item, cb) => { |
35 | if (!filter) { |
36 | // pause stream until filters have loaded |
37 | getFilter((err, result) => { |
38 | if (err) return cb(err) |
39 | filter = result |
40 | cb(null, item) |
41 | }) |
42 | } else { |
43 | cb(null, item) |
44 | } |
45 | }), |
46 | |
47 | // BUMP FILTER |
48 | pull.filter(item => { |
49 | if (filter && item.value && item.value) { |
50 | return filter(ids, item.value) |
51 | } |
52 | }), |
53 | |
54 | // LOOKUP ROOTS |
55 | pull.asyncMap((item, cb) => { |
56 | var msg = item.value |
57 | var key = item.key[1] |
58 | if (key === msg.key) { |
59 | // already a root |
60 | cb(null, msg) |
61 | } |
62 | getThruCache(key, (_, value) => { |
63 | cb(null, extend(msg, { |
64 | root: value |
65 | })) |
66 | }) |
67 | }), |
68 | |
69 | // FILTER |
70 | pull.filter(item => { |
71 | var root = item.root || item |
72 | if (filter && root && root.value && !getRoot(root)) { |
73 | return filter(ids, root) |
74 | } |
75 | }) |
76 | ) |
77 | }, |
78 | read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) { |
79 | var opts = {reverse, live, old} |
80 | |
81 | // handle markers passed in to lt / gt |
82 | if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp |
83 | if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp |
84 | if (typeof lt === 'number') opts.lt = [lt] |
85 | if (typeof gt === 'number') opts.gt = [gt] |
86 | |
87 | var seen = new Set() |
88 | var marker = {marker: true, timestamp: null} |
89 | var filter = null |
90 | |
91 | var stream = pull( |
92 | |
93 | // READ ROOTS |
94 | index.read(opts), |
95 | |
96 | // LOAD FILTERS |
97 | pull.asyncMap((item, cb) => { |
98 | if (!filter) { |
99 | // pause stream until filters have loaded |
100 | getFilter((err, result) => { |
101 | if (err) return cb(err) |
102 | filter = result |
103 | cb(null, item) |
104 | }) |
105 | } else { |
106 | cb(null, item) |
107 | } |
108 | }), |
109 | |
110 | // BUMP FILTER |
111 | pull.filter(item => { |
112 | if (filter && item.value && item.value.value) { |
113 | return filter(ids, item.value) |
114 | } |
115 | }), |
116 | |
117 | // MAP ROOTS |
118 | pull.map(item => { |
119 | if (item.sync) return item |
120 | marker.timestamp = item.key[0] |
121 | return item.key[1] |
122 | }), |
123 | |
124 | // UNIQUE |
125 | pull.filter(item => { |
126 | if (old === false) return true // don't filter live stream |
127 | if (item && item.sync) { |
128 | return true |
129 | } else if (typeof item === 'string') { |
130 | if (!seen.has(item)) { |
131 | seen.add(item) |
132 | return true |
133 | } |
134 | } |
135 | }), |
136 | |
137 | // LOOKUP (with cache) |
138 | pull.asyncMap((item, cb) => { |
139 | if (item.sync) return cb(null, item) |
140 | var key = item |
141 | getThruCache(key, cb) |
142 | }), |
143 | |
144 | // ROOT FILTER |
145 | pull.filter(msg => { |
146 | if (filter && msg.value && !getRoot(msg)) { |
147 | return filter(ids, msg) |
148 | } |
149 | }) |
150 | ) |
151 | |
152 | // TRUNCATE |
153 | if (typeof limit === 'number') { |
154 | var count = 0 |
155 | return pullCat([ |
156 | pull( |
157 | stream, |
158 | pull.take(limit), |
159 | pull.through(() => { |
160 | count += 1 |
161 | }) |
162 | ), |
163 | |
164 | // send truncated marker for resuming search |
165 | pull( |
166 | pull.values([marker]), |
167 | pull.filter(() => count === limit) |
168 | ) |
169 | ]) |
170 | } else { |
171 | return stream |
172 | } |
173 | } |
174 | } |
175 | |
176 | function getThruCache (key, cb) { |
177 | if (cache.has(key)) { |
178 | cb(null, cache.get(key)) |
179 | } else { |
180 | ssb.get(key, (_, value) => { |
181 | var msg = {key, value} |
182 | if (msg.value) { |
183 | cache.set(key, msg) |
184 | } |
185 | cb(null, msg) |
186 | }) |
187 | } |
188 | } |
189 | |
190 | function getFilter (cb) { |
191 | // TODO: rewrite contacts stream |
192 | ssb.contacts.get((err, contacts) => { |
193 | if (err) return cb(err) |
194 | ssb.patchwork.getSubscriptions((err, subscriptions) => { |
195 | if (err) return cb(err) |
196 | cb(null, function (ids, msg) { |
197 | var type = msg.value.content.type |
198 | var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel)) |
199 | return ids.includes(msg.value.author) || matchesChannel || checkFollowing(contacts, ids, msg.value.author) |
200 | }) |
201 | }) |
202 | }) |
203 | } |
204 | } |
205 | |
206 | function checkFollowing (lookup, ids, target) { |
207 | // TODO: rewrite contacts index (for some reason the order is different) |
208 | var value = mostRecentValue(ids.map(id => lookup[id].following && lookup[id].following[target]), 1) |
209 | return value && value[0] |
210 | } |
211 | |
212 | function checkChannel (lookup, ids, channel) { |
213 | channel = normalizeChannel(channel) |
214 | if (channel) { |
215 | var value = mostRecentValue(ids.map(id => lookup[`${id}:${channel}`])) |
216 | return value && value[1] |
217 | } |
218 | } |
219 | |
220 | function mostRecentValue (values, timestampIndex = 0) { |
221 | var mostRecent = null |
222 | values.forEach(value => { |
223 | if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) { |
224 | mostRecent = value |
225 | } |
226 | }) |
227 | return mostRecent |
228 | } |
229 |
Built with git-ssb-web