Files: fee2e3b7fdd8450fbfe1e46220642ed555f52eab / state / obs.js
3723 bytesRaw
1 | var nest = require('depnest') |
2 | var PullObv = require('pull-obv') |
3 | var threadReduce = require('ssb-reduce-stream') |
4 | var pull = require('pull-stream') |
5 | var Next = require('pull-next') |
6 | var isObject = require('lodash/isObject') |
7 | |
// Paths this module provides. Each one has a matching handler in the object
// returned by `exports.create`.
var givesSpec = {
  'state.obs.threads': true,
  'state.obs.channel': true
}

exports.gives = nest(givesSpec)
12 | |
// Paths this module consumes; they are read from the `api` argument of
// `exports.create`. 'first' is the resolution strategy requested for each
// path (presumably "use the first provider that answers" -- confirm against
// the depject documentation).
var needsSpec = {
  'message.sync.unbox': 'first',
  'sbot.pull.log': 'first',
  'sbot.async.get': 'first',
  'feed.pull.channel': 'first'
}

exports.needs = nest(needsSpec)
19 | |
20 | exports.create = function (api) { |
21 | var threadsObs |
22 | |
23 | function createStateObs (reduce, createStream, opts, initial) { |
24 | var lastTimestamp = opts.last || Date.now() |
25 | var firstTimestamp = opts.first || Date.now() |
26 | |
27 | function unbox (data) { |
28 | if(data.sync) return data |
29 | if(isObject(data.value.content)) return data |
30 | return api.message.sync.unbox(data) |
31 | } |
32 | |
33 | var obs = PullObv( |
34 | reduce, |
35 | pull( |
36 | Next(function () { |
37 | return createStream({reverse: true, limit: 500, lt: lastTimestamp}) |
38 | }), |
39 | pull.through(function (data) { |
40 | lastTimestamp = data.timestamp |
41 | }), |
42 | pull.map(unbox), pull.filter(Boolean) |
43 | ), |
44 | //value recovered from localStorage |
45 | initial |
46 | ) |
47 | |
48 | var getting = {} |
49 | obs(function (state) { |
50 | var effect = state.effect |
51 | if(!effect) return |
52 | |
53 | state.effect = null |
54 | if(getting[effect.key]) return |
55 | |
56 | getting[effect.key] = true |
57 | api.sbot.async.get(effect.key, (err, msg) => { |
58 | if (!msg) return |
59 | obs.set(STATE=reduce(obs.value, unbox({key: effect.key, value: msg}))) |
60 | }) |
61 | }) |
62 | |
63 | //stream live messages. this *should* work. |
64 | //there is no back pressure on new events |
65 | //only a show more on the top (currently) |
66 | pull( |
67 | Next(function () { |
68 | return createStream({limit: 500, gt: firstTimestamp, live: true}) |
69 | }), |
70 | pull.map(unbox), pull.filter(Boolean), |
71 | pull.drain(function (data) { |
72 | if(data.sync) return |
73 | firstTimestamp = data.timestamp |
74 | obs.set(reduce(obs.value, data)) |
75 | }) |
76 | ) |
77 | |
78 | return obs |
79 | } |
80 | |
81 | |
82 | return nest({ |
83 | 'state.obs.channel': function (channel) { |
84 | |
85 | return createStateObs( |
86 | threadReduce, |
87 | function (opts) { |
88 | return opts.reverse ? |
89 | api.feed.pull.channel(channel)(opts): |
90 | pull(api.sbot.pull.log(opts), pull.filter(function (data) { |
91 | if(data.sync) return false |
92 | return data.value.content.channel === channel |
93 | })) |
94 | }, |
95 | {} |
96 | ) |
97 | |
98 | // var channelObs = PullObv( |
99 | // threadReduce, |
100 | // createChannelStream({reverse: true, limit: 1000}) |
101 | // ) |
102 | |
103 | |
104 | }, |
105 | |
106 | 'state.obs.threads': function buildThreadObs() { |
107 | if(threadsObs) return threadsObs |
108 | |
109 | // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff |
110 | // easier. maybe re-enable this later? also, should this be for every channel too? not sure. |
111 | |
112 | // var initial |
113 | // try { initial = JSON.parse(localStorage.threadsState) } |
114 | // catch (_) { } |
115 | |
116 | initial = {} |
117 | |
118 | threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {}) |
119 | |
120 | threadsObs(function (threadsState) { |
121 | if(threadsState.ended && threadsState.ended !== true) |
122 | console.error('threadObs error:', threadsState.ended) |
123 | }) |
124 | |
125 | // var timer |
126 | // //keep localStorage up to date |
127 | // threadsObs(function (threadsState) { |
128 | // if(timer) return |
129 | // timer = setTimeout(function () { |
130 | // timer = null |
131 | // threadsState.last = lastTimestamp |
132 | // console.log('save state') |
133 | // localStorage.threadsState = JSON.stringify(threadsState) |
134 | // }, 1000) |
135 | // }) |
136 | |
137 | return threadsObs |
138 | } |
139 | }) |
140 | } |
141 | |
142 | |
143 | |
144 | |
145 | |
146 | |
147 | |
148 | |
149 | |
150 | |
151 |
Built with git-ssb-web