Files: 400d6307414c6b622a0408421a7c1881697c893e / channel / obs / subscribed.js
2692 bytesRaw
1 | var pull = require('pull-stream') |
2 | var { Dict, Value, computed, resolve } = require('mutant') |
3 | var get = require('lodash/get') |
4 | var MutantPullReduce = require('mutant-pull-reduce') |
5 | var nest = require('depnest') |
6 | var ref = require('ssb-ref') |
7 | |
8 | var throttle = require('mutant/throttle') |
9 | |
10 | exports.needs = nest({ |
11 | 'sbot.pull.stream': 'first' |
12 | }) |
13 | |
14 | exports.gives = nest({ |
15 | 'channel.obs.subscribed': true |
16 | }) |
17 | |
18 | exports.create = function (api) { |
19 | var cache = Dict() |
20 | cache.sync = Value(false) |
21 | |
22 | var answerCache = {} |
23 | |
24 | return nest({ |
25 | 'channel.obs.subscribed': subscribed, |
26 | }) |
27 | |
28 | function subscribed (userId) { |
29 | if (!cache.keys().length) startCache() |
30 | |
31 | return getAnswer(userId) |
32 | } |
33 | |
34 | function getAnswer (userId) { |
35 | if (!answerCache[userId]) { |
36 | answerCache[userId] = computed(cache, cache => { |
37 | const answer = Object.keys(cache) |
38 | .filter(channel => { |
39 | return cache[channel] |
40 | .map(entry => entry[0]) |
41 | .includes(userId) |
42 | }) |
43 | |
44 | return new Set(answer) |
45 | // previous channel.obs.subscribed uses Sets ... |
46 | }) |
47 | answerCache[userId].sync = cache.sync |
48 | } |
49 | |
50 | return answerCache[userId] |
51 | } |
52 | |
53 | function startCache () { |
54 | var initialReceived = false |
55 | |
56 | pull( |
57 | api.sbot.pull.stream(sbot => { |
58 | return sbot.channel.stream({ live: true }) |
59 | }), |
60 | pull.drain(val => { |
61 | if (val === null) { |
62 | return |
63 | } |
64 | |
65 | if (!initialReceived) { |
66 | initialReceived = true |
67 | cache.set(val) |
68 | cache.sync.set(true) |
69 | return |
70 | } |
71 | |
72 | if (val.sync === true) { |
73 | cache.sync.set(true) |
74 | return |
75 | } |
76 | |
77 | // Object.assign seems to bee needed otherwise the cache.set hits some codition where the resolved value gets over-ridden |
78 | // before and set to {} right before the actual set happens! |
79 | var newCache = reduce(Object.assign({}, resolve(cache)), val) |
80 | cache.set(newCache) |
81 | }) |
82 | ) |
83 | } |
84 | } |
85 | |
86 | |
87 | // TODO - add feature to flumeview-reduce which gives you a copy of the friggen reducer |
88 | function reduce (soFar, newSub) { |
89 | // process.stdout.write('c') |
90 | const { author, timestamp, channel, subscribed } = newSub |
91 | |
92 | var channelSubs = get(soFar, [channel], []) |
93 | var current = channelSubs.find(entry => entry[0] === author) |
94 | |
95 | // if current recorded statement was more recent than this 'newSub', ignore newSub |
96 | if (current && current[1] > timestamp) return soFar |
97 | |
98 | // remove all subs entries for this author |
99 | channelSubs = channelSubs.filter(entry => entry[0] !== author) |
100 | |
101 | if (subscribed) channelSubs.push([author, Number(new Date())]) |
102 | |
103 | soFar[channel] = channelSubs |
104 | |
105 | return soFar |
106 | } |
107 | |
108 | |
109 |
Built with git-ssb-web