git ssb

7+

dinoworm 🐛 / patchcore



Tree: 781e1c19862d0d3327dd9a68e0c497d9f876cc0f

Files: 781e1c19862d0d3327dd9a68e0c497d9f876cc0f / channel / obs / subscribed.js

2351 bytesRaw
1var pull = require('pull-stream')
2var computed = require('mutant/computed')
3var MutantPullReduce = require('mutant-pull-reduce')
4var nest = require('depnest')
5var ref = require('ssb-ref')
6
7var throttle = require('mutant/throttle')
8
// Dependencies this module asks to have injected, keyed by dotted path.
// NOTE(review): 'first' is presumably the depject resolution mode where the
// first provider that returns a value wins — confirm against depject docs.
const dependencies = {
  'sbot.pull.userFeed': 'first',
  'channel.sync.normalize': 'first'
}

exports.needs = nest(dependencies)
13
// Entry points this module provides, keyed by dotted path: the subscription
// observable factory and a publish hook (both implemented in exports.create).
const provided = {
  'channel.obs.subscribed': true,
  'sbot.hook.publish': true
}

exports.gives = nest(provided)
18
19exports.create = function (api) {
20 var cache = {}
21 var reducers = {}
22
23 return nest({
24 'channel.obs.subscribed': subscribed,
25 'sbot.hook.publish': function (msg) {
26 if (isChannelSubscription(msg)) {
27 if (msg.value.content.channel && reducers[msg.value.author]) {
28 reducers[msg.value.author].push(msg)
29 }
30 }
31 }
32 })
33
34 function subscribed (userId) {
35 if (!ref.isFeed(userId)) throw new Error('a feed id must be specified')
36 if (cache[userId]) {
37 return cache[userId]
38 } else {
39 var stream = pull(
40 api.sbot.pull.userFeed({id: userId, live: true}),
41 pull.filter((msg) => {
42 return !msg.value || msg.value.content.type === 'channel'
43 })
44 )
45
46 var latestTimestamps = {}
47
48 var result = MutantPullReduce(stream, (result, msg) => {
49 var c = msg.value.content
50 if (c.type === 'channel' && typeof c.channel === 'string') {
51 var channel = api.channel.sync.normalize(c.channel)
52 if (channel && msg.value.timestamp > (latestTimestamps[channel] || 0)) {
53 if (channel) {
54 if (typeof c.subscribed === 'boolean') {
55 latestTimestamps[channel] = msg.value.timestamp
56 if (c.subscribed) {
57 result.add(channel)
58 } else {
59 result.delete(channel)
60 }
61 }
62 }
63 }
64 }
65 return result
66 }, {
67 startValue: new Set(),
68 nextTick: true
69 })
70
71 reducers[userId] = result
72
73 var instance = throttle(result, 2000)
74 instance.sync = result.sync
75
76 instance.has = function (value) {
77 return computed(instance, x => x.has(value))
78 }
79
80 cache[userId] = instance
81 return instance
82 }
83 }
84}
85
/**
 * Tells whether `msg` is a message whose content type is 'channel'.
 *
 * Tolerates a missing message, a missing `value` and a missing or non-object
 * `content`; all of those yield `false`.
 *
 * @param {object} [msg] - candidate message of the form `{ value: { content } }`
 * @returns {boolean} true only when `msg.value.content.type === 'channel'`
 */
function isChannelSubscription (msg) {
  // Boolean(...) so callers always get true/false rather than whichever
  // falsy operand short-circuited the chain.
  return Boolean(
    msg &&
    msg.value &&
    msg.value.content &&
    msg.value.content.type === 'channel'
  )
}
89

Built with git-ssb-web