Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / plugins / contacts.js
8302 bytesRaw
1 | const pull = require('pull-stream') |
2 | const PullPushAbort = require('../pull-push-abort') |
3 | const FlumeReduce = require('flumeview-reduce') |
4 | |
5 | const ref = require('ssb-ref') |
6 | |
7 | exports.manifest = { |
8 | replicateStream: 'source', |
9 | stateStream: 'source', |
10 | ignoreStream: 'source', |
11 | isFollowing: 'async', |
12 | isBlocking: 'async' |
13 | } |
14 | |
15 | exports.init = function (ssb) { |
16 | let values = {} |
17 | |
18 | const view = ssb._flumeUse('patchwork-contacts', FlumeReduce(0, function reduce (result, item) { |
19 | // used by the reducer view |
20 | if (!result) result = {} |
21 | if (item) { |
22 | for (const author in item) { |
23 | for (const contact in item[author]) { |
24 | if (!result[author]) result[author] = {} |
25 | result[author][contact] = item[author][contact] |
26 | } |
27 | } |
28 | } |
29 | |
30 | // always make sure values is the latest result |
31 | // hack around result being null on index initialize |
32 | values = result |
33 | return result |
34 | }, function map (msg) { |
35 | // used by the reducer view |
36 | if (msg.value && msg.value.content && msg.value.content.type === 'contact' && ref.isFeed(msg.value.content.contact)) { |
37 | return { |
38 | [msg.value.author]: { |
39 | [msg.value.content.contact]: getContactState(msg.value.content) |
40 | } |
41 | } |
42 | } |
43 | })) |
44 | |
45 | view.get((err, result) => { |
46 | if (!err && result) { |
47 | // initialize values |
48 | values = result |
49 | } |
50 | }) |
51 | |
52 | return { |
53 | // expose raw view to other plugins (not over rpc) |
54 | raw: view, |
55 | |
56 | isFollowing: function ({ source, dest }, cb) { |
57 | if (values && values[source]) { |
58 | cb(null, values[source][dest] === true) |
59 | } else { |
60 | view.get((err, graph) => { |
61 | if (err) return cb(err) |
62 | const following = graph && graph[source] && graph[source][dest] === true |
63 | cb(null, following) |
64 | }) |
65 | } |
66 | }, |
67 | |
68 | isBlocking: function ({ source, dest }, cb) { |
69 | if (values && values[source]) { |
70 | cb(null, values[source][dest] === false) |
71 | } else { |
72 | view.get((err, graph) => { |
73 | if (err) return cb(err) |
74 | const blocking = graph && graph[source] && graph[source][dest] === false |
75 | cb(null, blocking) |
76 | }) |
77 | } |
78 | }, |
79 | |
80 | /// return a list of everyone you have blocked privately |
81 | ignoreStream: function ({ live }) { |
82 | const stream = PullPushAbort() |
83 | |
84 | const result = {} |
85 | let sync = null |
86 | |
87 | pull( |
88 | ssb.query.read({ |
89 | query: [{ |
90 | $filter: { |
91 | value: { |
92 | author: ssb.id, |
93 | content: { |
94 | type: 'contact' |
95 | } |
96 | } |
97 | } |
98 | }], |
99 | private: true, |
100 | live |
101 | }), |
102 | stream.aborter, |
103 | pull.drain((msg) => { |
104 | if (msg.sync) { |
105 | sync = true |
106 | if (live) { |
107 | stream.push(result) |
108 | } |
109 | return |
110 | } |
111 | |
112 | const isPrivate = msg.value && msg.value.meta && msg.value.meta.private |
113 | const content = msg.value.content |
114 | |
115 | // if a non-private state has been set since last private, revert to null |
116 | // patchwork will always try to set a new ignore status after the public if it differs |
117 | // this is just to handle the case where the private state was set by another client |
118 | // (which will override ignore due to the way ssb-friends handles private blocks) |
119 | |
120 | const value = isPrivate ? !!content.blocking : null |
121 | |
122 | if (sync) { |
123 | stream.push({ [content.contact]: value }) |
124 | } else { |
125 | result[content.contact] = value |
126 | } |
127 | }, (err) => { |
128 | if (err) return stream.end(err) |
129 | if (stream.ended || sync) return |
130 | |
131 | if (!live) { |
132 | stream.push(result) |
133 | } |
134 | |
135 | stream.end() |
136 | }) |
137 | ) |
138 | |
139 | return stream |
140 | }, |
141 | |
142 | // return who a given contact publicly follows and blocks (or reverse) |
143 | stateStream: function ({ feedId, live = false, reverse = false }) { |
144 | const stream = PullPushAbort() |
145 | |
146 | const result = {} |
147 | let sync = null |
148 | |
149 | // stream reverse states if option specified |
150 | const queryStream = reverse ? ssb.backlinks.read({ |
151 | query: [{ |
152 | $filter: { |
153 | dest: feedId, |
154 | value: { |
155 | content: { |
156 | type: 'contact', |
157 | contact: feedId |
158 | } |
159 | } |
160 | } |
161 | }], |
162 | live |
163 | }) : ssb.query.read({ |
164 | query: [{ |
165 | $filter: { |
166 | value: { |
167 | author: feedId, |
168 | content: { |
169 | type: 'contact' |
170 | } |
171 | } |
172 | } |
173 | }], |
174 | live |
175 | }) |
176 | |
177 | pull( |
178 | queryStream, |
179 | stream.aborter, |
180 | pull.filter(msg => (msg.value && msg.value.content && msg.value.content.type) || msg.sync), |
181 | pull.drain((msg) => { |
182 | if (msg.sync) { |
183 | // send first reduced result when running in live mode |
184 | sync = true |
185 | if (live) stream.push(result) |
186 | return |
187 | } |
188 | |
189 | const isPrivate = msg.value && msg.value.meta && msg.value.meta.private && msg.value.author === ssb.id |
190 | |
191 | if (!isPrivate) { |
192 | const content = msg.value.content |
193 | const contact = reverse ? msg.value.author : content.contact |
194 | const value = getContactState(msg.value.content) |
195 | |
196 | if (sync) { |
197 | // send updated state in live mode |
198 | stream.push({ [contact]: value }) |
199 | } else { |
200 | result[contact] = value |
201 | } |
202 | } |
203 | }, (err) => { |
204 | if (err) return stream.end(err) |
205 | if (stream.ended || sync) return |
206 | |
207 | // send final result when not live |
208 | if (!live) stream.push(result) |
209 | stream.end() |
210 | }) |
211 | ) |
212 | |
213 | return stream |
214 | }, |
215 | |
216 | // get the reduced follows list starting at yourId (who to replicate, block) |
217 | replicateStream: function ({ throttle = 5000, live }) { |
218 | const stream = PullPushAbort() |
219 | |
220 | let lastResolvedValues = {} |
221 | |
222 | let timer = null |
223 | let queued = false |
224 | let sync = false |
225 | |
226 | const update = () => { |
227 | // clear queue |
228 | clearTimeout(timer) |
229 | queued = false |
230 | |
231 | // get latest replication state (merge together values) |
232 | const resolvedValues = resolveValues(values, ssb.id) |
233 | |
234 | // push changes since last update |
235 | stream.push(objectDiff(lastResolvedValues, resolvedValues)) |
236 | |
237 | // update internal de-dupe list |
238 | lastResolvedValues = resolvedValues |
239 | } |
240 | |
241 | pull( |
242 | view.stream({ live }), |
243 | stream.aborter, |
244 | pull.drain((msg) => { |
245 | if (stream.ended) return |
246 | |
247 | if (!sync) { |
248 | // we'll store the incoming values (they will be updated as the view updates so |
249 | // do not need to be manually patched) |
250 | sync = true |
251 | update() |
252 | |
253 | // if not live, we can close stream |
254 | if (!live) stream.end() |
255 | } else if (msg) { |
256 | if (!queued) { |
257 | queued = true |
258 | timer = setTimeout(update, throttle) |
259 | } |
260 | } |
261 | }, (err) => { |
262 | if (err) return stream.end(err) |
263 | }) |
264 | ) |
265 | |
266 | return stream |
267 | } |
268 | } |
269 | } |
270 | |
271 | function getContactState (content) { |
272 | return content.blocking |
273 | ? false |
274 | : content.following |
275 | ? true |
276 | : null |
277 | } |
278 | |
279 | function objectDiff (original, changed) { |
280 | const result = {} |
281 | const keys = new Set([...Object.keys(original), ...Object.keys(changed)]) |
282 | keys.forEach(key => { |
283 | if (original[key] !== changed[key]) { |
284 | result[key] = changed[key] |
285 | } |
286 | }) |
287 | return result |
288 | } |
289 | |
290 | function resolveValues (values, yourId) { |
291 | const result = {} |
292 | if (values[yourId]) { |
293 | for (const id in values[yourId]) { |
294 | if (values[yourId][id] === true) { |
295 | for (const contact in values[id]) { |
296 | // only apply block if someone doesn't already follow |
297 | if (values[id][contact] != null && result[contact] !== true) { |
298 | result[contact] = values[id][contact] |
299 | } |
300 | } |
301 | } |
302 | } |
303 | |
304 | // override with your own blocks/follows |
305 | for (const contact in values[yourId]) { |
306 | if (values[yourId][contact] != null) { |
307 | result[contact] = values[yourId][contact] |
308 | } |
309 | } |
310 | } |
311 | return result |
312 | } |
313 |
Built with git-ssb-web