git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: f06d537e3a5dcdce8c60eb9c129923924e3d7326

Files: f06d537e3a5dcdce8c60eb9c129923924e3d7326 / sbot / roots.js

8074 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7var normalizeChannel = require('ssb-ref').normalizeChannel
8var Defer = require('pull-defer')
9
10// HACK: pull it out of patchcore
11var getRoot = require('patchcore/message/sync/root').create().message.sync.root
12var getTimestamp = require('patchcore/message/sync/timestamp').create().message.sync.timestamp
13
14module.exports = function (ssb, config) {
15 var create = FlumeViewLevel(2, function (msg, seq) {
16 var result = [
17 [getTimestamp(msg), getRoot(msg) || msg.key]
18 ]
19 return result
20 })
21
22 var index = ssb._flumeUse('patchwork-roots', create)
23
24 // cache mostly just to avoid reading the same roots over and over again
25 // not really big enough for multiple refresh cycles
26 var cache = HLRU(100)
27
28 return {
29 latest: function ({ids = [ssb.id]}) {
30 var stream = Defer.source()
31 getFilter((err, filter) => {
32 if (err) return stream.abort(err)
33 stream.resolve(pull(
34 index.read({old: false}),
35
36 // BUMP FILTER
37 pull.filter(item => {
38 if (filter && item.value && item.value) {
39 var filterResult = filter(ids, item.value)
40 if (filterResult) {
41 item.value.filterResult = filterResult
42 return true
43 }
44 }
45 }),
46
47 // LOOKUP AND ADD ROOTS
48 LookupRoots(),
49
50 // FILTER ROOTS
51 pull.filter(item => {
52 var root = item.root || item
53 var isPrivate = root.value && typeof root.value.content === 'string'
54
55 if (filter && root && root.value && !isPrivate) {
56 return checkReplyForcesDisplay(item) || filter(ids, root)
57 }
58 })
59 ))
60 })
61 return stream
62 },
63
64 read: function ({ids = [ssb.id], reverse, limit, lt, gt}) {
65 var opts = {reverse, old: true}
66
67 // handle markers passed in to lt / gt
68 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
69 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
70 if (typeof lt === 'number') opts.lt = [lt]
71 if (typeof gt === 'number') opts.gt = [gt]
72
73 var seen = new Set()
74 var included = new Set()
75 var marker = {marker: true, timestamp: null}
76
77 var stream = Defer.source()
78
79 getFilter((err, filter) => {
80 if (err) return stream.abort(err)
81 stream.resolve(pull(
82 // READ ROOTS INDEX
83 index.read(opts),
84
85 // BUMP FILTER
86 pull.filter(item => {
87 // keep track of latest timestamp
88 marker.timestamp = item.key[0]
89
90 if (filter && item.value && item.value) {
91 var filterResult = filter(ids, item.value)
92 if (filterResult) {
93 item.value.filterResult = filterResult
94 return true
95 }
96 }
97 }),
98
99 // LOOKUP AND ADD ROOTS
100 LookupRoots(),
101
102 // FILTER ROOTS
103 pull.filter(item => {
104 var root = item.root || item
105 var isPrivate = root.value && typeof root.value.content === 'string'
106
107 // skip this item if it has already been included
108 if (!included.has(root.key) && filter && root && root.value && !isPrivate) {
109 if (checkReplyForcesDisplay(item)) {
110 // include this item if it has matching tags or the author is you
111 included.add(root.key)
112 return true
113 } else if (!seen.has(root.key)) {
114 seen.add(root.key)
115 var result = filter(ids, root)
116 if (result) {
117 // include this item if we have not yet seen it and the root filter passes
118 included.add(root.key)
119 return true
120 }
121 }
122 }
123 }),
124
125 // MAP ROOT ITEMS
126 pull.map(item => {
127 var root = item.root || item
128 return root
129 })
130 ))
131 })
132
133 // TRUNCATE
134 if (typeof limit === 'number') {
135 var count = 0
136 return pullCat([
137 pull(
138 stream,
139 pull.take(limit),
140 pull.through(() => {
141 count += 1
142 })
143 ),
144
145 // send truncated marker for resuming search
146 pull(
147 pull.values([marker]),
148 pull.filter(() => count === limit)
149 )
150 ])
151 } else {
152 return stream
153 }
154 }
155 }
156
157 function getThruCache (key, cb) {
158 if (cache.has(key)) {
159 cb(null, cache.get(key))
160 } else {
161 ssb.get(key, (_, value) => {
162 var msg = {key, value}
163 if (msg.value) {
164 cache.set(key, msg)
165 }
166 cb(null, msg)
167 })
168 }
169 }
170
171 function getFilter (cb) {
172 // TODO: rewrite contacts stream
173 ssb.friends.get((err, friends) => {
174 if (err) return cb(err)
175 ssb.patchwork.getSubscriptions((err, subscriptions) => {
176 if (err) return cb(err)
177 cb(null, function (ids, msg) {
178 var type = msg.value.content.type
179 if (type === 'vote') return false // filter out likes
180 var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel))
181 var matchingTags = getMatchingTags(subscriptions, ids, msg.value.content.mentions)
182 var isYours = ids.includes(msg.value.author)
183 var mentionsYou = getMentionsYou(ids, msg.value.content.mentions)
184 var following = checkFollowing(friends, ids, msg.value.author)
185 if (isYours || matchesChannel || matchingTags.length || following || mentionsYou) {
186 return {
187 matchingTags, matchesChannel, isYours, following, mentionsYou
188 }
189 }
190 })
191 })
192 })
193 }
194
195 function LookupRoots () {
196 return pull.asyncMap((item, cb) => {
197 var msg = item.value
198 var key = item.key[1]
199 if (key === msg.key) {
200 // already a root
201 return cb(null, msg)
202 }
203 getThruCache(key, (_, value) => {
204 cb(null, extend(msg, {
205 root: value
206 }))
207 })
208 })
209 }
210}
211
212function getMatchingTags (lookup, ids, mentions) {
213 if (Array.isArray(mentions)) {
214 return mentions.reduce((result, mention) => {
215 if (mention && typeof mention.link === 'string' && mention.link.startsWith('#')) {
216 if (checkChannel(lookup, ids, mention.link.slice(1))) {
217 result.push(normalizeChannel(mention.link.slice(1)))
218 }
219 }
220 return result
221 }, [])
222 }
223 return []
224}
225
226function getMentionsYou (ids, mentions) {
227 if (Array.isArray(mentions)) {
228 return mentions.some((mention) => {
229 if (mention && typeof mention.link === 'string') {
230 return ids.includes(mention.link)
231 }
232 })
233 }
234}
235
236function checkReplyForcesDisplay (item) {
237 var filterResult = item.filterResult || {}
238 var matchesTags = filterResult.matchingTags && !!filterResult.matchingTags.length
239 return matchesTags || filterResult.isYours || filterResult.mentionsYou
240}
241
242function checkFollowing (lookup, ids, target) {
243 // TODO: rewrite contacts index (for some reason the order is different)
244 if (!lookup) return false
245 // HACK: only lookup the first ID until a method is added to ssb-friends to
246 // correctly identify latest info
247 var value = ids.slice(0, 1).map(id => lookup[id] && lookup[id][target])
248 return value && value[0]
249}
250
251function checkChannel (lookup, ids, channel) {
252 if (!lookup) return false
253 channel = normalizeChannel(channel)
254 if (channel) {
255 var value = mostRecentValue(ids.map(id => lookup[`${id}:${channel}`]))
256 return value && value[1]
257 }
258}
259
260function mostRecentValue (values, timestampIndex = 0) {
261 var mostRecent = null
262 values.forEach(value => {
263 if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) {
264 mostRecent = value
265 }
266 })
267 return mostRecent
268}
269

Built with git-ssb-web