git ssb

2+

mixmix / ticktack



Tree: 17b2f7bb97825fb0650e466902418fcf8442c5fd

Files: 17b2f7bb97825fb0650e466902418fcf8442c5fd / state / obs.js

3753 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
/**
 * Report whether `o` has the JavaScript type 'object'.
 *
 * Note: this is a bare `typeof` test, so `null` and arrays also yield true.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true when `typeof o` is 'object'
 */
function isObject (o) {
  return typeof o === 'object'
}
11
12exports.gives = nest({
13 'state.obs.threads': true,
14 'state.obs.channel': true
15})
16
17exports.needs = nest({
18 'message.sync.unbox': 'first',
19 'sbot.pull.log': 'first',
20 'sbot.async.get': 'first',
21 'feed.pull.channel': 'first'
22})
23
24exports.create = function (api) {
25 var threadsObs
26
27 function createStateObs (reduce, createStream, opts, initial) {
28 var lastTimestamp = opts.last || Date.now()
29 var firstTimestamp = opts.first || Date.now()
30
31 function unbox () {
32 return pull(
33 pull.map(function (data) {
34 if(isObject(data.value.content)) return data
35 return api.message.sync.unbox(data)
36 }),
37 pull.filter(Boolean)
38 )
39 }
40
41 var obs = PullObv(
42 reduce,
43 pull(
44 Next(function () {
45 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
46 }),
47 pull.through(function (data) {
48 lastTimestamp = data.timestamp
49 }),
50 unbox()
51 ),
52 //value recovered from localStorage
53 initial
54 )
55
56 var getting = {}, g = 0, t = 0
57 var maybe = {}
58 obs(function (state) {
59 var effect = state.effect
60 if(!effect) return
61
62 state.effect = null
63 if(getting[effect.key]) return
64
65 getting[effect.key] = true
66 api.sbot.async.get(effect.key, (err, msg) => {
67 if (!msg) return
68 obs.set(reduce(obs.value, {key: effect.key, value: msg}))
69 })
70 })
71
72 //stream live messages. this *should* work.
73 //there is no back pressure on new events
74 //only a show more on the top (currently)
75 pull(
76 Next(function () {
77 return createStream({limit: 500, gt: firstTimestamp, live: true})
78 }),
79 pull.drain(function (data) {
80 if(data.sync) return
81 firstTimestamp = data.timestamp
82 obs.set(reduce(obs.value, data))
83 })
84 )
85
86 return obs
87 }
88
89
90 return nest({
91 'state.obs.channel': function (channel) {
92
93 return createStateObs(
94 threadReduce,
95 function (opts) {
96 return opts.reverse ?
97 api.feed.pull.channel(channel)(opts):
98 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
99 if(data.sync) return false
100 return data.value.content.channel === channel
101 }))
102 },
103 {}
104 )
105
106 // var channelObs = PullObv(
107 // threadReduce,
108 // createChannelStream({reverse: true, limit: 1000})
109 // )
110
111
112 },
113
114 'state.obs.threads': function buildThreadObs() {
115 if(threadsObs) return threadsObs
116
117 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
118 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
119
120 // var initial
121 // try { initial = JSON.parse(localStorage.threadsState) }
122 // catch (_) { }
123
124 initial = {}
125
126 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
127
128 threadsObs(function (threadsState) {
129 if(threadsState.ended && threadsState.ended !== true)
130 console.error('threadObs error:', threadsState.ended)
131 })
132
133 // var timer
134 // //keep localStorage up to date
135 // threadsObs(function (threadsState) {
136 // if(timer) return
137 // timer = setTimeout(function () {
138 // timer = null
139 // threadsState.last = lastTimestamp
140 // console.log('save state')
141 // localStorage.threadsState = JSON.stringify(threadsState)
142 // }, 1000)
143 // })
144
145 return threadsObs
146 }
147 })
148}
149
150
151

Built with git-ssb-web