Files: 3451510316992d414ec76ba5b29681fe359b7428 / lib / plugins / contacts.js
8418 bytesRaw
1 | const pull = require('pull-stream') |
2 | const PullPushAbort = require('../pull-push-abort') |
3 | const FlumeReduce = require('flumeview-reduce') |
4 | |
5 | const ref = require('ssb-ref') |
6 | |
7 | exports.manifest = { |
8 | replicateStream: 'source', |
9 | stateStream: 'source', |
10 | ignoreStream: 'source', |
11 | isFollowing: 'async', |
12 | isBlocking: 'async' |
13 | } |
14 | |
15 | exports.init = function (ssb) { |
16 | let values = {} |
17 | |
18 | const view = ssb._flumeUse('patchwork-contacts', FlumeReduce(0, function reduce (result, item) { |
19 | // used by the reducer view |
20 | if (!result) result = {} |
21 | if (item) { |
22 | for (const author in item) { |
23 | for (const contact in item[author]) { |
24 | if (!result[author]) result[author] = {} |
25 | result[author][contact] = item[author][contact] |
26 | } |
27 | } |
28 | } |
29 | |
30 | // always make sure values is the latest result |
31 | // hack around result being null on index initialize |
32 | values = result |
33 | return result |
34 | }, function map (msg) { |
35 | // used by the reducer view |
36 | if (msg.value && msg.value.content && msg.value.content.type === 'contact' && ref.isFeed(msg.value.content.contact)) { |
37 | return { |
38 | [msg.value.author]: { |
39 | [msg.value.content.contact]: getContactState(msg.value.content) |
40 | } |
41 | } |
42 | } |
43 | }, null, null, 100 * 1000)) |
44 | |
45 | view.get((err, result) => { |
46 | if (!err && result) { |
47 | // initialize values |
48 | values = result |
49 | } |
50 | }) |
51 | |
52 | return { |
53 | // expose raw view to other plugins (not over rpc) |
54 | raw: view, |
55 | |
56 | isFollowing: function ({ source, dest }, cb) { |
57 | if (values && values[source]) { |
58 | cb(null, values[source][dest] === true) |
59 | } else { |
60 | view.get((err, graph) => { |
61 | if (err) return cb(err) |
62 | const following = graph && graph[source] && graph[source][dest] === true |
63 | cb(null, following) |
64 | }) |
65 | } |
66 | }, |
67 | |
68 | isBlocking: function ({ source, dest }, cb) { |
69 | if (values && values[source]) { |
70 | cb(null, values[source][dest] === false) |
71 | } else { |
72 | view.get((err, graph) => { |
73 | if (err) return cb(err) |
74 | const blocking = graph && graph[source] && graph[source][dest] === false |
75 | cb(null, blocking) |
76 | }) |
77 | } |
78 | }, |
79 | |
80 | /// return a list of everyone you have blocked privately |
81 | ignoreStream: function ({ live }) { |
82 | const stream = PullPushAbort() |
83 | |
84 | const result = {} |
85 | let sync = null |
86 | |
87 | pull( |
88 | ssb.query.read({ |
89 | query: [{ |
90 | $filter: { |
91 | value: { |
92 | author: ssb.id, |
93 | content: { |
94 | type: 'contact' |
95 | } |
96 | } |
97 | } |
98 | }], |
99 | private: true, |
100 | live |
101 | }), |
102 | stream.aborter, |
103 | pull.drain((msg) => { |
104 | if (msg.sync) { |
105 | sync = true |
106 | if (live) { |
107 | stream.push(result) |
108 | } |
109 | return |
110 | } |
111 | |
112 | const isPrivate = msg.value && msg.value.meta && msg.value.meta.private |
113 | const content = msg.value.content |
114 | |
115 | // if a non-private state has been set since last private, revert to null |
116 | // patchwork will always try to set a new ignore status after the public if it differs |
117 | // this is just to handle the case where the private state was set by another client |
118 | // (which will override ignore due to the way ssb-friends handles private blocks) |
119 | |
120 | const value = isPrivate ? !!content.blocking : null |
121 | |
122 | if (sync) { |
123 | stream.push({ [content.contact]: value }) |
124 | } else { |
125 | result[content.contact] = value |
126 | } |
127 | }, (err) => { |
128 | if (err) return stream.end(err) |
129 | if (stream.ended || sync) return |
130 | |
131 | if (!live) { |
132 | stream.push(result) |
133 | } |
134 | |
135 | stream.end() |
136 | }) |
137 | ) |
138 | |
139 | return stream |
140 | }, |
141 | |
142 | // return who a given contact publicly follows and blocks (or reverse) |
143 | stateStream: function ({ feedId, live = false, reverse = false }) { |
144 | const stream = PullPushAbort() |
145 | |
146 | const result = {} |
147 | let sync = null |
148 | |
149 | // stream reverse states if option specified |
150 | const queryStream = reverse |
151 | ? ssb.backlinks.read({ |
152 | query: [{ |
153 | $filter: { |
154 | dest: feedId, |
155 | value: { |
156 | content: { |
157 | type: 'contact', |
158 | contact: feedId |
159 | } |
160 | } |
161 | } |
162 | }], |
163 | live |
164 | }) |
165 | : ssb.query.read({ |
166 | query: [{ |
167 | $filter: { |
168 | value: { |
169 | author: feedId, |
170 | content: { |
171 | type: 'contact' |
172 | } |
173 | } |
174 | } |
175 | }], |
176 | live |
177 | }) |
178 | |
179 | pull( |
180 | queryStream, |
181 | stream.aborter, |
182 | pull.filter(msg => (msg.value && msg.value.content && msg.value.content.type) || msg.sync), |
183 | pull.drain((msg) => { |
184 | if (msg.sync) { |
185 | // send first reduced result when running in live mode |
186 | sync = true |
187 | if (live) stream.push(result) |
188 | return |
189 | } |
190 | |
191 | const isPrivate = msg.value && msg.value.meta && msg.value.meta.private && msg.value.author === ssb.id |
192 | |
193 | if (!isPrivate) { |
194 | const content = msg.value.content |
195 | const contact = reverse ? msg.value.author : content.contact |
196 | const value = getContactState(msg.value.content) |
197 | |
198 | if (sync) { |
199 | // send updated state in live mode |
200 | stream.push({ [contact]: value }) |
201 | } else { |
202 | result[contact] = value |
203 | } |
204 | } |
205 | }, (err) => { |
206 | if (err) return stream.end(err) |
207 | if (stream.ended || sync) return |
208 | |
209 | // send final result when not live |
210 | if (!live) stream.push(result) |
211 | stream.end() |
212 | }) |
213 | ) |
214 | |
215 | return stream |
216 | }, |
217 | |
218 | // get the reduced follows list starting at yourId (who to replicate, block) |
219 | replicateStream: function ({ throttle = 5000, live }) { |
220 | const stream = PullPushAbort() |
221 | |
222 | let lastResolvedValues = {} |
223 | |
224 | let timer = null |
225 | let queued = false |
226 | let sync = false |
227 | |
228 | const update = () => { |
229 | // clear queue |
230 | clearTimeout(timer) |
231 | queued = false |
232 | |
233 | // get latest replication state (merge together values) |
234 | const resolvedValues = resolveValues(values, ssb.id) |
235 | |
236 | // push changes since last update |
237 | stream.push(objectDiff(lastResolvedValues, resolvedValues)) |
238 | |
239 | // update internal de-dupe list |
240 | lastResolvedValues = resolvedValues |
241 | } |
242 | |
243 | pull( |
244 | view.stream({ live }), |
245 | stream.aborter, |
246 | pull.drain((msg) => { |
247 | if (stream.ended) return |
248 | |
249 | if (!sync) { |
250 | // we'll store the incoming values (they will be updated as the view updates so |
251 | // do not need to be manually patched) |
252 | sync = true |
253 | update() |
254 | |
255 | // if not live, we can close stream |
256 | if (!live) stream.end() |
257 | } else if (msg) { |
258 | if (!queued) { |
259 | queued = true |
260 | timer = setTimeout(update, throttle) |
261 | } |
262 | } |
263 | }, (err) => { |
264 | if (err) return stream.end(err) |
265 | }) |
266 | ) |
267 | |
268 | return stream |
269 | } |
270 | } |
271 | } |
272 | |
// Collapse the content of a contact message into one state:
//   false -> blocking (wins when both flags are set)
//   true  -> following
//   null  -> neither
function getContactState (content) {
  if (content.blocking) return false
  if (content.following) return true
  return null
}
280 | |
// Shallow diff of two flat objects.
// The returned object has an entry for every key whose value is not strictly
// equal in `original` and `changed`; the entry holds the value from `changed`
// (so a key that only exists in `original` maps to undefined).
function objectDiff (original, changed) {
  const diff = {}
  const allKeys = new Set(Object.keys(original).concat(Object.keys(changed)))

  for (const key of allKeys) {
    const next = changed[key]
    if (original[key] === next) continue
    diff[key] = next
  }

  return diff
}
291 | |
// Build the flat { [feed]: true | false } list for `yourId` out of the graph
// `values` ({ [author]: { [contact]: true | false | null } }):
//   1. take the states set by every feed you follow; once a feed has been
//      marked true by one of them, a later entry does not replace it
//   2. your own non-null states overwrite whatever step 1 produced
// Null / undefined states are never copied. Returns {} when you have no entry.
function resolveValues (values, yourId) {
  const resolved = {}
  const own = values[yourId]
  if (!own) return resolved

  for (const friend in own) {
    if (own[friend] !== true) continue

    const theirs = values[friend]
    for (const target in theirs) {
      const state = theirs[target]
      // only apply block if someone doesn't already follow
      if (state == null || resolved[target] === true) continue
      resolved[target] = state
    }
  }

  // override with your own blocks/follows
  for (const target in own) {
    const state = own[target]
    if (state != null) resolved[target] = state
  }

  return resolved
}
315 |
Built with git-ssb-web