Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / plugins / subscriptions2.js
1598 bytesRaw
1 | const normalizeChannel = require('ssb-ref').normalizeChannel |
2 | const pull = require('pull-stream') |
3 | |
4 | exports.manifest = { |
5 | get: 'async' |
6 | } |
7 | |
8 | exports.init = function (ssb) { |
9 | const cbs = {} |
10 | const caches = {} |
11 | |
12 | return { |
13 | get: function ({ id }, cb) { |
14 | if (caches[id]) { |
15 | cb(null, caches[id]) |
16 | } else { |
17 | // cache not loaded yet, queue |
18 | if (!cbs[id]) { |
19 | // first request, start loading |
20 | cbs[id] = [cb] |
21 | loadCache(id) |
22 | } else { |
23 | // subsequent request, add to queue |
24 | cbs[id].push(cb) |
25 | } |
26 | } |
27 | } |
28 | } |
29 | |
30 | function update (msg, cache) { |
31 | cache[normalizeChannel(msg.value.content.channel)] = { |
32 | subscribed: msg.value.content.subscribed, |
33 | timestamp: msg.value.timestamp |
34 | } |
35 | } |
36 | |
37 | function loadCache (id) { |
38 | const subscriptions = {} |
39 | pull( |
40 | ssb.query.read({ |
41 | query: [{ |
42 | $filter: { |
43 | value: { |
44 | author: id, |
45 | content: { |
46 | type: 'channel' |
47 | } |
48 | } |
49 | } |
50 | }, { $map: true }], |
51 | old: true, |
52 | live: true |
53 | }), |
54 | pull.drain(msg => { |
55 | if (msg.sync) { |
56 | caches[id] = subscriptions |
57 | const callbacks = cbs[id] || [] |
58 | cbs[id] = null |
59 | callbacks.forEach(cb => { |
60 | cb(null, caches[id]) |
61 | }) |
62 | } else { |
63 | update(msg, subscriptions) |
64 | } |
65 | }, (err) => { |
66 | const callbacks = cbs[id] || [] |
67 | cbs[id] = null |
68 | callbacks.forEach(cb => { |
69 | cb(err) |
70 | }) |
71 | }) |
72 | ) |
73 | } |
74 | } |
75 |
Built with git-ssb-web