Files: 8f710d26c85cbaa5cb40a9ab64fe40cc85df0934 / state / obs.js
3840 bytesRaw
1 | var PullObv = require('pull-obv') |
2 | var threadReduce = require('ssb-reduce-stream') |
3 | var pull = require('pull-stream') |
4 | var Next = require('pull-next') |
5 | |
6 | var nest = require('depnest') |
7 | |
/**
 * Tell whether a value is a real (non-null) object.
 *
 * `typeof null` is also 'object', so null has to be excluded explicitly;
 * otherwise a null value would be reported as an object.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true for objects (including arrays), false for null,
 *   undefined and every primitive
 */
function isObject (o) {
  return o !== null && 'object' === typeof o
}
11 | |
// Dotted paths of the observables this module provides; nest() (depnest)
// receives the flat map and its result is exported as `gives`.
var givesSpec = {
  'state.obs.threads': true,
  'state.obs.channel': true
}

exports.gives = nest(givesSpec)
16 | |
// Dotted paths of the functions this module depends on. Each one is
// requested with the 'first' mode and is read from `api` inside create().
var needsSpec = {
  'message.sync.unbox': 'first',
  'sbot.pull.log': 'first',
  'sbot.async.get': 'first',
  'feed.pull.channel': 'first'
}

exports.needs = nest(needsSpec)
23 | |
24 | exports.create = function (api) { |
25 | var threadsObs |
26 | |
27 | function createStateObs (reduce, createStream, opts, initial) { |
28 | var lastTimestamp = opts.last || Date.now() |
29 | var firstTimestamp = opts.first || Date.now() |
30 | |
31 | function unbox () { |
32 | return pull( |
33 | pull.map(function (data) { |
34 | if(isObject(data.value.content)) return data |
35 | return api.message.sync.unbox(data) |
36 | }), |
37 | pull.filter(Boolean) |
38 | ) |
39 | } |
40 | |
41 | var obs = PullObv( |
42 | reduce, |
43 | pull( |
44 | Next(function () { |
45 | return createStream({reverse: true, limit: 500, lt: lastTimestamp}) |
46 | }), |
47 | pull.through(function (data) { |
48 | lastTimestamp = data.timestamp |
49 | }), |
50 | unbox() |
51 | ), |
52 | //value recovered from localStorage |
53 | initial |
54 | ) |
55 | |
56 | var getting = {}, g= 0, t = 0 |
57 | var maybe = {} |
58 | obs(function (state) { |
59 | if(state.effect) { |
60 | var effect = state.effect |
61 | state.effect = null |
62 | if(!getting[effect.key]) { |
63 | getting[effect.key] = true |
64 | api.sbot.async.get(effect.key, function (err, msg) {- console.log('g', g, ++t) |
65 | if(msg) { |
66 | obs.set(reduce(obs.value, {key: effect.key, value: msg})) |
67 | } |
68 | }) |
69 | } |
70 | } |
71 | }) |
72 | |
73 | //stream live messages. this *should* work. |
74 | //there is no back pressure on new events |
75 | //only a show more on the top (currently) |
76 | pull( |
77 | Next(function () { |
78 | return createStream({limit: 500, gt: firstTimestamp, live: true}) |
79 | }), |
80 | pull.drain(function (data) { |
81 | if(data.sync) return |
82 | firstTimestamp = data.timestamp |
83 | obs.set(reduce(obs.value, data)) |
84 | }) |
85 | ) |
86 | |
87 | return obs |
88 | } |
89 | |
90 | |
91 | return nest({ |
92 | 'state.obs.channel': function (channel) { |
93 | |
94 | return createStateObs( |
95 | threadReduce, |
96 | function (opts) { |
97 | return opts.reverse ? |
98 | api.feed.pull.channel(channel)(opts): |
99 | pull(api.sbot.pull.log(opts), pull.filter(function (data) { |
100 | if(data.sync) return false |
101 | return data.value.content.channel === channel |
102 | })) |
103 | }, |
104 | {} |
105 | ) |
106 | |
107 | // var channelObs = PullObv( |
108 | // threadReduce, |
109 | // createChannelStream({reverse: true, limit: 1000}) |
110 | // ) |
111 | |
112 | |
113 | }, |
114 | |
115 | 'state.obs.threads': function buildThreadObs() { |
116 | if(threadsObs) return threadsObs |
117 | |
118 | // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff |
119 | // easier. maybe re-enable this later? also, should this be for every channel too? not sure. |
120 | |
121 | // var initial |
122 | // try { initial = JSON.parse(localStorage.threadsState) } |
123 | // catch (_) { } |
124 | |
125 | initial = {} |
126 | |
127 | threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {}) |
128 | |
129 | threadsObs(function (threadsState) { |
130 | if(threadsState.ended && threadsState.ended !== true) |
131 | console.error('threadObs error:', threadsState.ended) |
132 | }) |
133 | |
134 | // var timer |
135 | // //keep localStorage up to date |
136 | // threadsObs(function (threadsState) { |
137 | // if(timer) return |
138 | // timer = setTimeout(function () { |
139 | // timer = null |
140 | // threadsState.last = lastTimestamp |
141 | // console.log('save state') |
142 | // localStorage.threadsState = JSON.stringify(threadsState) |
143 | // }, 1000) |
144 | // }) |
145 | |
146 | return threadsObs |
147 | } |
148 | }) |
149 | } |
150 | |
151 | |
152 |
Built with git-ssb-web