Files: 247e13706f3029b40ee63d2a19cb5fe7e6680356 / sbot / roots.js
8836 bytes | Raw
1 | |
2 | var pull = require('pull-stream') |
3 | var FlumeViewLevel = require('flumeview-level') |
4 | var pullCat = require('pull-cat') |
5 | var HLRU = require('hashlru') |
6 | var extend = require('xtend') |
7 | var normalizeChannel = require('ssb-ref').normalizeChannel |
8 | var Defer = require('pull-defer') |
9 | |
10 | // HACK: pull it out of patchcore |
11 | var getRoot = require('patchcore/message/sync/root').create().message.sync.root |
12 | var getTimestamp = require('patchcore/message/sync/timestamp').create().message.sync.timestamp |
13 | |
14 | module.exports = function (ssb, config) { |
15 | var create = FlumeViewLevel(2, function (msg, seq) { |
16 | var result = [ |
17 | [getTimestamp(msg), getRoot(msg) || msg.key] |
18 | ] |
19 | return result |
20 | }) |
21 | |
22 | var index = ssb._flumeUse('patchwork-roots', create) |
23 | |
24 | // cache mostly just to avoid reading the same roots over and over again |
25 | // not really big enough for multiple refresh cycles |
26 | var cache = HLRU(100) |
27 | |
28 | return { |
29 | latest: function ({ids = [ssb.id], onlySubscribedChannels = false}) { |
30 | var stream = Defer.source() |
31 | getFilter((err, filter) => { |
32 | if (err) return stream.abort(err) |
33 | stream.resolve(pull( |
34 | index.read({old: false}), |
35 | |
36 | // BUMP FILTER |
37 | pull.filter(item => { |
38 | if (filter && item.value && item.value) { |
39 | var filterResult = filter(ids, item.value) |
40 | if (filterResult) { |
41 | item.value.filterResult = filterResult |
42 | return true |
43 | } |
44 | } |
45 | }), |
46 | |
47 | // LOOKUP AND ADD ROOTS |
48 | LookupRoots(), |
49 | |
50 | // FILTER ROOTS |
51 | pull.filter(item => { |
52 | var root = item.root || item |
53 | var isPrivate = root.value && typeof root.value.content === 'string' |
54 | |
55 | if (filter && root && root.value && !isPrivate) { |
56 | var filterResult = filter(ids, root) |
57 | if (checkReplyForcesDisplay(item) || shouldShow(filterResult, {onlySubscribedChannels})) { |
58 | root.filterResult = filterResult |
59 | return true |
60 | } |
61 | } |
62 | }) |
63 | )) |
64 | }) |
65 | return stream |
66 | }, |
67 | |
68 | read: function ({ids = [ssb.id], reverse, limit, lt, gt, onlySubscribedChannels = false}) { |
69 | var opts = {reverse, old: true} |
70 | |
71 | // handle markers passed in to lt / gt |
72 | if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp |
73 | if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp |
74 | if (typeof lt === 'number') opts.lt = [lt] |
75 | if (typeof gt === 'number') opts.gt = [gt] |
76 | |
77 | var seen = new Set() |
78 | var included = new Set() |
79 | var marker = {marker: true, timestamp: null} |
80 | |
81 | var stream = Defer.source() |
82 | |
83 | getFilter((err, filter) => { |
84 | if (err) return stream.abort(err) |
85 | stream.resolve(pull( |
86 | // READ ROOTS INDEX |
87 | index.read(opts), |
88 | |
89 | // BUMP FILTER |
90 | pull.filter(item => { |
91 | // keep track of latest timestamp |
92 | marker.timestamp = item.key[0] |
93 | |
94 | if (filter && item.value && item.value) { |
95 | var filterResult = filter(ids, item.value) |
96 | if (filterResult) { |
97 | item.value.filterResult = filterResult |
98 | return true |
99 | } |
100 | } |
101 | }), |
102 | |
103 | // LOOKUP AND ADD ROOTS |
104 | LookupRoots(), |
105 | |
106 | // FILTER ROOTS |
107 | pull.filter(item => { |
108 | var root = item.root || item |
109 | var isPrivate = root.value && typeof root.value.content === 'string' |
110 | |
111 | // skip this item if it has already been included |
112 | if (!included.has(root.key) && filter && root && root.value && !isPrivate) { |
113 | if (checkReplyForcesDisplay(item)) { // include this item if it has matching tags or the author is you |
114 | // update filter result so that we can display the correct bump message |
115 | root.filterResult = extend(item.filterResult, {forced: true}) |
116 | included.add(root.key) |
117 | return true |
118 | } else if (!seen.has(root.key)) { |
119 | seen.add(root.key) |
120 | var filterResult = filter(ids, root) |
121 | if (shouldShow(filterResult, {onlySubscribedChannels})) { |
122 | root.filterResult = filterResult |
123 | included.add(root.key) |
124 | return true |
125 | } |
126 | } |
127 | } |
128 | }), |
129 | |
130 | // MAP ROOT ITEMS |
131 | pull.map(item => { |
132 | var root = item.root || item |
133 | return root |
134 | }) |
135 | )) |
136 | }) |
137 | |
138 | // TRUNCATE |
139 | if (typeof limit === 'number') { |
140 | var count = 0 |
141 | return pullCat([ |
142 | pull( |
143 | stream, |
144 | pull.take(limit), |
145 | pull.through(() => { |
146 | count += 1 |
147 | }) |
148 | ), |
149 | |
150 | // send truncated marker for resuming search |
151 | pull( |
152 | pull.values([marker]), |
153 | pull.filter(() => count === limit) |
154 | ) |
155 | ]) |
156 | } else { |
157 | return stream |
158 | } |
159 | } |
160 | } |
161 | |
// Decides whether a root with the given filter result should be displayed.
// The stricter rule applies only when `onlySubscribedChannels` is set and the
// message has a channel: it then needs a matching channel, at least one
// matching tag, a mention of you, or to be yours. In every other case any
// truthy filter result is enough.
// Note the strict branch returns the first truthy operand (or the last one),
// not necessarily a boolean.
function shouldShow (filterResult, {onlySubscribedChannels}) {
  var strict = filterResult && onlySubscribedChannels && filterResult.hasChannel
  if (!strict) return !!filterResult

  const {matchesChannel, matchingTags, mentionsYou, isYours} = filterResult
  return matchesChannel || matchingTags.length || mentionsYou || isYours
}
169 | |
// Fetches a message by key, going through the LRU cache first.
// Always calls back with (null, {key, value}): an error from ssb.get is
// deliberately ignored, so a message that could not be read is reported with
// an undefined `value`. Only messages that have a value are cached.
function getThruCache (key, cb) {
  if (cache.has(key)) {
    cb(null, cache.get(key))
    return
  }

  ssb.get(key, (_, value) => {
    const msg = {key, value}
    if (value) cache.set(key, msg)
    cb(null, msg)
  })
}
183 | |
// Loads the friends lookup and the channel subscriptions, then calls back
// with a filter function `(ids, msg)`.
//
// The filter returns false for votes, undefined for messages that are not
// relevant to `ids`, and otherwise an object describing why the message is
// relevant: {matchingTags, matchesChannel, isYours, following, mentionsYou,
// hasChannel}.
// Errors from either lookup are passed to `cb`.
function getFilter (cb) {
  // TODO: rewrite contacts stream
  ssb.friends.get((err, friends) => {
    if (err) return cb(err)
    ssb.patchwork.getSubscriptions((err, subscriptions) => {
      if (err) return cb(err)
      cb(null, describeRelevance)

      function describeRelevance (ids, msg) {
        const {author, content} = msg.value
        if (content.type === 'vote') return false // filter out likes

        const hasChannel = !!content.channel
        // messages of type 'channel' never count as a channel match
        const matchesChannel = content.type !== 'channel' && checkChannel(subscriptions, ids, content.channel)
        const matchingTags = getMatchingTags(subscriptions, ids, content.mentions)
        const isYours = ids.includes(author)
        const mentionsYou = getMentionsYou(ids, content.mentions)
        const following = checkFollowing(friends, ids, author)

        const relevant = isYours || matchesChannel || matchingTags.length || following || mentionsYou
        if (!relevant) return undefined
        return {matchingTags, matchesChannel, isYours, following, mentionsYou, hasChannel}
      }
    })
  })
}
209 | |
// Creates a pull-stream through that turns index entries into messages.
// The second element of the index key is the thread root key: when it equals
// the message's own key the message is passed on as-is, otherwise the root is
// fetched via getThruCache and a copy of the message with a `root` property
// is emitted.
function LookupRoots () {
  return pull.asyncMap((entry, done) => {
    const message = entry.value
    const rootKey = entry.key[1]
    const isRoot = rootKey === message.key

    // already a root
    if (isRoot) return done(null, message)

    getThruCache(rootKey, (_, root) => {
      done(null, extend(message, {root}))
    })
  })
}
225 | } |
226 | |
// Collects the normalized names of all `#tag` mentions that checkChannel()
// accepts for the given ids. Returns an empty array when `mentions` is not an
// array; entries without a string `link` starting with '#' are ignored.
function getMatchingTags (lookup, ids, mentions) {
  if (!Array.isArray(mentions)) return []

  return mentions
    .filter(mention => mention && typeof mention.link === 'string' && mention.link.startsWith('#'))
    .map(mention => mention.link.slice(1)) // drop the leading '#'
    .filter(tag => checkChannel(lookup, ids, tag))
    .map(tag => normalizeChannel(tag))
}
240 | |
// Tells whether any mention links to one of `ids`.
// Returns a boolean when `mentions` is an array and undefined otherwise.
function getMentionsYou (ids, mentions) {
  if (!Array.isArray(mentions)) return undefined

  return mentions.some(mention =>
    Boolean(mention) && typeof mention.link === 'string' && ids.includes(mention.link)
  )
}
250 | |
// A reply forces its thread to be displayed when its filter result has at
// least one matching tag (returns true), otherwise the result's `isYours`
// value is returned as-is (undefined when the item has no filter result).
function checkReplyForcesDisplay (item) {
  const {matchingTags, isYours} = item.filterResult || {}
  if (matchingTags && matchingTags.length) return true
  return isYours
}
256 | |
// Looks up `lookup[id][target]` for the FIRST id only.
// Returns false when there is no lookup at all, and undefined when `ids` is
// empty or the first id has no entry in the lookup.
function checkFollowing (lookup, ids, target) {
  // TODO: rewrite contacts index (for some reason the order is different)
  if (!lookup) return false
  // HACK: only lookup the first ID until a method is added to ssb-friends to
  // correctly identify latest info
  if (ids.length < 1) return undefined
  const edges = lookup[ids[0]]
  return edges && edges[target]
}
265 | |
// Looks up the `${id}:${channel}` entry of every id and returns element [1]
// of the most recent one (presumably the subscribed flag -- confirm against
// the subscriptions index).
// Returns false without a lookup, and undefined when the channel name does
// not normalize to a truthy value; a falsy most-recent entry is returned as-is.
function checkChannel (lookup, ids, channel) {
  if (!lookup) return false

  const name = normalizeChannel(channel)
  if (!name) return undefined

  const entries = ids.map(id => lookup[`${id}:${name}`])
  const latest = mostRecentValue(entries)
  return latest && latest[1]
}
274 | |
// Picks the entry whose element at `timestampIndex` is the greatest.
// Falsy entries are skipped, ties keep the earlier entry, and null is
// returned when no entry qualifies.
function mostRecentValue (values, timestampIndex = 0) {
  return values.reduce((best, candidate) => {
    if (!candidate) return best
    const isNewer = !best || best[timestampIndex] < candidate[timestampIndex]
    return isNewer ? candidate : best
  }, null)
}
284 |
Built with git-ssb-web