git ssb

2+

mixmix / ticktack



Tree: 98233c821a36cfebd65825ed3c2c4e9474f43e6e

Files: 98233c821a36cfebd65825ed3c2c4e9474f43e6e / channel / obs / subscribed.js

2880 bytesRaw
1var pull = require('pull-stream')
2var { Dict, Value, computed, resolve, onceTrue } = require('mutant')
3var get = require('lodash/get')
4var MutantPullReduce = require('mutant-pull-reduce')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7
8var throttle = require('mutant/throttle')
9
10exports.needs = nest({
11 'sbot.pull.stream': 'first',
12 'app.obs.pluginsOk': 'first',
13})
14
15exports.gives = nest({
16 'channel.obs.subscribed': true
17})
18
19exports.create = function (api) {
20 var cache = Dict()
21 cache.sync = Value(false)
22
23 var answerCache = {}
24
25 return nest({
26 'channel.obs.subscribed': subscribed,
27 })
28
29 function subscribed (userId) {
30 if (!cache.keys().length) startCache()
31
32 return getAnswer(userId)
33 }
34
35 function getAnswer (userId) {
36 if (!answerCache[userId]) {
37 answerCache[userId] = computed(cache, cache => {
38 const answer = Object.keys(cache)
39 .filter(channel => {
40 return cache[channel]
41 .map(entry => entry[0])
42 .includes(userId)
43 })
44
45 return new Set(answer)
46 // previous channel.obs.subscribed uses Sets ...
47 })
48 answerCache[userId].sync = cache.sync
49 }
50
51 return answerCache[userId]
52 }
53
54 function startCache () {
55 var initialReceived = false
56
57 onceTrue(api.app.obs.pluginsOk(), startStream)
58
59 function startStream (val) {
60 pull(
61 api.sbot.pull.stream(sbot => {
62 return sbot.channel.stream({ live: true })
63 }),
64 pull.drain(val => {
65 if (val === null) {
66 return
67 }
68
69 if (!initialReceived) {
70 initialReceived = true
71 cache.set(val)
72 cache.sync.set(true)
73 return
74 }
75
76 if (val.sync === true) {
77 cache.sync.set(true)
78 return
79 }
80
81 // Object.assign seems to bee needed otherwise the cache.set hits some codition where the resolved value gets over-ridden
82 // before and set to {} right before the actual set happens!
83 var newCache = reduce(Object.assign({}, resolve(cache)), val)
84 cache.set(newCache)
85 })
86 )
87 }
88 }
89}
90
91
92// TODO - add feature to flumeview-reduce which gives you a copy of the friggen reducer
93function reduce (soFar, newSub) {
94 // process.stdout.write('c')
95 const { author, timestamp, channel, subscribed } = newSub
96
97 var channelSubs = get(soFar, [channel], [])
98 var current = channelSubs.find(entry => entry[0] === author)
99
100 // if current recorded statement was more recent than this 'newSub', ignore newSub
101 if (current && current[1] > timestamp) return soFar
102
103 // remove all subs entries for this author
104 channelSubs = channelSubs.filter(entry => entry[0] !== author)
105
106 if (subscribed) channelSubs.push([author, Number(new Date())])
107
108 soFar[channel] = channelSubs
109
110 return soFar
111}
112
113
114

Built with git-ssb-web