git ssb

2+

mixmix / ticktack



Tree: 8f710d26c85cbaa5cb40a9ab64fe40cc85df0934

Files: 8f710d26c85cbaa5cb40a9ab64fe40cc85df0934 / state / obs.js

3840 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
/**
 * Tells whether a value is a real (non-null) object.
 *
 * Used to distinguish message content that is already an object from content
 * that still has to go through `message.sync.unbox`.
 *
 * @param {*} o - value to test
 * @returns {boolean} true for objects and arrays; false for null and primitives
 */
function isObject (o) {
  // `typeof null` is also 'object', so null has to be excluded explicitly.
  return o !== null && 'object' === typeof o
}
11
12exports.gives = nest({
13 'state.obs.threads': true,
14 'state.obs.channel': true
15})
16
17exports.needs = nest({
18 'message.sync.unbox': 'first',
19 'sbot.pull.log': 'first',
20 'sbot.async.get': 'first',
21 'feed.pull.channel': 'first'
22})
23
24exports.create = function (api) {
25 var threadsObs
26
27 function createStateObs (reduce, createStream, opts, initial) {
28 var lastTimestamp = opts.last || Date.now()
29 var firstTimestamp = opts.first || Date.now()
30
31 function unbox () {
32 return pull(
33 pull.map(function (data) {
34 if(isObject(data.value.content)) return data
35 return api.message.sync.unbox(data)
36 }),
37 pull.filter(Boolean)
38 )
39 }
40
41 var obs = PullObv(
42 reduce,
43 pull(
44 Next(function () {
45 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
46 }),
47 pull.through(function (data) {
48 lastTimestamp = data.timestamp
49 }),
50 unbox()
51 ),
52 //value recovered from localStorage
53 initial
54 )
55
56 var getting = {}, g= 0, t = 0
57 var maybe = {}
58 obs(function (state) {
59 if(state.effect) {
60 var effect = state.effect
61 state.effect = null
62 if(!getting[effect.key]) {
63 getting[effect.key] = true
64 api.sbot.async.get(effect.key, function (err, msg) {- console.log('g', g, ++t)
65 if(msg) {
66 obs.set(reduce(obs.value, {key: effect.key, value: msg}))
67 }
68 })
69 }
70 }
71 })
72
73 //stream live messages. this *should* work.
74 //there is no back pressure on new events
75 //only a show more on the top (currently)
76 pull(
77 Next(function () {
78 return createStream({limit: 500, gt: firstTimestamp, live: true})
79 }),
80 pull.drain(function (data) {
81 if(data.sync) return
82 firstTimestamp = data.timestamp
83 obs.set(reduce(obs.value, data))
84 })
85 )
86
87 return obs
88 }
89
90
91 return nest({
92 'state.obs.channel': function (channel) {
93
94 return createStateObs(
95 threadReduce,
96 function (opts) {
97 return opts.reverse ?
98 api.feed.pull.channel(channel)(opts):
99 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
100 if(data.sync) return false
101 return data.value.content.channel === channel
102 }))
103 },
104 {}
105 )
106
107 // var channelObs = PullObv(
108 // threadReduce,
109 // createChannelStream({reverse: true, limit: 1000})
110 // )
111
112
113 },
114
115 'state.obs.threads': function buildThreadObs() {
116 if(threadsObs) return threadsObs
117
118 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
119 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
120
121 // var initial
122 // try { initial = JSON.parse(localStorage.threadsState) }
123 // catch (_) { }
124
125 initial = {}
126
127 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
128
129 threadsObs(function (threadsState) {
130 if(threadsState.ended && threadsState.ended !== true)
131 console.error('threadObs error:', threadsState.ended)
132 })
133
134 // var timer
135 // //keep localStorage up to date
136 // threadsObs(function (threadsState) {
137 // if(timer) return
138 // timer = setTimeout(function () {
139 // timer = null
140 // threadsState.last = lastTimestamp
141 // console.log('save state')
142 // localStorage.threadsState = JSON.stringify(threadsState)
143 // }, 1000)
144 // })
145
146 return threadsObs
147 }
148 })
149}
150
151
152

Built with git-ssb-web