git ssb

1+

Daan Patchwork / patchwork



Tree: 55fc93a9190c25f467ead205ab8d676b5191dbd4

Files: 55fc93a9190c25f467ead205ab8d676b5191dbd4 / lib / plugins / subscriptions2.js

1598 bytes · Raw
1const normalizeChannel = require('ssb-ref').normalizeChannel
2const pull = require('pull-stream')
3
// Method manifest for this plugin: `get` is declared as an 'async' method,
// i.e. the callback-taking function returned from `exports.init`.
exports.manifest = { get: 'async' }
7
8exports.init = function (ssb) {
9 const cbs = {}
10 const caches = {}
11
12 return {
13 get: function ({ id }, cb) {
14 if (caches[id]) {
15 cb(null, caches[id])
16 } else {
17 // cache not loaded yet, queue
18 if (!cbs[id]) {
19 // first request, start loading
20 cbs[id] = [cb]
21 loadCache(id)
22 } else {
23 // subsequent request, add to queue
24 cbs[id].push(cb)
25 }
26 }
27 }
28 }
29
30 function update (msg, cache) {
31 cache[normalizeChannel(msg.value.content.channel)] = {
32 subscribed: msg.value.content.subscribed,
33 timestamp: msg.value.timestamp
34 }
35 }
36
37 function loadCache (id) {
38 const subscriptions = {}
39 pull(
40 ssb.query.read({
41 query: [{
42 $filter: {
43 value: {
44 author: id,
45 content: {
46 type: 'channel'
47 }
48 }
49 }
50 }, { $map: true }],
51 old: true,
52 live: true
53 }),
54 pull.drain(msg => {
55 if (msg.sync) {
56 caches[id] = subscriptions
57 const callbacks = cbs[id] || []
58 cbs[id] = null
59 callbacks.forEach(cb => {
60 cb(null, caches[id])
61 })
62 } else {
63 update(msg, subscriptions)
64 }
65 }, (err) => {
66 const callbacks = cbs[id] || []
67 cbs[id] = null
68 callbacks.forEach(cb => {
69 cb(err)
70 })
71 })
72 )
73 }
74}
75

Built with git-ssb-web