Files: 387dcd7ac040579bb840df3e94bfd3bf9ab361b2 / state / obs.js
3414 bytesRaw
1 | var nest = require('depnest') |
2 | var PullObv = require('pull-obv') |
3 | var threadReduce = require('ssb-reduce-stream') |
4 | var pull = require('pull-stream') |
5 | var Next = require('pull-next') |
6 | var isObject = require('lodash/isObject') |
7 | |
8 | exports.gives = nest({ |
9 | 'state.obs.threads': true, |
10 | 'state.obs.channel': true |
11 | }) |
12 | |
13 | exports.needs = nest({ |
14 | 'message.sync.unbox': 'first', |
15 | 'sbot.pull.log': 'first', |
16 | 'sbot.async.get': 'first', |
17 | 'feed.pull.channel': 'first', |
18 | 'feed.pull.rollup': 'first' |
19 | }) |
20 | |
21 | exports.create = function (api) { |
22 | var threadsObs |
23 | |
24 | function createStateObs (reduce, createStream, opts, initial) { |
25 | var lastTimestamp = opts.last || Date.now() |
26 | var firstTimestamp = opts.first || Date.now() |
27 | |
28 | function unbox (data) { |
29 | if (data.sync) return data |
30 | if (isObject(data.value.content)) return data |
31 | return api.message.sync.unbox(data) |
32 | } |
33 | |
34 | var obs = PullObv( |
35 | reduce, |
36 | pull( |
37 | Next(function () { |
38 | return createStream({reverse: true, limit: 500, lt: lastTimestamp}) |
39 | }), |
40 | pull.through(function (data) { |
41 | lastTimestamp = data.timestamp |
42 | }), |
43 | pull.map(unbox), |
44 | pull.filter(Boolean), |
45 | api.feed.pull.rollup() |
46 | ), |
47 | // value recovered from localStorage |
48 | initial |
49 | ) |
50 | |
51 | // stream live messages. this *should* work. |
52 | // there is no back pressure on new events |
53 | // only a show more on the top (currently) |
54 | pull( |
55 | Next(function () { |
56 | return createStream({limit: 500, gt: firstTimestamp, live: true}) |
57 | }), |
58 | pull.map(unbox), pull.filter(Boolean), |
59 | pull.drain(function (data) { |
60 | if (data.sync) return |
61 | firstTimestamp = data.timestamp |
62 | obs.set(reduce(obs.value, data)) |
63 | }) |
64 | ) |
65 | |
66 | return obs |
67 | } |
68 | |
69 | return nest({ |
70 | 'state.obs.channel': function (channel) { |
71 | return createStateObs( |
72 | threadReduce, |
73 | function (opts) { |
74 | return opts.reverse |
75 | ? api.feed.pull.channel(channel)(opts) |
76 | : pull(api.sbot.pull.log(opts), pull.filter(function (data) { |
77 | if (data.sync) return false |
78 | return data.value.content.channel === channel |
79 | })) |
80 | }, |
81 | {} |
82 | ) |
83 | |
84 | // var channelObs = PullObv( |
85 | // threadReduce, |
86 | // createChannelStream({reverse: true, limit: 1000}) |
87 | // ) |
88 | }, |
89 | |
90 | 'state.obs.threads': function buildThreadObs () { |
91 | if (threadsObs) return threadsObs |
92 | |
93 | // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff |
94 | // easier. maybe re-enable this later? also, should this be for every channel too? not sure. |
95 | |
96 | // var initial |
97 | // try { initial = JSON.parse(localStorage.threadsState) } |
98 | // catch (_) { } |
99 | |
100 | initial = {} |
101 | |
102 | threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {}) |
103 | |
104 | threadsObs(function (threadsState) { |
105 | if (threadsState.ended && threadsState.ended !== true) { console.error('threadObs error:', threadsState.ended) } |
106 | }) |
107 | |
108 | // var timer |
109 | // //keep localStorage up to date |
110 | // threadsObs(function (threadsState) { |
111 | // if(timer) return |
112 | // timer = setTimeout(function () { |
113 | // timer = null |
114 | // threadsState.last = lastTimestamp |
115 | // console.log('save state') |
116 | // localStorage.threadsState = JSON.stringify(threadsState) |
117 | // }, 1000) |
118 | // }) |
119 | |
120 | return threadsObs |
121 | } |
122 | }) |
123 | } |
124 |
Built with git-ssb-web