git ssb

2+

mixmix / ticktack



Tree: 400d6307414c6b622a0408421a7c1881697c893e

Files: 400d6307414c6b622a0408421a7c1881697c893e / channel / obs / subscribed.js

2692 bytesRaw
1var pull = require('pull-stream')
2var { Dict, Value, computed, resolve } = require('mutant')
3var get = require('lodash/get')
4var MutantPullReduce = require('mutant-pull-reduce')
5var nest = require('depnest')
6var ref = require('ssb-ref')
7
8var throttle = require('mutant/throttle')
9
10exports.needs = nest({
11 'sbot.pull.stream': 'first'
12})
13
14exports.gives = nest({
15 'channel.obs.subscribed': true
16})
17
18exports.create = function (api) {
19 var cache = Dict()
20 cache.sync = Value(false)
21
22 var answerCache = {}
23
24 return nest({
25 'channel.obs.subscribed': subscribed,
26 })
27
28 function subscribed (userId) {
29 if (!cache.keys().length) startCache()
30
31 return getAnswer(userId)
32 }
33
34 function getAnswer (userId) {
35 if (!answerCache[userId]) {
36 answerCache[userId] = computed(cache, cache => {
37 const answer = Object.keys(cache)
38 .filter(channel => {
39 return cache[channel]
40 .map(entry => entry[0])
41 .includes(userId)
42 })
43
44 return new Set(answer)
45 // previous channel.obs.subscribed uses Sets ...
46 })
47 answerCache[userId].sync = cache.sync
48 }
49
50 return answerCache[userId]
51 }
52
53 function startCache () {
54 var initialReceived = false
55
56 pull(
57 api.sbot.pull.stream(sbot => {
58 return sbot.channel.stream({ live: true })
59 }),
60 pull.drain(val => {
61 if (val === null) {
62 return
63 }
64
65 if (!initialReceived) {
66 initialReceived = true
67 cache.set(val)
68 cache.sync.set(true)
69 return
70 }
71
72 if (val.sync === true) {
73 cache.sync.set(true)
74 return
75 }
76
77 // Object.assign seems to bee needed otherwise the cache.set hits some codition where the resolved value gets over-ridden
78 // before and set to {} right before the actual set happens!
79 var newCache = reduce(Object.assign({}, resolve(cache)), val)
80 cache.set(newCache)
81 })
82 )
83 }
84}
85
86
87// TODO - add feature to flumeview-reduce which gives you a copy of the friggen reducer
88function reduce (soFar, newSub) {
89 // process.stdout.write('c')
90 const { author, timestamp, channel, subscribed } = newSub
91
92 var channelSubs = get(soFar, [channel], [])
93 var current = channelSubs.find(entry => entry[0] === author)
94
95 // if current recorded statement was more recent than this 'newSub', ignore newSub
96 if (current && current[1] > timestamp) return soFar
97
98 // remove all subs entries for this author
99 channelSubs = channelSubs.filter(entry => entry[0] !== author)
100
101 if (subscribed) channelSubs.push([author, Number(new Date())])
102
103 soFar[channel] = channelSubs
104
105 return soFar
106}
107
108
109

Built with git-ssb-web