git ssb

10+

Matt McKegg / patchwork



Tree: 20cee95636c3adc788f738eff9c79fb6477c66d4

Files: 20cee95636c3adc788f738eff9c79fb6477c66d4 / sbot / roots.js

6133 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7var normalizeChannel = require('../lib/normalize-channel')
8
9// HACK: pull it out of patchcore
10var getRoot = require('patchcore/message/sync/root').create().message.sync.root
11
12module.exports = function (ssb, config) {
13 var create = FlumeViewLevel(1, function (msg, seq) {
14 var result = [
15 [msg.value.timestamp, getRoot(msg) || msg.key]
16 ]
17 return result
18 })
19
20 var index = ssb._flumeUse('patchwork-roots', create)
21
22 // cache mostly just to avoid reading the same roots over and over again
23 // not really big enough for multiple refresh cycles
24 var cache = HLRU(100)
25
26 return {
27 latest: function ({ids = [ssb.id]}) {
28 var filter = null
29 return pull(
30 // READ INDEX
31 index.read({old: false}),
32
33 // LOAD FILTERS
34 pull.asyncMap((item, cb) => {
35 if (!filter) {
36 // pause stream until filters have loaded
37 getFilter((err, result) => {
38 if (err) return cb(err)
39 filter = result
40 cb(null, item)
41 })
42 } else {
43 cb(null, item)
44 }
45 }),
46
47 // BUMP FILTER
48 pull.filter(item => {
49 if (filter && item.value && item.value) {
50 return filter(ids, item.value)
51 }
52 }),
53
54 // LOOKUP ROOTS
55 pull.asyncMap((item, cb) => {
56 var msg = item.value
57 var key = item.key[1]
58 if (key === msg.key) {
59 // already a root
60 cb(null, msg)
61 }
62 getThruCache(key, (_, value) => {
63 cb(null, extend(msg, {
64 root: value
65 }))
66 })
67 }),
68
69 // FILTER
70 pull.filter(item => {
71 var root = item.root || item
72 if (filter && root && root.value && !getRoot(root)) {
73 return filter(ids, root)
74 }
75 })
76 )
77 },
78 read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) {
79 var opts = {reverse, live, old}
80
81 // handle markers passed in to lt / gt
82 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
83 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
84 if (typeof lt === 'number') opts.lt = [lt]
85 if (typeof gt === 'number') opts.gt = [gt]
86
87 var seen = new Set()
88 var marker = {marker: true, timestamp: null}
89 var filter = null
90
91 var stream = pull(
92
93 // READ ROOTS
94 index.read(opts),
95
96 // LOAD FILTERS
97 pull.asyncMap((item, cb) => {
98 if (!filter) {
99 // pause stream until filters have loaded
100 getFilter((err, result) => {
101 if (err) return cb(err)
102 filter = result
103 cb(null, item)
104 })
105 } else {
106 cb(null, item)
107 }
108 }),
109
110 // BUMP FILTER
111 pull.filter(item => {
112 if (filter && item.value && item.value.value) {
113 return filter(ids, item.value)
114 }
115 }),
116
117 // MAP ROOTS
118 pull.map(item => {
119 if (item.sync) return item
120 marker.timestamp = item.key[0]
121 return item.key[1]
122 }),
123
124 // UNIQUE
125 pull.filter(item => {
126 if (old === false) return true // don't filter live stream
127 if (item && item.sync) {
128 return true
129 } else if (typeof item === 'string') {
130 if (!seen.has(item)) {
131 seen.add(item)
132 return true
133 }
134 }
135 }),
136
137 // LOOKUP (with cache)
138 pull.asyncMap((item, cb) => {
139 if (item.sync) return cb(null, item)
140 var key = item
141 getThruCache(key, cb)
142 }),
143
144 // ROOT FILTER
145 pull.filter(msg => {
146 if (filter && msg.value && !getRoot(msg)) {
147 return filter(ids, msg)
148 }
149 })
150 )
151
152 // TRUNCATE
153 if (typeof limit === 'number') {
154 var count = 0
155 return pullCat([
156 pull(
157 stream,
158 pull.take(limit),
159 pull.through(() => {
160 count += 1
161 })
162 ),
163
164 // send truncated marker for resuming search
165 pull(
166 pull.values([marker]),
167 pull.filter(() => count === limit)
168 )
169 ])
170 } else {
171 return stream
172 }
173 }
174 }
175
176 function getThruCache (key, cb) {
177 if (cache.has(key)) {
178 cb(null, cache.get(key))
179 } else {
180 ssb.get(key, (_, value) => {
181 var msg = {key, value}
182 if (msg.value) {
183 cache.set(key, msg)
184 }
185 cb(null, msg)
186 })
187 }
188 }
189
190 function getFilter (cb) {
191 // TODO: rewrite contacts stream
192 ssb.contacts.get((err, contacts) => {
193 if (err) return cb(err)
194 ssb.patchwork.getSubscriptions((err, subscriptions) => {
195 if (err) return cb(err)
196 cb(null, function (ids, msg) {
197 var type = msg.value.content.type
198 var matchesChannel = (type !== 'channel' && checkChannel(subscriptions, ids, msg.value.content.channel))
199 return ids.includes(msg.value.author) || matchesChannel || checkFollowing(contacts, ids, msg.value.author)
200 })
201 })
202 })
203 }
204}
205
/**
 * Looks up, for each id, that id's `following` entry for `target` and returns
 * element 0 of the entry with the greatest element 1.
 * Presumably entries are [state, timestamp] pairs — confirm against the
 * contacts index.
 *
 * Ids with no record in `lookup`, or with no `following` map, are skipped
 * rather than throwing.
 *
 * @param {Object} lookup - contacts keyed by id.
 * @param {string[]} ids - ids whose follow state is consulted.
 * @param {string} target - id being checked.
 * @returns {*} element 0 of the most recent entry, or a falsy value when no
 *   id has an entry for `target`.
 */
function checkFollowing (lookup, ids, target) {
  // TODO: rewrite contacts index (for some reason the order is different)
  var value = mostRecentValue(ids.map(id => {
    var contact = lookup[id]
    return contact && contact.following && contact.following[target]
  }), 1)
  return value && value[0]
}
211
/**
 * Normalizes `channel`, reads the `${id}:${channel}` entry from `lookup` for
 * each id, and returns element 1 of the entry with the greatest element 0.
 *
 * @param {Object} lookup - subscription entries keyed by `${id}:${channel}`.
 * @param {string[]} ids - ids whose subscriptions are consulted.
 * @param {string} channel - raw channel name, passed through normalizeChannel.
 * @returns {*} element 1 of the most recent entry; a falsy value when no
 *   entry exists; undefined when the normalized channel is falsy.
 */
function checkChannel (lookup, ids, channel) {
  var normalized = normalizeChannel(channel)
  if (!normalized) return

  var entries = ids.map(id => lookup[`${id}:${normalized}`])
  var latest = mostRecentValue(entries)
  return latest && latest[1]
}
219
/**
 * Picks the entry whose element at `timestampIndex` is greatest.
 * Falsy entries are skipped, and on a tie the earliest entry wins.
 *
 * @param {Array} values - candidate entries (indexable), possibly with gaps.
 * @param {number} [timestampIndex=0] - position compared between entries.
 * @returns {*} the winning entry, or null when there is no truthy entry.
 */
function mostRecentValue (values, timestampIndex = 0) {
  return values.reduce((newest, candidate) => {
    if (!candidate) return newest
    if (newest && !(newest[timestampIndex] < candidate[timestampIndex])) return newest
    return candidate
  }, null)
}
229

Built with git-ssb-web