Files: 98233c821a36cfebd65825ed3c2c4e9474f43e6e / channel / obs / subscribed.js
2880 bytesRaw
1 | var pull = require('pull-stream') |
2 | var { Dict, Value, computed, resolve, onceTrue } = require('mutant') |
3 | var get = require('lodash/get') |
4 | var MutantPullReduce = require('mutant-pull-reduce') |
5 | var nest = require('depnest') |
6 | var ref = require('ssb-ref') |
7 | |
8 | var throttle = require('mutant/throttle') |
9 | |
10 | exports.needs = nest({ |
11 | 'sbot.pull.stream': 'first', |
12 | 'app.obs.pluginsOk': 'first', |
13 | }) |
14 | |
15 | exports.gives = nest({ |
16 | 'channel.obs.subscribed': true |
17 | }) |
18 | |
19 | exports.create = function (api) { |
20 | var cache = Dict() |
21 | cache.sync = Value(false) |
22 | |
23 | var answerCache = {} |
24 | |
25 | return nest({ |
26 | 'channel.obs.subscribed': subscribed, |
27 | }) |
28 | |
29 | function subscribed (userId) { |
30 | if (!cache.keys().length) startCache() |
31 | |
32 | return getAnswer(userId) |
33 | } |
34 | |
35 | function getAnswer (userId) { |
36 | if (!answerCache[userId]) { |
37 | answerCache[userId] = computed(cache, cache => { |
38 | const answer = Object.keys(cache) |
39 | .filter(channel => { |
40 | return cache[channel] |
41 | .map(entry => entry[0]) |
42 | .includes(userId) |
43 | }) |
44 | |
45 | return new Set(answer) |
46 | // previous channel.obs.subscribed uses Sets ... |
47 | }) |
48 | answerCache[userId].sync = cache.sync |
49 | } |
50 | |
51 | return answerCache[userId] |
52 | } |
53 | |
54 | function startCache () { |
55 | var initialReceived = false |
56 | |
57 | onceTrue(api.app.obs.pluginsOk(), startStream) |
58 | |
59 | function startStream (val) { |
60 | pull( |
61 | api.sbot.pull.stream(sbot => { |
62 | return sbot.channel.stream({ live: true }) |
63 | }), |
64 | pull.drain(val => { |
65 | if (val === null) { |
66 | return |
67 | } |
68 | |
69 | if (!initialReceived) { |
70 | initialReceived = true |
71 | cache.set(val) |
72 | cache.sync.set(true) |
73 | return |
74 | } |
75 | |
76 | if (val.sync === true) { |
77 | cache.sync.set(true) |
78 | return |
79 | } |
80 | |
81 | // Object.assign seems to bee needed otherwise the cache.set hits some codition where the resolved value gets over-ridden |
82 | // before and set to {} right before the actual set happens! |
83 | var newCache = reduce(Object.assign({}, resolve(cache)), val) |
84 | cache.set(newCache) |
85 | }) |
86 | ) |
87 | } |
88 | } |
89 | } |
90 | |
91 | |
92 | // TODO - add feature to flumeview-reduce which gives you a copy of the friggen reducer |
/**
 * Folds one subscription statement into the channel-subscription state.
 *
 * `soFar` maps a channel name to an array of `[author, timestamp]` entries.
 * NOTE: `soFar` is mutated in place and returned, so pass a copy if the
 * original must stay untouched.
 *
 * @param {object} soFar - current state, channel -> array of [author, timestamp]
 * @param {object} newSub - statement with `author`, `timestamp`, `channel`, `subscribed`
 * @returns {object} the same `soFar` object, updated
 */
function reduce (soFar, newSub) {
  const { author, timestamp, channel, subscribed } = newSub

  // Only trust own, array-valued properties: a channel named e.g. 'constructor'
  // or 'toString' would otherwise resolve to an inherited function and break `.find`.
  const isKnownChannel = Object.prototype.hasOwnProperty.call(soFar, channel) &&
    Array.isArray(soFar[channel])
  const previousSubs = isKnownChannel ? soFar[channel] : []

  const current = previousSubs.find(entry => entry[0] === author)

  // if current recorded statement was more recent than this 'newSub', ignore newSub
  if (current && current[1] > timestamp) return soFar

  // remove all subs entries for this author
  const channelSubs = previousSubs.filter(entry => entry[0] !== author)

  // Record the statement's own timestamp (not the time we happened to receive it),
  // so the staleness check above compares like with like.
  if (subscribed) channelSubs.push([author, timestamp])

  soFar[channel] = channelSubs

  return soFar
}
112 | |
113 | |
114 |
Built with git-ssb-web