git ssb

2+

mixmix / ticktack



Tree: 96a0fbf9d489182265e23a3452bd6ee7e963abbb

Files: 96a0fbf9d489182265e23a3452bd6ee7e963abbb / state / obs.js

3723 bytesRaw
1var nest = require('depnest')
2var PullObv = require('pull-obv')
3var threadReduce = require('ssb-reduce-stream')
4var pull = require('pull-stream')
5var Next = require('pull-next')
6var isObject = require('lodash/isObject')
7
8exports.gives = nest({
9 'state.obs.threads': true,
10 'state.obs.channel': true
11})
12
13exports.needs = nest({
14 'message.sync.unbox': 'first',
15 'sbot.pull.log': 'first',
16 'sbot.async.get': 'first',
17 'feed.pull.channel': 'first'
18})
19
20exports.create = function (api) {
21 var threadsObs
22
23 function createStateObs (reduce, createStream, opts, initial) {
24 var lastTimestamp = opts.last || Date.now()
25 var firstTimestamp = opts.first || Date.now()
26
27 function unbox (data) {
28 if(data.sync) return data
29 if(isObject(data.value.content)) return data
30 return api.message.sync.unbox(data)
31 }
32
33 var obs = PullObv(
34 reduce,
35 pull(
36 Next(function () {
37 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
38 }),
39 pull.through(function (data) {
40 lastTimestamp = data.timestamp
41 }),
42 pull.map(unbox), pull.filter(Boolean)
43 ),
44 //value recovered from localStorage
45 initial
46 )
47
48 var getting = {}
49 obs(function (state) {
50 var effect = state.effect
51 if(!effect) return
52
53 state.effect = null
54 if(getting[effect.key]) return
55
56 getting[effect.key] = true
57 api.sbot.async.get(effect.key, (err, msg) => {
58 if (!msg) return
59 obs.set(STATE=reduce(obs.value, unbox({key: effect.key, value: msg})))
60 })
61 })
62
63 //stream live messages. this *should* work.
64 //there is no back pressure on new events
65 //only a show more on the top (currently)
66 pull(
67 Next(function () {
68 return createStream({limit: 500, gt: firstTimestamp, live: true})
69 }),
70 pull.map(unbox), pull.filter(Boolean),
71 pull.drain(function (data) {
72 if(data.sync) return
73 firstTimestamp = data.timestamp
74 obs.set(reduce(obs.value, data))
75 })
76 )
77
78 return obs
79 }
80
81
82 return nest({
83 'state.obs.channel': function (channel) {
84
85 return createStateObs(
86 threadReduce,
87 function (opts) {
88 return opts.reverse ?
89 api.feed.pull.channel(channel)(opts):
90 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
91 if(data.sync) return false
92 return data.value.content.channel === channel
93 }))
94 },
95 {}
96 )
97
98 // var channelObs = PullObv(
99 // threadReduce,
100 // createChannelStream({reverse: true, limit: 1000})
101 // )
102
103
104 },
105
106 'state.obs.threads': function buildThreadObs() {
107 if(threadsObs) return threadsObs
108
109 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
110 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
111
112 // var initial
113 // try { initial = JSON.parse(localStorage.threadsState) }
114 // catch (_) { }
115
116 initial = {}
117
118 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
119
120 threadsObs(function (threadsState) {
121 if(threadsState.ended && threadsState.ended !== true)
122 console.error('threadObs error:', threadsState.ended)
123 })
124
125 // var timer
126 // //keep localStorage up to date
127 // threadsObs(function (threadsState) {
128 // if(timer) return
129 // timer = setTimeout(function () {
130 // timer = null
131 // threadsState.last = lastTimestamp
132 // console.log('save state')
133 // localStorage.threadsState = JSON.stringify(threadsState)
134 // }, 1000)
135 // })
136
137 return threadsObs
138 }
139 })
140}
141
142
143
144
145
146
147
148
149
150
151

Built with git-ssb-web