Files: c0e1e89b2fb18a9f6e4ed67ff823131c49035921 / sbot / roots.js
5872 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var FlumeViewLevel = require('flumeview-level') |
4 | var pullCat = require('pull-cat') |
5 | var HLRU = require('hashlru') |
6 | var extend = require('xtend') |
7 | |
8 | // HACK: pull it out of patchcore |
9 | var getRoot = require('patchcore/message/sync/root').create().message.sync.root |
10 | |
11 | module.exports = function (ssb, config) { |
12 | var create = FlumeViewLevel(1, function (msg, seq) { |
13 | var result = [ |
14 | [msg.value.timestamp, getRoot(msg) || msg.key] |
15 | ] |
16 | return result |
17 | }) |
18 | |
19 | var index = ssb._flumeUse('patchwork-roots', create) |
20 | |
21 | // cache mostly just to avoid reading the same roots over and over again |
22 | // not really big enough for multiple refresh cycles |
23 | var cache = HLRU(100) |
24 | |
25 | return { |
26 | latest: function ({ids = [ssb.id]}) { |
27 | var filter = null |
28 | return pull( |
29 | // READ INDEX |
30 | index.read({old: false}), |
31 | |
32 | // LOAD FILTERS |
33 | pull.asyncMap((item, cb) => { |
34 | if (!filter) { |
35 | // pause stream until filters have loaded |
36 | getFilter((err, result) => { |
37 | if (err) return cb(err) |
38 | filter = result |
39 | cb(null, item) |
40 | }) |
41 | } else { |
42 | cb(null, item) |
43 | } |
44 | }), |
45 | |
46 | // BUMP FILTER |
47 | pull.filter(item => { |
48 | if (filter && item.value && item.value) { |
49 | return filter(ids, item.value) |
50 | } |
51 | }), |
52 | |
53 | // LOOKUP ROOTS |
54 | pull.asyncMap((item, cb) => { |
55 | var msg = item.value |
56 | var key = item.key[1] |
57 | if (key === msg.key) { |
58 | // already a root |
59 | cb(null, msg) |
60 | } |
61 | getThruCache(key, (_, value) => { |
62 | cb(null, extend(msg, { |
63 | root: value |
64 | })) |
65 | }) |
66 | }), |
67 | |
68 | // FILTER |
69 | pull.filter(item => { |
70 | console.log('filter', item) |
71 | var root = item.root || item |
72 | if (filter && root && root.value && !getRoot(root)) { |
73 | return filter(ids, root) |
74 | } |
75 | }) |
76 | ) |
77 | }, |
78 | read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) { |
79 | var opts = {reverse, live, old} |
80 | |
81 | // handle markers passed in to lt / gt |
82 | if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp |
83 | if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp |
84 | if (typeof lt === 'number') opts.lt = [lt] |
85 | if (typeof gt === 'number') opts.gt = [gt] |
86 | |
87 | var seen = new Set() |
88 | var marker = {marker: true, timestamp: null} |
89 | var filter = null |
90 | |
91 | var stream = pull( |
92 | |
93 | // READ ROOTS |
94 | index.read(opts), |
95 | |
96 | // LOAD FILTERS |
97 | pull.asyncMap((item, cb) => { |
98 | if (!filter) { |
99 | // pause stream until filters have loaded |
100 | getFilter((err, result) => { |
101 | if (err) return cb(err) |
102 | filter = result |
103 | cb(null, item) |
104 | }) |
105 | } else { |
106 | cb(null, item) |
107 | } |
108 | }), |
109 | |
110 | // BUMP FILTER |
111 | pull.filter(item => { |
112 | if (filter && item.value && item.value.value) { |
113 | return filter(ids, item.value) |
114 | } |
115 | }), |
116 | |
117 | // MAP ROOTS |
118 | pull.map(item => { |
119 | if (item.sync) return item |
120 | marker.timestamp = item.key[0] |
121 | return item.key[1] |
122 | }), |
123 | |
124 | // UNIQUE |
125 | pull.filter(item => { |
126 | if (old === false) return true // don't filter live stream |
127 | if (item && item.sync) { |
128 | return true |
129 | } else if (typeof item === 'string') { |
130 | if (!seen.has(item)) { |
131 | seen.add(item) |
132 | return true |
133 | } |
134 | } |
135 | }), |
136 | |
137 | // LOOKUP (with cache) |
138 | pull.asyncMap((item, cb) => { |
139 | if (item.sync) return cb(null, item) |
140 | var key = item |
141 | getThruCache(key, cb) |
142 | }), |
143 | |
144 | // ROOT FILTER |
145 | pull.filter(msg => { |
146 | if (filter && msg.value && !getRoot(msg)) { |
147 | return filter(ids, msg) |
148 | } |
149 | }) |
150 | ) |
151 | |
152 | // TRUNCATE |
153 | if (typeof limit === 'number') { |
154 | return pullCat([ |
155 | pull( |
156 | stream, |
157 | pull.take(limit) |
158 | ), |
159 | |
160 | // send truncated marker for resuming search |
161 | pull.values([marker]) |
162 | ]) |
163 | } |
164 | } |
165 | } |
166 | |
167 | function getThruCache (key, cb) { |
168 | if (cache.has(key)) { |
169 | cb(null, cache.get(key)) |
170 | } else { |
171 | ssb.get(key, (_, value) => { |
172 | var msg = {key, value} |
173 | if (msg.value) { |
174 | cache.set(key, msg) |
175 | } |
176 | cb(null, msg) |
177 | }) |
178 | } |
179 | } |
180 | |
181 | function getFilter (cb) { |
182 | // TODO: rewrite contacts stream |
183 | ssb.contacts.get((err, contacts) => { |
184 | if (err) return cb(err) |
185 | ssb.patchwork.getSubscriptions((err, subscriptions) => { |
186 | if (err) return cb(err) |
187 | cb(null, function (ids, msg) { |
188 | return ( |
189 | ids.includes(msg.value.author) || |
190 | checkFollowing(contacts, ids, msg.value.author) || |
191 | checkChannel(subscriptions, ids, msg.value.content.channel) |
192 | ) |
193 | }) |
194 | }) |
195 | }) |
196 | } |
197 | } |
198 | |
199 | function checkFollowing (lookup, ids, target) { |
200 | // TODO: rewrite contacts index (for some reason the order is different) |
201 | var value = mostRecentValue(ids.map(id => lookup[id].following && lookup[id].following[target]), 1) |
202 | return value && value[0] |
203 | } |
204 | |
205 | function checkChannel (lookup, ids, channel) { |
206 | channel = typeof channel === 'string' ? channel.replace(/\s/g, '') : null |
207 | if (channel) { |
208 | var value = mostRecentValue(ids.map(id => lookup[`${id}:channel`])) |
209 | return value && value[1] |
210 | } |
211 | } |
212 | |
213 | function mostRecentValue (values, timestampIndex = 0) { |
214 | var mostRecent = null |
215 | values.forEach(value => { |
216 | if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) { |
217 | mostRecent = value |
218 | } |
219 | }) |
220 | return mostRecent |
221 | } |
222 |
Built with git-ssb-web