git ssb

10+

Matt McKegg / patchwork



Tree: fe907a3f25ec3a3633a139910a155b8e0a1afa16

Files: fe907a3f25ec3a3633a139910a155b8e0a1afa16 / sbot / roots.js

6045 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var FlumeViewLevel = require('flumeview-level')
4var pullCat = require('pull-cat')
5var HLRU = require('hashlru')
6var extend = require('xtend')
7
8// HACK: pull it out of patchcore
9var getRoot = require('patchcore/message/sync/root').create().message.sync.root
10
11module.exports = function (ssb, config) {
12 var create = FlumeViewLevel(1, function (msg, seq) {
13 var result = [
14 [msg.value.timestamp, getRoot(msg) || msg.key]
15 ]
16 return result
17 })
18
19 var index = ssb._flumeUse('patchwork-roots', create)
20
21 // cache mostly just to avoid reading the same roots over and over again
22 // not really big enough for multiple refresh cycles
23 var cache = HLRU(100)
24
25 return {
26 latest: function ({ids = [ssb.id]}) {
27 var filter = null
28 return pull(
29 // READ INDEX
30 index.read({old: false}),
31
32 // LOAD FILTERS
33 pull.asyncMap((item, cb) => {
34 if (!filter) {
35 // pause stream until filters have loaded
36 getFilter((err, result) => {
37 if (err) return cb(err)
38 filter = result
39 cb(null, item)
40 })
41 } else {
42 cb(null, item)
43 }
44 }),
45
46 // BUMP FILTER
47 pull.filter(item => {
48 if (filter && item.value && item.value) {
49 return filter(ids, item.value)
50 }
51 }),
52
53 // LOOKUP ROOTS
54 pull.asyncMap((item, cb) => {
55 var msg = item.value
56 var key = item.key[1]
57 if (key === msg.key) {
58 // already a root
59 cb(null, msg)
60 }
61 getThruCache(key, (_, value) => {
62 cb(null, extend(msg, {
63 root: value
64 }))
65 })
66 }),
67
68 // FILTER
69 pull.filter(item => {
70 var root = item.root || item
71 if (filter && root && root.value && !getRoot(root)) {
72 return filter(ids, root)
73 }
74 })
75 )
76 },
77 read: function ({ids = [ssb.id], reverse, live, old, limit, lt, gt}) {
78 var opts = {reverse, live, old}
79
80 // handle markers passed in to lt / gt
81 if (lt && typeof lt.timestamp === 'number') lt = lt.timestamp
82 if (gt && typeof gt.timestamp === 'number') gt = gt.timestamp
83 if (typeof lt === 'number') opts.lt = [lt]
84 if (typeof gt === 'number') opts.gt = [gt]
85
86 var seen = new Set()
87 var marker = {marker: true, timestamp: null}
88 var filter = null
89
90 var stream = pull(
91
92 // READ ROOTS
93 index.read(opts),
94
95 // LOAD FILTERS
96 pull.asyncMap((item, cb) => {
97 if (!filter) {
98 // pause stream until filters have loaded
99 getFilter((err, result) => {
100 if (err) return cb(err)
101 filter = result
102 cb(null, item)
103 })
104 } else {
105 cb(null, item)
106 }
107 }),
108
109 // BUMP FILTER
110 pull.filter(item => {
111 if (filter && item.value && item.value.value) {
112 return filter(ids, item.value)
113 }
114 }),
115
116 // MAP ROOTS
117 pull.map(item => {
118 if (item.sync) return item
119 marker.timestamp = item.key[0]
120 return item.key[1]
121 }),
122
123 // UNIQUE
124 pull.filter(item => {
125 if (old === false) return true // don't filter live stream
126 if (item && item.sync) {
127 return true
128 } else if (typeof item === 'string') {
129 if (!seen.has(item)) {
130 seen.add(item)
131 return true
132 }
133 }
134 }),
135
136 // LOOKUP (with cache)
137 pull.asyncMap((item, cb) => {
138 if (item.sync) return cb(null, item)
139 var key = item
140 getThruCache(key, cb)
141 }),
142
143 // ROOT FILTER
144 pull.filter(msg => {
145 if (filter && msg.value && !getRoot(msg)) {
146 return filter(ids, msg)
147 }
148 })
149 )
150
151 // TRUNCATE
152 if (typeof limit === 'number') {
153 var count = 0
154 return pullCat([
155 pull(
156 stream,
157 pull.take(limit),
158 pull.through(() => {
159 count += 1
160 })
161 ),
162
163 // send truncated marker for resuming search
164 pull(
165 pull.values([marker]),
166 pull.filter(() => count === limit)
167 )
168 ])
169 } else {
170 return stream
171 }
172 }
173 }
174
175 function getThruCache (key, cb) {
176 if (cache.has(key)) {
177 cb(null, cache.get(key))
178 } else {
179 ssb.get(key, (_, value) => {
180 var msg = {key, value}
181 if (msg.value) {
182 cache.set(key, msg)
183 }
184 cb(null, msg)
185 })
186 }
187 }
188
189 function getFilter (cb) {
190 // TODO: rewrite contacts stream
191 ssb.contacts.get((err, contacts) => {
192 if (err) return cb(err)
193 ssb.patchwork.getSubscriptions((err, subscriptions) => {
194 if (err) return cb(err)
195 cb(null, function (ids, msg) {
196 return (
197 ids.includes(msg.value.author) ||
198 checkFollowing(contacts, ids, msg.value.author) ||
199 checkChannel(subscriptions, ids, msg.value.content.channel)
200 )
201 })
202 })
203 })
204 }
205}
206
207function checkFollowing (lookup, ids, target) {
208 // TODO: rewrite contacts index (for some reason the order is different)
209 var value = mostRecentValue(ids.map(id => lookup[id].following && lookup[id].following[target]), 1)
210 return value && value[0]
211}
212
213function checkChannel (lookup, ids, channel) {
214 channel = typeof channel === 'string' ? channel.replace(/\s/g, '') : null
215 if (channel) {
216 var value = mostRecentValue(ids.map(id => lookup[`${id}:channel`]))
217 return value && value[1]
218 }
219}
220
221function mostRecentValue (values, timestampIndex = 0) {
222 var mostRecent = null
223 values.forEach(value => {
224 if (value && (!mostRecent || mostRecent[timestampIndex] < value[timestampIndex])) {
225 mostRecent = value
226 }
227 })
228 return mostRecent
229}
230

Built with git-ssb-web