git ssb

7+

dinoworm ๐Ÿ› / patchcore



Tree: b0afea5e68a6a31afda1eeb38830433a9beedd2b

Files: b0afea5e68a6a31afda1eeb38830433a9beedd2b / channel / obs / subscribed.js

2293 bytesRaw
1var pull = require('pull-stream')
2var computed = require('mutant/computed')
3var MutantPullReduce = require('mutant-pull-reduce')
4var nest = require('depnest')
5var ref = require('ssb-ref')
6
7var throttle = require('mutant/throttle')
8
9exports.needs = nest({
10 'sbot.pull.userFeed': 'first'
11})
12
13exports.gives = nest({
14 'channel.obs.subscribed': true,
15 'sbot.hook.publish': true
16})
17
18exports.create = function (api) {
19 var cache = {}
20 var reducers = {}
21
22 return nest({
23 'channel.obs.subscribed': subscribed,
24 'sbot.hook.publish': function (msg) {
25 if (isChannelSubscription(msg)) {
26 if (msg.value.content.channel && reducers[msg.value.author]) {
27 reducers[msg.value.author].push(msg)
28 }
29 }
30 }
31 })
32
33 function subscribed (userId) {
34 if (!ref.isFeed(userId)) throw new Error('a feed id must be specified')
35 if (cache[userId]) {
36 return cache[userId]
37 } else {
38 var stream = pull(
39 api.sbot.pull.userFeed({id: userId, live: true}),
40 pull.filter((msg) => {
41 return !msg.value || msg.value.content.type === 'channel'
42 })
43 )
44
45 var latestTimestamps = {}
46
47 var result = MutantPullReduce(stream, (result, msg) => {
48 var c = msg.value.content
49 if (c.type === 'channel' && typeof c.channel === 'string') {
50 var channel = c.channel.trim()
51 if (channel && msg.value.timestamp > (latestTimestamps[channel] || 0)) {
52 if (channel) {
53 if (typeof c.subscribed === 'boolean') {
54 latestTimestamps[channel] = msg.value.timestamp
55 if (c.subscribed) {
56 result.add(channel)
57 } else {
58 result.delete(channel)
59 }
60 }
61 }
62 }
63 }
64 return result
65 }, {
66 startValue: new Set(),
67 nextTick: true
68 })
69
70 reducers[userId] = result
71
72 var instance = throttle(result, 2000)
73 instance.sync = result.sync
74
75 instance.has = function (value) {
76 return computed(instance, x => x.has(value))
77 }
78
79 cache[userId] = instance
80 return instance
81 }
82 }
83}
84
85function isChannelSubscription (msg) {
86 return msg.value && msg.value.content && msg.value.content.type === 'channel'
87}
88

Built with git-ssb-web