git ssb

10+

Matt McKegg / patchwork



Tree: 973b79e581dbc98d1e008cb5cbdf69714b168342

Files: 973b79e581dbc98d1e008cb5cbdf69714b168342 / sbot / roots.js

8591 bytes | Raw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7var normalizeChannel = require('ssb-ref').normalizeChannel
8var Defer = require('pull-defer')
9
10// HACK: pull it out of patchcore
11var getRoot = require('patchcore/message/sync/root').create().message.sync.root
12var getTimestamp = require('patchcore/message/sync/timestamp').create().message.sync.timestamp
13
14module.exports = function (ssb, config) {
15 var create = FlumeViewLevel(2, function (msg, seq) {
16 var result = [
17 [getTimestamp(msg), getRoot(msg) || msg.key]
18 ]
19 return result
20 })
21
22 var index = ssb._flumeUse('patchwork-roots', create)
23
24 // cache mostly just to avoid reading the same roots over and over again
25 // not really big enough for multiple refresh cycles
26 var cache = HLRU(100)
27
28 return {
29 latest: function ({ids = [ssb.id], onlySubscribedChannels = false}) {
30 var stream = Defer.source()
31 getFilter((err, filter) => {
32 if (err) return stream.abort(err)
33 stream.resolve(pull(
34 index.read({old: false}),
35
36 // BUMP FILTER
37 pull.filter(item => {
38 if (filter && item.value && item.value) {
39 var filterResult = filter(ids, item.value)
40 if (filterResult) {
41 item.value.filterResult = filterResult
42 return true
43 }
44 }
45 }),
46
47 // LOOKUP AND ADD ROOTS
48 LookupRoots(),
49
50 // FILTER ROOTS
51 pull.filter(item => {
52 var root = item.root || item
53 var isPrivate = root.value && typeof root.value.content === 'string'
54
55 if (filter && root && root.value && !isPrivate) {
56 var filterResult = filter(ids, root)
57 return checkReplyForcesDisplay(item) || shouldShow(filterResult, {onlySubscribedChannels})
58 }
59 })
60 ))
61 })
62 return stream
63 },
64
65 read: function ({ids = [ssb.id], reverse, limit, lt, gt, onlySubscribedChannels = false}) {
66 var opts = {reverse, old: true}
67
68 // handle markers passed in to lt / gt
69 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
70 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
71 if (typeof lt === 'number') opts.lt = [lt]
72 if (typeof gt === 'number') opts.gt = [gt]
73
74 var seen = new Set()
75 var included = new Set()
76 var marker = {marker: true, timestamp: null}
77
78 var stream = Defer.source()
79
80 getFilter((err, filter) => {
81 if (err) return stream.abort(err)
82 stream.resolve(pull(
83 // READ ROOTS INDEX
84 index.read(opts),
85
86 // BUMP FILTER
87 pull.filter(item => {
88 // keep track of latest timestamp
89 marker.timestamp = item.key[0]
90
91 if (filter && item.value && item.value) {
92 var filterResult = filter(ids, item.value)
93 if (filterResult) {
94 item.value.filterResult = filterResult
95 return true
96 }
97 }
98 }),
99
100 // LOOKUP AND ADD ROOTS
101 LookupRoots(),
102
103 // FILTER ROOTS
104 pull.filter(item => {
105 var root = item.root || item
106 var isPrivate = root.value && typeof root.value.content === 'string'
107
108 // skip this item if it has already been included
109 if (!included.has(root.key) && filter && root && root.value && !isPrivate) {
110 if (checkReplyForcesDisplay(item)) {
111 // include this item if it has matching tags or the author is you
112 included.add(root.key)
113 return true
114 } else if (!seen.has(root.key)) {
115 seen.add(root.key)
116 var filterResult = filter(ids, root)
117 if (shouldShow(filterResult, {onlySubscribedChannels})) {
118 included.add(root.key)
119 return true
120 }
121 }
122 }
123 }),
124
125 // MAP ROOT ITEMS
126 pull.map(item => {
127 var root = item.root || item
128 root.filterResult = item.filterResult
129 return root
130 })
131 ))
132 })
133
134 // TRUNCATE
135 if (typeof limit === 'number') {
136 var count = 0
137 return pullCat([
138 pull(
139 stream,
140 pull.take(limit),
141 pull.through(() => {
142 count += 1
143 })
144 ),
145
146 // send truncated marker for resuming search
147 pull(
148 pull.values([marker]),
149 pull.filter(() => count === limit)
150 )
151 ])
152 } else {
153 return stream
154 }
155 }
156 }
157
158 function shouldShow (filterResult, {onlySubscribedChannels}) {
159 if (filterResult && onlySubscribedChannels && filterResult.hasChannel) {
160 return filterResult.matchesChannel || filterResult.matchingTags.length || filterResult.mentionsYou || filterResult.isYours
161 } else {
162 return !!filterResult
163 }
164 }
165
166 function getThruCache (key, cb) {
167 if (cache.has(key)) {
168 cb(null, cache.get(key))
169 } else {
170 ssb.get(key, (_, value) => {
171 var msg = {key, value}
172 if (msg.value) {
173 cache.set(key, msg)
174 }
175 cb(null, msg)
176 })
177 }
178 }
179
180 function getFilter (cb) {
181 // TODO: rewrite contacts stream
182 ssb.friends.get((err, friends) => {
183 if (err) return cb(err)
184 ssb.patchwork.getSubscriptions((err, subscriptions) => {
185 if (err) return cb(err)
186 cb(null, function (ids, msg) {
187 var type = msg.value.content.type
188 if (type === 'vote') return false // filter out likes
189 var hasChannel = !!msg.value.content.channel
190 var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel))
191 var matchingTags = getMatchingTags(subscriptions, ids, msg.value.content.mentions)
192 var isYours = ids.includes(msg.value.author)
193 var mentionsYou = getMentionsYou(ids, msg.value.content.mentions)
194 var following = checkFollowing(friends, ids, msg.value.author)
195 if (isYours || matchesChannel || matchingTags.length || following || mentionsYou) {
196 return {
197 matchingTags, matchesChannel, isYours, following, mentionsYou, hasChannel
198 }
199 }
200 })
201 })
202 })
203 }
204
205 function LookupRoots () {
206 return pull.asyncMap((item, cb) => {
207 var msg = item.value
208 var key = item.key[1]
209 if (key === msg.key) {
210 // already a root
211 return cb(null, msg)
212 }
213 getThruCache(key, (_, value) => {
214 cb(null, extend(msg, {
215 root: value
216 }))
217 })
218 })
219 }
220}
221
222function getMatchingTags (lookup, ids, mentions) {
223 if (Array.isArray(mentions)) {
224 return mentions.reduce((result, mention) => {
225 if (mention && typeof mention.link === 'string' && mention.link.startsWith('#')) {
226 if (checkChannel(lookup, ids, mention.link.slice(1))) {
227 result.push(normalizeChannel(mention.link.slice(1)))
228 }
229 }
230 return result
231 }, [])
232 }
233 return []
234}
235
236function getMentionsYou (ids, mentions) {
237 if (Array.isArray(mentions)) {
238 return mentions.some((mention) => {
239 if (mention && typeof mention.link === 'string') {
240 return ids.includes(mention.link)
241 }
242 })
243 }
244}
245
246function checkReplyForcesDisplay (item) {
247 var filterResult = item.filterResult || {}
248 var matchesTags = filterResult.matchingTags && !!filterResult.matchingTags.length
249 return matchesTags || filterResult.isYours
250}
251
252function checkFollowing (lookup, ids, target) {
253 // TODO: rewrite contacts index (for some reason the order is different)
254 if (!lookup) return false
255 // HACK: only lookup the first ID until a method is added to ssb-friends to
256 // correctly identify latest info
257 var value = ids.slice(0, 1).map(id => lookup[id] && lookup[id][target])
258 return value && value[0]
259}
260
261function checkChannel (lookup, ids, channel) {
262 if (!lookup) return false
263 channel = normalizeChannel(channel)
264 if (channel) {
265 var value = mostRecentValue(ids.map(id => lookup[`${id}:${channel}`]))
266 return value && value[1]
267 }
268}
269
270function mostRecentValue (values, timestampIndex = 0) {
271 var mostRecent = null
272 values.forEach(value => {
273 if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) {
274 mostRecent = value
275 }
276 })
277 return mostRecent
278}
279

Built with git-ssb-web