git ssb

1+

Daan Patchwork / patchwork



Tree: 486af00f56978c646ce40d45da2d63a8bb58f0dd

Files: 486af00f56978c646ce40d45da2d63a8bb58f0dd / lib / plugins / contacts.js

8418 bytesRaw
// Module dependencies (line-number residue from the web scrape removed).
const pull = require('pull-stream')
const PullPushAbort = require('../pull-push-abort')
const FlumeReduce = require('flumeview-reduce')

const ref = require('ssb-ref')
6
7exports.manifest = {
8 replicateStream: 'source',
9 stateStream: 'source',
10 ignoreStream: 'source',
11 isFollowing: 'async',
12 isBlocking: 'async'
13}
14
15exports.init = function (ssb) {
16 let values = {}
17
18 const view = ssb._flumeUse('patchwork-contacts', FlumeReduce(0, function reduce (result, item) {
19 // used by the reducer view
20 if (!result) result = {}
21 if (item) {
22 for (const author in item) {
23 for (const contact in item[author]) {
24 if (!result[author]) result[author] = {}
25 result[author][contact] = item[author][contact]
26 }
27 }
28 }
29
30 // always make sure values is the latest result
31 // hack around result being null on index initialize
32 values = result
33 return result
34 }, function map (msg) {
35 // used by the reducer view
36 if (msg.value && msg.value.content && msg.value.content.type === 'contact' && ref.isFeed(msg.value.content.contact)) {
37 return {
38 [msg.value.author]: {
39 [msg.value.content.contact]: getContactState(msg.value.content)
40 }
41 }
42 }
43 }, null, null, 100 * 1000))
44
45 view.get((err, result) => {
46 if (!err && result) {
47 // initialize values
48 values = result
49 }
50 })
51
52 return {
53 // expose raw view to other plugins (not over rpc)
54 raw: view,
55
56 isFollowing: function ({ source, dest }, cb) {
57 if (values && values[source]) {
58 cb(null, values[source][dest] === true)
59 } else {
60 view.get((err, graph) => {
61 if (err) return cb(err)
62 const following = graph && graph[source] && graph[source][dest] === true
63 cb(null, following)
64 })
65 }
66 },
67
68 isBlocking: function ({ source, dest }, cb) {
69 if (values && values[source]) {
70 cb(null, values[source][dest] === false)
71 } else {
72 view.get((err, graph) => {
73 if (err) return cb(err)
74 const blocking = graph && graph[source] && graph[source][dest] === false
75 cb(null, blocking)
76 })
77 }
78 },
79
80 /// return a list of everyone you have blocked privately
81 ignoreStream: function ({ live }) {
82 const stream = PullPushAbort()
83
84 const result = {}
85 let sync = null
86
87 pull(
88 ssb.query.read({
89 query: [{
90 $filter: {
91 value: {
92 author: ssb.id,
93 content: {
94 type: 'contact'
95 }
96 }
97 }
98 }],
99 private: true,
100 live
101 }),
102 stream.aborter,
103 pull.drain((msg) => {
104 if (msg.sync) {
105 sync = true
106 if (live) {
107 stream.push(result)
108 }
109 return
110 }
111
112 const isPrivate = msg.value && msg.value.meta && msg.value.meta.private
113 const content = msg.value.content
114
115 // if a non-private state has been set since last private, revert to null
116 // patchwork will always try to set a new ignore status after the public if it differs
117 // this is just to handle the case where the private state was set by another client
118 // (which will override ignore due to the way ssb-friends handles private blocks)
119
120 const value = isPrivate ? !!content.blocking : null
121
122 if (sync) {
123 stream.push({ [content.contact]: value })
124 } else {
125 result[content.contact] = value
126 }
127 }, (err) => {
128 if (err) return stream.end(err)
129 if (stream.ended || sync) return
130
131 if (!live) {
132 stream.push(result)
133 }
134
135 stream.end()
136 })
137 )
138
139 return stream
140 },
141
142 // return who a given contact publicly follows and blocks (or reverse)
143 stateStream: function ({ feedId, live = false, reverse = false }) {
144 const stream = PullPushAbort()
145
146 const result = {}
147 let sync = null
148
149 // stream reverse states if option specified
150 const queryStream = reverse
151 ? ssb.backlinks.read({
152 query: [{
153 $filter: {
154 dest: feedId,
155 value: {
156 content: {
157 type: 'contact',
158 contact: feedId
159 }
160 }
161 }
162 }],
163 live
164 })
165 : ssb.query.read({
166 query: [{
167 $filter: {
168 value: {
169 author: feedId,
170 content: {
171 type: 'contact'
172 }
173 }
174 }
175 }],
176 live
177 })
178
179 pull(
180 queryStream,
181 stream.aborter,
182 pull.filter(msg => (msg.value && msg.value.content && msg.value.content.type) || msg.sync),
183 pull.drain((msg) => {
184 if (msg.sync) {
185 // send first reduced result when running in live mode
186 sync = true
187 if (live) stream.push(result)
188 return
189 }
190
191 const isPrivate = msg.value && msg.value.meta && msg.value.meta.private && msg.value.author === ssb.id
192
193 if (!isPrivate) {
194 const content = msg.value.content
195 const contact = reverse ? msg.value.author : content.contact
196 const value = getContactState(msg.value.content)
197
198 if (sync) {
199 // send updated state in live mode
200 stream.push({ [contact]: value })
201 } else {
202 result[contact] = value
203 }
204 }
205 }, (err) => {
206 if (err) return stream.end(err)
207 if (stream.ended || sync) return
208
209 // send final result when not live
210 if (!live) stream.push(result)
211 stream.end()
212 })
213 )
214
215 return stream
216 },
217
218 // get the reduced follows list starting at yourId (who to replicate, block)
219 replicateStream: function ({ throttle = 5000, live }) {
220 const stream = PullPushAbort()
221
222 let lastResolvedValues = {}
223
224 let timer = null
225 let queued = false
226 let sync = false
227
228 const update = () => {
229 // clear queue
230 clearTimeout(timer)
231 queued = false
232
233 // get latest replication state (merge together values)
234 const resolvedValues = resolveValues(values, ssb.id)
235
236 // push changes since last update
237 stream.push(objectDiff(lastResolvedValues, resolvedValues))
238
239 // update internal de-dupe list
240 lastResolvedValues = resolvedValues
241 }
242
243 pull(
244 view.stream({ live }),
245 stream.aborter,
246 pull.drain((msg) => {
247 if (stream.ended) return
248
249 if (!sync) {
250 // we'll store the incoming values (they will be updated as the view updates so
251 // do not need to be manually patched)
252 sync = true
253 update()
254
255 // if not live, we can close stream
256 if (!live) stream.end()
257 } else if (msg) {
258 if (!queued) {
259 queued = true
260 timer = setTimeout(update, throttle)
261 }
262 }
263 }, (err) => {
264 if (err) return stream.end(err)
265 })
266 )
267
268 return stream
269 }
270 }
271}
272
/**
 * Reduce the content of a `contact` message to a single tri-state value.
 *
 * @param {object} content - message content; `blocking` and `following` are read
 * @returns {boolean|null} `false` when `blocking` is truthy (blocking wins over
 *   following), `true` when only `following` is truthy, otherwise `null`
 */
function getContactState (content) {
  if (content.blocking) return false
  if (content.following) return true
  return null
}
280
/**
 * Shallow diff of two plain objects.
 *
 * @param {object} original - previous state
 * @param {object} changed - new state
 * @returns {object} every key (from either object) whose value differs by `!==`,
 *   mapped to its value in `changed`; a key that only exists in `original`
 *   is therefore present with the value `undefined`
 */
function objectDiff (original, changed) {
  const result = {}
  const keys = new Set([...Object.keys(original), ...Object.keys(changed)])
  for (const key of keys) {
    if (original[key] !== changed[key]) {
      // NOTE(review): removed keys come out as `undefined`, which JSON
      // serialization would drop — confirm consumers never rely on them
      // after a JSON round trip
      result[key] = changed[key]
    }
  }
  return result
}
291
/**
 * Merge your own contact states with those of the feeds you follow.
 *
 * @param {object} values - graph shaped { [author]: { [contact]: true | false | null } }
 * @param {string} yourId - key of your own feed in `values`
 * @returns {object} { [contact]: true | false }; empty when `values` has no
 *   entry for `yourId`
 */
function resolveValues (values, yourId) {
  const result = {}
  const own = values[yourId]
  if (!own) return result

  // first pass: collect the states published by everyone you follow
  for (const id in own) {
    if (own[id] !== true) continue

    const theirs = values[id]
    for (const contact in theirs) {
      // only apply block if someone doesn't already follow
      if (theirs[contact] != null && result[contact] !== true) {
        result[contact] = theirs[contact]
      }
    }
  }

  // override with your own blocks/follows
  for (const contact in own) {
    if (own[contact] != null) {
      result[contact] = own[contact]
    }
  }

  return result
}
315

Built with git-ssb-web