Files: 0b68762e851dab73abc45fa60add0d96d3b5c7ac / channel / obs / subscribed.js
2351 bytesRaw
1 | var pull = require('pull-stream') |
2 | var computed = require('mutant/computed') |
3 | var MutantPullReduce = require('mutant-pull-reduce') |
4 | var nest = require('depnest') |
5 | var ref = require('ssb-ref') |
6 | |
7 | var throttle = require('mutant/throttle') |
8 | |
// APIs this module asks to have injected. They are read in `exports.create`
// as `api.sbot.pull.userFeed` and `api.channel.sync.normalize`.
// NOTE(review): 'first' is presumably the depject resolution strategy
// (use the first provider) — confirm against the depject docs.
var neededApis = {
  'sbot.pull.userFeed': 'first',
  'channel.sync.normalize': 'first'
}

exports.needs = nest(neededApis)
13 | |
// APIs this module provides; both are implemented in `exports.create`.
var givenApis = {
  'channel.obs.subscribed': true,
  'sbot.hook.publish': true
}

exports.gives = nest(givenApis)
18 | |
19 | exports.create = function (api) { |
20 | var cache = {} |
21 | var reducers = {} |
22 | |
23 | return nest({ |
24 | 'channel.obs.subscribed': subscribed, |
25 | 'sbot.hook.publish': function (msg) { |
26 | if (isChannelSubscription(msg)) { |
27 | if (msg.value.content.channel && reducers[msg.value.author]) { |
28 | reducers[msg.value.author].push(msg) |
29 | } |
30 | } |
31 | } |
32 | }) |
33 | |
34 | function subscribed (userId) { |
35 | if (!ref.isFeed(userId)) throw new Error('a feed id must be specified') |
36 | if (cache[userId]) { |
37 | return cache[userId] |
38 | } else { |
39 | var stream = pull( |
40 | api.sbot.pull.userFeed({id: userId, live: true}), |
41 | pull.filter((msg) => { |
42 | return !msg.value || msg.value.content.type === 'channel' |
43 | }) |
44 | ) |
45 | |
46 | var latestTimestamps = {} |
47 | |
48 | var result = MutantPullReduce(stream, (result, msg) => { |
49 | var c = msg.value.content |
50 | if (c.type === 'channel' && typeof c.channel === 'string') { |
51 | var channel = api.channel.sync.normalize(c.channel) |
52 | if (channel && msg.value.timestamp > (latestTimestamps[channel] || 0)) { |
53 | if (channel) { |
54 | if (typeof c.subscribed === 'boolean') { |
55 | latestTimestamps[channel] = msg.value.timestamp |
56 | if (c.subscribed) { |
57 | result.add(channel) |
58 | } else { |
59 | result.delete(channel) |
60 | } |
61 | } |
62 | } |
63 | } |
64 | } |
65 | return result |
66 | }, { |
67 | startValue: new Set(), |
68 | nextTick: true |
69 | }) |
70 | |
71 | reducers[userId] = result |
72 | |
73 | var instance = throttle(result, 2000) |
74 | instance.sync = result.sync |
75 | |
76 | instance.has = function (value) { |
77 | return computed(instance, x => x.has(value)) |
78 | } |
79 | |
80 | cache[userId] = instance |
81 | return instance |
82 | } |
83 | } |
84 | } |
85 | |
/**
 * Tells whether `msg` is a message whose content type is 'channel'.
 *
 * Safe to call with `null`, `undefined`, or items lacking `value` or
 * `value.content`; always returns a strict boolean.
 *
 * @param {Object} [msg] - item with an optional `value.content.type`.
 * @returns {boolean} true only when `msg.value.content.type === 'channel'`.
 */
function isChannelSubscription (msg) {
  var content = msg && msg.value && msg.value.content
  return Boolean(content) && content.type === 'channel'
}
89 |
Built with git-ssb-web