git ssb

2+

mixmix / ticktack



Tree: 8639104816f8873a79f9777728e495ac65b7361e

Files: 8639104816f8873a79f9777728e495ac65b7361e / state / obs.js

3703 bytes | Raw
1var nest = require('depnest')
2var PullObv = require('pull-obv')
3var threadReduce = require('ssb-reduce-stream')
4var pull = require('pull-stream')
5var Next = require('pull-next')
6var isObject = require('lodash/isObject')
7
// Paths this module provides. The flat, dot-separated keys are handed to
// `nest` (from 'depnest'), presumably to be expanded into a nested structure.
var givenPaths = {
  'state.obs.threads': true,
  'state.obs.channel': true
}

exports.gives = nest(givenPaths)
12
// Dependencies this module reads from `api` inside `exports.create`.
// Every entry is requested with the 'first' strategy (semantics are defined
// by the dependency-injection layer, not in this file).
var neededPaths = {
  'message.sync.unbox': 'first',
  'sbot.pull.log': 'first',
  'sbot.async.get': 'first',
  'feed.pull.channel': 'first'
}

exports.needs = nest(neededPaths)
19
20exports.create = function (api) {
21 var threadsObs
22
23 function createStateObs (reduce, createStream, opts, initial) {
24 var lastTimestamp = opts.last || Date.now()
25 var firstTimestamp = opts.first || Date.now()
26
27 function unbox () {
28 return pull(
29 pull.map(function (data) {
30 if(isObject(data.value.content)) return data
31 return api.message.sync.unbox(data)
32 }),
33 pull.filter(Boolean)
34 )
35 }
36
37 var obs = PullObv(
38 reduce,
39 pull(
40 Next(function () {
41 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
42 }),
43 pull.through(function (data) {
44 lastTimestamp = data.timestamp
45 }),
46 unbox()
47 ),
48 //value recovered from localStorage
49 initial
50 )
51
52 var getting = {}
53 obs(function (state) {
54 var effect = state.effect
55 if(!effect) return
56
57 state.effect = null
58 if(getting[effect.key]) return
59
60 getting[effect.key] = true
61 api.sbot.async.get(effect.key, (err, msg) => {
62 if (!msg) return
63 obs.set(reduce(obs.value, {key: effect.key, value: msg}))
64 })
65 })
66
67 //stream live messages. this *should* work.
68 //there is no back pressure on new events
69 //only a show more on the top (currently)
70 pull(
71 Next(function () {
72 return createStream({limit: 500, gt: firstTimestamp, live: true})
73 }),
74 pull.drain(function (data) {
75 if(data.sync) return
76 firstTimestamp = data.timestamp
77 obs.set(reduce(obs.value, data))
78 })
79 )
80
81 return obs
82 }
83
84
85 return nest({
86 'state.obs.channel': function (channel) {
87
88 return createStateObs(
89 threadReduce,
90 function (opts) {
91 return opts.reverse ?
92 api.feed.pull.channel(channel)(opts):
93 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
94 if(data.sync) return false
95 return data.value.content.channel === channel
96 }))
97 },
98 {}
99 )
100
101 // var channelObs = PullObv(
102 // threadReduce,
103 // createChannelStream({reverse: true, limit: 1000})
104 // )
105
106
107 },
108
109 'state.obs.threads': function buildThreadObs() {
110 if(threadsObs) return threadsObs
111
112 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
113 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
114
115 // var initial
116 // try { initial = JSON.parse(localStorage.threadsState) }
117 // catch (_) { }
118
119 initial = {}
120
121 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
122
123 threadsObs(function (threadsState) {
124 if(threadsState.ended && threadsState.ended !== true)
125 console.error('threadObs error:', threadsState.ended)
126 })
127
128 // var timer
129 // //keep localStorage up to date
130 // threadsObs(function (threadsState) {
131 // if(timer) return
132 // timer = setTimeout(function () {
133 // timer = null
134 // threadsState.last = lastTimestamp
135 // console.log('save state')
136 // localStorage.threadsState = JSON.stringify(threadsState)
137 // }, 1000)
138 // })
139
140 return threadsObs
141 }
142 })
143}
144
145
146

Built with git-ssb-web