git ssb

1+

Daan Patchwork / patchwork



Tree: 58ab0241031aa549a35cce1e678c27065ae66221

Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / plugins / contacts.js

8302 bytesRaw
1const pull = require('pull-stream')
2const PullPushAbort = require('../pull-push-abort')
3const FlumeReduce = require('flumeview-reduce')
4
5const ref = require('ssb-ref')
6
7exports.manifest = {
8 replicateStream: 'source',
9 stateStream: 'source',
10 ignoreStream: 'source',
11 isFollowing: 'async',
12 isBlocking: 'async'
13}
14
15exports.init = function (ssb) {
16 let values = {}
17
18 const view = ssb._flumeUse('patchwork-contacts', FlumeReduce(0, function reduce (result, item) {
19 // used by the reducer view
20 if (!result) result = {}
21 if (item) {
22 for (const author in item) {
23 for (const contact in item[author]) {
24 if (!result[author]) result[author] = {}
25 result[author][contact] = item[author][contact]
26 }
27 }
28 }
29
30 // always make sure values is the latest result
31 // hack around result being null on index initialize
32 values = result
33 return result
34 }, function map (msg) {
35 // used by the reducer view
36 if (msg.value && msg.value.content && msg.value.content.type === 'contact' && ref.isFeed(msg.value.content.contact)) {
37 return {
38 [msg.value.author]: {
39 [msg.value.content.contact]: getContactState(msg.value.content)
40 }
41 }
42 }
43 }))
44
45 view.get((err, result) => {
46 if (!err && result) {
47 // initialize values
48 values = result
49 }
50 })
51
52 return {
53 // expose raw view to other plugins (not over rpc)
54 raw: view,
55
56 isFollowing: function ({ source, dest }, cb) {
57 if (values && values[source]) {
58 cb(null, values[source][dest] === true)
59 } else {
60 view.get((err, graph) => {
61 if (err) return cb(err)
62 const following = graph && graph[source] && graph[source][dest] === true
63 cb(null, following)
64 })
65 }
66 },
67
68 isBlocking: function ({ source, dest }, cb) {
69 if (values && values[source]) {
70 cb(null, values[source][dest] === false)
71 } else {
72 view.get((err, graph) => {
73 if (err) return cb(err)
74 const blocking = graph && graph[source] && graph[source][dest] === false
75 cb(null, blocking)
76 })
77 }
78 },
79
80 /// return a list of everyone you have blocked privately
81 ignoreStream: function ({ live }) {
82 const stream = PullPushAbort()
83
84 const result = {}
85 let sync = null
86
87 pull(
88 ssb.query.read({
89 query: [{
90 $filter: {
91 value: {
92 author: ssb.id,
93 content: {
94 type: 'contact'
95 }
96 }
97 }
98 }],
99 private: true,
100 live
101 }),
102 stream.aborter,
103 pull.drain((msg) => {
104 if (msg.sync) {
105 sync = true
106 if (live) {
107 stream.push(result)
108 }
109 return
110 }
111
112 const isPrivate = msg.value && msg.value.meta && msg.value.meta.private
113 const content = msg.value.content
114
115 // if a non-private state has been set since last private, revert to null
116 // patchwork will always try to set a new ignore status after the public if it differs
117 // this is just to handle the case where the private state was set by another client
118 // (which will override ignore due to the way ssb-friends handles private blocks)
119
120 const value = isPrivate ? !!content.blocking : null
121
122 if (sync) {
123 stream.push({ [content.contact]: value })
124 } else {
125 result[content.contact] = value
126 }
127 }, (err) => {
128 if (err) return stream.end(err)
129 if (stream.ended || sync) return
130
131 if (!live) {
132 stream.push(result)
133 }
134
135 stream.end()
136 })
137 )
138
139 return stream
140 },
141
142 // return who a given contact publicly follows and blocks (or reverse)
143 stateStream: function ({ feedId, live = false, reverse = false }) {
144 const stream = PullPushAbort()
145
146 const result = {}
147 let sync = null
148
149 // stream reverse states if option specified
150 const queryStream = reverse ? ssb.backlinks.read({
151 query: [{
152 $filter: {
153 dest: feedId,
154 value: {
155 content: {
156 type: 'contact',
157 contact: feedId
158 }
159 }
160 }
161 }],
162 live
163 }) : ssb.query.read({
164 query: [{
165 $filter: {
166 value: {
167 author: feedId,
168 content: {
169 type: 'contact'
170 }
171 }
172 }
173 }],
174 live
175 })
176
177 pull(
178 queryStream,
179 stream.aborter,
180 pull.filter(msg => (msg.value && msg.value.content && msg.value.content.type) || msg.sync),
181 pull.drain((msg) => {
182 if (msg.sync) {
183 // send first reduced result when running in live mode
184 sync = true
185 if (live) stream.push(result)
186 return
187 }
188
189 const isPrivate = msg.value && msg.value.meta && msg.value.meta.private && msg.value.author === ssb.id
190
191 if (!isPrivate) {
192 const content = msg.value.content
193 const contact = reverse ? msg.value.author : content.contact
194 const value = getContactState(msg.value.content)
195
196 if (sync) {
197 // send updated state in live mode
198 stream.push({ [contact]: value })
199 } else {
200 result[contact] = value
201 }
202 }
203 }, (err) => {
204 if (err) return stream.end(err)
205 if (stream.ended || sync) return
206
207 // send final result when not live
208 if (!live) stream.push(result)
209 stream.end()
210 })
211 )
212
213 return stream
214 },
215
216 // get the reduced follows list starting at yourId (who to replicate, block)
217 replicateStream: function ({ throttle = 5000, live }) {
218 const stream = PullPushAbort()
219
220 let lastResolvedValues = {}
221
222 let timer = null
223 let queued = false
224 let sync = false
225
226 const update = () => {
227 // clear queue
228 clearTimeout(timer)
229 queued = false
230
231 // get latest replication state (merge together values)
232 const resolvedValues = resolveValues(values, ssb.id)
233
234 // push changes since last update
235 stream.push(objectDiff(lastResolvedValues, resolvedValues))
236
237 // update internal de-dupe list
238 lastResolvedValues = resolvedValues
239 }
240
241 pull(
242 view.stream({ live }),
243 stream.aborter,
244 pull.drain((msg) => {
245 if (stream.ended) return
246
247 if (!sync) {
248 // we'll store the incoming values (they will be updated as the view updates so
249 // do not need to be manually patched)
250 sync = true
251 update()
252
253 // if not live, we can close stream
254 if (!live) stream.end()
255 } else if (msg) {
256 if (!queued) {
257 queued = true
258 timer = setTimeout(update, throttle)
259 }
260 }
261 }, (err) => {
262 if (err) return stream.end(err)
263 })
264 )
265
266 return stream
267 }
268 }
269}
270
271function getContactState (content) {
272 return content.blocking
273 ? false
274 : content.following
275 ? true
276 : null
277}
278
279function objectDiff (original, changed) {
280 const result = {}
281 const keys = new Set([...Object.keys(original), ...Object.keys(changed)])
282 keys.forEach(key => {
283 if (original[key] !== changed[key]) {
284 result[key] = changed[key]
285 }
286 })
287 return result
288}
289
290function resolveValues (values, yourId) {
291 const result = {}
292 if (values[yourId]) {
293 for (const id in values[yourId]) {
294 if (values[yourId][id] === true) {
295 for (const contact in values[id]) {
296 // only apply block if someone doesn't already follow
297 if (values[id][contact] != null && result[contact] !== true) {
298 result[contact] = values[id][contact]
299 }
300 }
301 }
302 }
303
304 // override with your own blocks/follows
305 for (const contact in values[yourId]) {
306 if (values[yourId][contact] != null) {
307 result[contact] = values[yourId][contact]
308 }
309 }
310 }
311 return result
312}
313

Built with git-ssb-web