git ssb

10+

Matt McKegg / patchwork



Tree: b7084484d5f38609520442861cc067351128dfa4

Files: b7084484d5f38609520442861cc067351128dfa4 / sbot / roots.js

7981 bytes · Raw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7var normalizeChannel = require('../lib/normalize-channel')
8var Defer = require('pull-defer')
9
10// HACK: pull it out of patchcore
11var getRoot = require('patchcore/message/sync/root').create().message.sync.root
12
13module.exports = function (ssb, config) {
14 var create = FlumeViewLevel(1, function (msg, seq) {
15 var result = [
16 [msg.value.timestamp, getRoot(msg) || msg.key]
17 ]
18 return result
19 })
20
21 var index = ssb._flumeUse('patchwork-roots', create)
22
23 // cache mostly just to avoid reading the same roots over and over again
24 // not really big enough for multiple refresh cycles
25 var cache = HLRU(100)
26
27 return {
28 latest: function ({ids = [ssb.id]}) {
29 var stream = Defer.source()
30 getFilter((err, filter) => {
31 if (err) return stream.abort(err)
32 stream.resolve(pull(
33 index.read({old: false}),
34
35 // BUMP FILTER
36 pull.filter(item => {
37 if (filter && item.value && item.value) {
38 var filterResult = filter(ids, item.value)
39 if (filterResult) {
40 item.value.filterResult = filterResult
41 return true
42 }
43 }
44 }),
45
46 // LOOKUP AND ADD ROOTS
47 LookupRoots(),
48
49 // FILTER ROOTS
50 pull.filter(item => {
51 var root = item.root || item
52 var isPrivate = root.value && typeof root.value.content === 'string'
53
54 if (filter && root && root.value && !isPrivate) {
55 return checkReplyForcesDisplay(item) || filter(ids, root)
56 }
57 })
58 ))
59 })
60 return stream
61 },
62
63 read: function ({ids = [ssb.id], reverse, limit, lt, gt}) {
64 var opts = {reverse, old: true}
65
66 // handle markers passed in to lt / gt
67 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
68 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
69 if (typeof lt === 'number') opts.lt = [lt]
70 if (typeof gt === 'number') opts.gt = [gt]
71
72 var seen = new Set()
73 var included = new Set()
74 var marker = {marker: true, timestamp: null}
75
76 var stream = Defer.source()
77
78 getFilter((err, filter) => {
79 if (err) return stream.abort(err)
80 stream.resolve(pull(
81 // READ ROOTS INDEX
82 index.read(opts),
83
84 // BUMP FILTER
85 pull.filter(item => {
86 // keep track of latest timestamp
87 marker.timestamp = item.key[0]
88
89 if (filter && item.value && item.value) {
90 var filterResult = filter(ids, item.value)
91 if (filterResult) {
92 item.value.filterResult = filterResult
93 return true
94 }
95 }
96 }),
97
98 // LOOKUP AND ADD ROOTS
99 LookupRoots(),
100
101 // FILTER ROOTS
102 pull.filter(item => {
103 var root = item.root || item
104 var isPrivate = root.value && typeof root.value.content === 'string'
105
106 // skip this item if it has already been included
107 if (!included.has(root.key) && filter && root && root.value && !isPrivate) {
108 if (checkReplyForcesDisplay(item)) {
109 // include this item if it has matching tags or the author is you
110 included.add(root.key)
111 return true
112 } else if (!seen.has(root.key)) {
113 seen.add(root.key)
114 var result = filter(ids, root)
115 if (result) {
116 // include this item if we have not yet seen it and the root filter passes
117 included.add(root.key)
118 return true
119 }
120 }
121 }
122 }),
123
124 // MAP ROOT ITEMS
125 pull.map(item => {
126 var root = item.root || item
127 return root
128 })
129 ))
130 })
131
132 // TRUNCATE
133 if (typeof limit === 'number') {
134 var count = 0
135 return pullCat([
136 pull(
137 stream,
138 pull.take(limit),
139 pull.through(() => {
140 count += 1
141 })
142 ),
143
144 // send truncated marker for resuming search
145 pull(
146 pull.values([marker]),
147 pull.filter(() => count === limit)
148 )
149 ])
150 } else {
151 return stream
152 }
153 }
154 }
155
156 function getThruCache (key, cb) {
157 if (cache.has(key)) {
158 cb(null, cache.get(key))
159 } else {
160 ssb.get(key, (_, value) => {
161 var msg = {key, value}
162 if (msg.value) {
163 cache.set(key, msg)
164 }
165 cb(null, msg)
166 })
167 }
168 }
169
170 function getFilter (cb) {
171 // TODO: rewrite contacts stream
172 ssb.friends.get((err, friends) => {
173 if (err) return cb(err)
174 ssb.patchwork.getSubscriptions((err, subscriptions) => {
175 if (err) return cb(err)
176 cb(null, function (ids, msg) {
177 var type = msg.value.content.type
178 if (type === 'vote') return false // filter out likes
179 var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel))
180 var matchingTags = getMatchingTags(subscriptions, ids, msg.value.content.mentions)
181 var isYours = ids.includes(msg.value.author)
182 var mentionsYou = getMentionsYou(ids, msg.value.content.mentions)
183 var following = checkFollowing(friends, ids, msg.value.author)
184 if (isYours || matchesChannel || matchingTags.length || following || mentionsYou) {
185 return {
186 matchingTags, matchesChannel, isYours, following, mentionsYou
187 }
188 }
189 })
190 })
191 })
192 }
193
194 function LookupRoots () {
195 return pull.asyncMap((item, cb) => {
196 var msg = item.value
197 var key = item.key[1]
198 if (key === msg.key) {
199 // already a root
200 return cb(null, msg)
201 }
202 getThruCache(key, (_, value) => {
203 cb(null, extend(msg, {
204 root: value
205 }))
206 })
207 })
208 }
209}
210
/**
 * Collect the hashtag mentions that `checkChannel` accepts for the given ids.
 *
 * @param {Object} lookup - passed straight through to `checkChannel`
 * @param {string[]} ids - identities to check against
 * @param {*} mentions - the message's mentions; anything but an array yields []
 * @returns {string[]} normalized names of the accepted hashtags, in mention order
 */
function getMatchingTags (lookup, ids, mentions) {
  const matches = []
  if (!Array.isArray(mentions)) return matches

  for (const mention of mentions) {
    // only string links that start with '#' are treated as hashtags
    const isHashtag = mention && typeof mention.link === 'string' && mention.link.startsWith('#')
    if (!isHashtag) continue

    const tag = mention.link.slice(1)
    if (checkChannel(lookup, ids, tag)) {
      matches.push(normalizeChannel(tag))
    }
  }

  return matches
}
224
/**
 * Tell whether any mention links directly to one of the given ids.
 *
 * @param {string[]} ids - identities to look for
 * @param {*} mentions - the message's mentions; anything but an array counts
 *   as "no mentions"
 * @returns {boolean} always a boolean (false when `mentions` is not an array)
 */
function getMentionsYou (ids, mentions) {
  if (!Array.isArray(mentions)) return false

  return mentions.some((mention) => {
    // ignore empty entries and mentions whose link is not a string
    if (!mention || typeof mention.link !== 'string') return false
    return ids.includes(mention.link)
  })
}
234
/**
 * Decide whether an item's own `filterResult` is enough to show its thread:
 * it has at least one matching tag, or its `isYours` / `mentionsYou` value is
 * truthy.
 *
 * @param {Object} item - item that may carry a `filterResult` object
 * @returns {*} `true` for a tag match; otherwise the truthy `isYours`, or
 *   else whatever `mentionsYou` holds (possibly undefined)
 */
function checkReplyForcesDisplay (item) {
  const details = item.filterResult || {}
  const tags = details.matchingTags

  if (tags && tags.length) return true
  if (details.isYours) return details.isYours
  return details.mentionsYou
}
240
/**
 * Look up `lookup[id][target]` for the first id only.
 *
 * @param {Object} lookup - nested map, id -> target -> value
 * @param {string[]} ids - identities; only the first one is consulted
 * @param {string} target - key looked up under the first id's entry
 * @returns {*} false when there is no lookup; otherwise the stored value, or
 *   the falsy entry / undefined when nothing is stored
 */
function checkFollowing (lookup, ids, target) {
  // TODO: rewrite contacts index (for some reason the order is different)
  if (!lookup) return false

  // HACK: only the first ID is consulted until ssb-friends gains a method that
  // correctly identifies the latest info
  const [status] = ids.slice(0, 1).map(id => {
    const contacts = lookup[id]
    return contacts && contacts[target]
  })
  return status
}
249
/**
 * Look up the most recent `lookup` entry stored under `<id>:<channel>` for
 * any of the ids and return its second element (presumably the subscribed
 * flag - confirm against the subscriptions index).
 *
 * @param {Object} lookup - map keyed by `${id}:${normalizedChannel}`
 * @param {string[]} ids - identities to check
 * @param {*} channel - channel name, normalized before the lookup
 * @returns {*} false without a lookup, undefined when the channel normalizes
 *   to something falsy, otherwise the entry's element at index 1 (or the
 *   falsy result when no entry exists)
 */
function checkChannel (lookup, ids, channel) {
  if (!lookup) return false

  const name = normalizeChannel(channel)
  if (!name) return undefined

  const entries = ids.map(id => lookup[`${id}:${name}`])
  const latest = mostRecentValue(entries)
  return latest && latest[1]
}
258
/**
 * Pick the entry with the greatest value at `timestampIndex`.
 *
 * Falsy entries are skipped, and on a tie the earliest entry wins because
 * the comparison is strict.
 *
 * @param {Array} values - candidate entries (indexable values or falsy)
 * @param {number} [timestampIndex=0] - position of the timestamp in each entry
 * @returns {*} the winning entry, or null when there is no truthy entry
 */
function mostRecentValue (values, timestampIndex = 0) {
  return values.reduce((newest, candidate) => {
    if (!candidate) return newest
    const isNewer = !newest || newest[timestampIndex] < candidate[timestampIndex]
    return isNewer ? candidate : newest
  }, null)
}
268

Built with git-ssb-web