git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: bdee82e994ee3ce0e26e552ad2fba8393f32ce45

Files: bdee82e994ee3ce0e26e552ad2fba8393f32ce45 / sbot / roots.js

7885 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7var normalizeChannel = require('../lib/normalize-channel')
8var Defer = require('pull-defer')
9
10// HACK: pull it out of patchcore
11var getRoot = require('patchcore/message/sync/root').create().message.sync.root
12
13module.exports = function (ssb, config) {
14 var create = FlumeViewLevel(1, function (msg, seq) {
15 var result = [
16 [msg.value.timestamp, getRoot(msg) || msg.key]
17 ]
18 return result
19 })
20
21 var index = ssb._flumeUse('patchwork-roots', create)
22
23 // cache mostly just to avoid reading the same roots over and over again
24 // not really big enough for multiple refresh cycles
25 var cache = HLRU(100)
26
27 return {
28 latest: function ({ids = [ssb.id]}) {
29 var stream = Defer.source()
30 getFilter((err, filter) => {
31 if (err) return stream.abort(err)
32 stream.resolve(pull(
33 index.read({old: false}),
34
35 // BUMP FILTER
36 pull.filter(item => {
37 if (filter && item.value && item.value) {
38 var filterResult = filter(ids, item.value)
39 if (filterResult) {
40 item.value.filterResult = filterResult
41 return true
42 }
43 }
44 }),
45
46 // LOOKUP AND ADD ROOTS
47 LookupRoots(),
48
49 // FILTER ROOTS
50 pull.filter(item => {
51 var root = item.root || item
52 if (filter && root && root.value) {
53 return checkReplyForcesDisplay(item) || filter(ids, root)
54 }
55 })
56 ))
57 })
58 return stream
59 },
60
61 read: function ({ids = [ssb.id], reverse, limit, lt, gt}) {
62 var opts = {reverse, old: true}
63
64 // handle markers passed in to lt / gt
65 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
66 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
67 if (typeof lt === 'number') opts.lt = [lt]
68 if (typeof gt === 'number') opts.gt = [gt]
69
70 var seen = new Set()
71 var included = new Set()
72 var marker = {marker: true, timestamp: null}
73
74 var stream = Defer.source()
75
76 getFilter((err, filter) => {
77 if (err) return stream.abort(err)
78 stream.resolve(pull(
79 // READ ROOTS INDEX
80 index.read(opts),
81
82 // BUMP FILTER
83 pull.filter(item => {
84 // keep track of latest timestamp
85 marker.timestamp = item.key[0]
86
87 if (filter && item.value && item.value) {
88 var filterResult = filter(ids, item.value)
89 if (filterResult) {
90 item.value.filterResult = filterResult
91 return true
92 }
93 }
94 }),
95
96 // LOOKUP AND ADD ROOTS
97 LookupRoots(),
98
99 // FILTER ROOTS
100 pull.filter(item => {
101 var root = item.root || item
102 var isPrivate = root.value && typeof root.value.content === 'string'
103
104 // skip this item if it has already been included
105 if (!included.has(root.key) && filter && root && root.value && !isPrivate) {
106 if (checkReplyForcesDisplay(item)) {
107 // include this item if it has matching tags or the author is you
108 included.add(root.key)
109 return true
110 } else if (!seen.has(root.key)) {
111 seen.add(root.key)
112 var result = filter(ids, root)
113 if (result) {
114 // include this item if we have not yet seen it and the root filter passes
115 included.add(root.key)
116 return true
117 }
118 }
119 }
120 }),
121
122 // MAP ROOT ITEMS
123 pull.map(item => {
124 var root = item.root || item
125 return root
126 })
127 ))
128 })
129
130 // TRUNCATE
131 if (typeof limit === 'number') {
132 var count = 0
133 return pullCat([
134 pull(
135 stream,
136 pull.take(limit),
137 pull.through(() => {
138 count += 1
139 })
140 ),
141
142 // send truncated marker for resuming search
143 pull(
144 pull.values([marker]),
145 pull.filter(() => count === limit)
146 )
147 ])
148 } else {
149 return stream
150 }
151 }
152 }
153
154 function getThruCache (key, cb) {
155 if (cache.has(key)) {
156 cb(null, cache.get(key))
157 } else {
158 ssb.get(key, (_, value) => {
159 var msg = {key, value}
160 if (msg.value) {
161 cache.set(key, msg)
162 }
163 cb(null, msg)
164 })
165 }
166 }
167
168 function getFilter (cb) {
169 // TODO: rewrite contacts stream
170 ssb.friends.get((err, friends) => {
171 if (err) return cb(err)
172 ssb.patchwork.getSubscriptions((err, subscriptions) => {
173 if (err) return cb(err)
174 cb(null, function (ids, msg) {
175 var type = msg.value.content.type
176 if (type === 'vote') return false // filter out likes
177 var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel))
178 var matchingTags = getMatchingTags(subscriptions, ids, msg.value.content.mentions)
179 var isYours = ids.includes(msg.value.author)
180 var mentionsYou = getMentionsYou(ids, msg.value.content.mentions)
181 var following = checkFollowing(friends, ids, msg.value.author)
182 if (isYours || matchesChannel || matchingTags.length || following || mentionsYou) {
183 return {
184 matchingTags, matchesChannel, isYours, following, mentionsYou
185 }
186 }
187 })
188 })
189 })
190 }
191
192 function LookupRoots () {
193 return pull.asyncMap((item, cb) => {
194 var msg = item.value
195 var key = item.key[1]
196 if (key === msg.key) {
197 // already a root
198 return cb(null, msg)
199 }
200 getThruCache(key, (_, value) => {
201 cb(null, extend(msg, {
202 root: value
203 }))
204 })
205 })
206 }
207}
208
209function getMatchingTags (lookup, ids, mentions) {
210 if (Array.isArray(mentions)) {
211 return mentions.reduce((result, mention) => {
212 if (mention && typeof mention.link === 'string' && mention.link.startsWith('#')) {
213 if (checkChannel(lookup, ids, mention.link.slice(1))) {
214 result.push(normalizeChannel(mention.link.slice(1)))
215 }
216 }
217 return result
218 }, [])
219 }
220 return []
221}
222
223function getMentionsYou (ids, mentions) {
224 if (Array.isArray(mentions)) {
225 return mentions.some((mention) => {
226 if (mention && typeof mention.link === 'string') {
227 return ids.includes(mention.link)
228 }
229 })
230 }
231}
232
233function checkReplyForcesDisplay (item) {
234 var filterResult = item.filterResult || {}
235 var matchesTags = filterResult.matchingTags && !!filterResult.matchingTags.length
236 return matchesTags || filterResult.isYours || filterResult.mentionsYou
237}
238
239function checkFollowing (lookup, ids, target) {
240 // TODO: rewrite contacts index (for some reason the order is different)
241 if (!lookup) return false
242 // HACK: only lookup the first ID until a method is added to ssb-friends to
243 // correctly identify latest info
244 var value = ids.slice(0, 1).map(id => lookup[id] && lookup[id][target])
245 return value && value[0]
246}
247
248function checkChannel (lookup, ids, channel) {
249 if (!lookup) return false
250 channel = normalizeChannel(channel)
251 if (channel) {
252 var value = mostRecentValue(ids.map(id => lookup[`${id}:${channel}`]))
253 return value && value[1]
254 }
255}
256
257function mostRecentValue (values, timestampIndex = 0) {
258 var mostRecent = null
259 values.forEach(value => {
260 if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) {
261 mostRecent = value
262 }
263 })
264 return mostRecent
265}
266

Built with git-ssb-web