Files: dcb68fb50cc8bee039bf3b455495c4057767384e / channel / obs / subscribed.js
2293 bytesRaw
1 | var pull = require('pull-stream') |
2 | var computed = require('mutant/computed') |
3 | var MutantPullReduce = require('mutant-pull-reduce') |
4 | var nest = require('depnest') |
5 | var ref = require('ssb-ref') |
6 | |
7 | var throttle = require('mutant/throttle') |
8 | |
9 | exports.needs = nest({ |
10 | 'sbot.pull.userFeed': 'first' |
11 | }) |
12 | |
13 | exports.gives = nest({ |
14 | 'channel.obs.subscribed': true, |
15 | 'sbot.hook.publish': true |
16 | }) |
17 | |
18 | exports.create = function (api) { |
19 | var cache = {} |
20 | var reducers = {} |
21 | |
22 | return nest({ |
23 | 'channel.obs.subscribed': subscribed, |
24 | 'sbot.hook.publish': function (msg) { |
25 | if (isChannelSubscription(msg)) { |
26 | if (msg.value.content.channel && reducers[msg.value.author]) { |
27 | reducers[msg.value.author].push(msg) |
28 | } |
29 | } |
30 | } |
31 | }) |
32 | |
33 | function subscribed (userId) { |
34 | if (!ref.isFeed(userId)) throw new Error('a feed id must be specified') |
35 | if (cache[userId]) { |
36 | return cache[userId] |
37 | } else { |
38 | var stream = pull( |
39 | api.sbot.pull.userFeed({id: userId, live: true}), |
40 | pull.filter((msg) => { |
41 | return !msg.value || msg.value.content.type === 'channel' |
42 | }) |
43 | ) |
44 | |
45 | var latestTimestamps = {} |
46 | |
47 | var result = MutantPullReduce(stream, (result, msg) => { |
48 | var c = msg.value.content |
49 | if (c.type === 'channel' && typeof c.channel === 'string') { |
50 | var channel = c.channel.trim() |
51 | if (channel && msg.value.timestamp > (latestTimestamps[channel] || 0)) { |
52 | if (channel) { |
53 | if (typeof c.subscribed === 'boolean') { |
54 | latestTimestamps[channel] = msg.value.timestamp |
55 | if (c.subscribed) { |
56 | result.add(channel) |
57 | } else { |
58 | result.delete(channel) |
59 | } |
60 | } |
61 | } |
62 | } |
63 | } |
64 | return result |
65 | }, { |
66 | startValue: new Set(), |
67 | nextTick: true |
68 | }) |
69 | |
70 | reducers[userId] = result |
71 | |
72 | var instance = throttle(result, 2000) |
73 | instance.sync = result.sync |
74 | |
75 | instance.has = function (value) { |
76 | return computed(instance, x => x.has(value)) |
77 | } |
78 | |
79 | cache[userId] = instance |
80 | return instance |
81 | } |
82 | } |
83 | } |
84 | |
85 | function isChannelSubscription (msg) { |
86 | return msg.value && msg.value.content && msg.value.content.type === 'channel' |
87 | } |
88 |
Built with git-ssb-web