git ssb

2+

mixmix / ticktack



Tree: aa4c25f3c365e275c42b08f8e34ef4e6c2b66d93

Files: aa4c25f3c365e275c42b08f8e34ef4e6c2b66d93 / state / obs.js

3417 bytes · Raw
1var nest = require('depnest')
2var PullObv = require('pull-obv')
3var threadReduce = require('ssb-reduce-stream')
4var pull = require('pull-stream')
5var Next = require('pull-next')
6var isObject = require('lodash/isObject')
7
8exports.gives = nest({
9 'state.obs.threads': true,
10 'state.obs.channel': true
11})
12
13exports.needs = nest({
14 'message.sync.unbox': 'first',
15 'sbot.pull.log': 'first',
16 'sbot.async.get': 'first',
17 'feed.pull.channel': 'first',
18 'feed.pull.rollup': 'first'
19})
20
21exports.create = function (api) {
22 var threadsObs
23
24 function createStateObs (reduce, createStream, opts, initial) {
25 var lastTimestamp = opts.last || Date.now()
26 var firstTimestamp = opts.first || Date.now()
27
28 function unbox (data) {
29 if(data.sync) return data
30 if(isObject(data.value.content)) return data
31 return api.message.sync.unbox(data)
32 }
33
34 var obs = PullObv(
35 reduce,
36 pull(
37 Next(function () {
38 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
39 }),
40 pull.through(function (data) {
41 lastTimestamp = data.timestamp
42 }),
43 pull.map(unbox),
44 pull.filter(Boolean),
45 api.feed.pull.rollup()
46 ),
47 //value recovered from localStorage
48 initial
49 )
50
51 //stream live messages. this *should* work.
52 //there is no back pressure on new events
53 //only a show more on the top (currently)
54 pull(
55 Next(function () {
56 return createStream({limit: 500, gt: firstTimestamp, live: true})
57 }),
58 pull.map(unbox), pull.filter(Boolean),
59 pull.drain(function (data) {
60 if(data.sync) return
61 firstTimestamp = data.timestamp
62 obs.set(reduce(obs.value, data))
63 })
64 )
65
66 return obs
67 }
68
69
70 return nest({
71 'state.obs.channel': function (channel) {
72
73 return createStateObs(
74 threadReduce,
75 function (opts) {
76 return opts.reverse ?
77 api.feed.pull.channel(channel)(opts):
78 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
79 if(data.sync) return false
80 return data.value.content.channel === channel
81 }))
82 },
83 {}
84 )
85
86 // var channelObs = PullObv(
87 // threadReduce,
88 // createChannelStream({reverse: true, limit: 1000})
89 // )
90
91
92 },
93
94 'state.obs.threads': function buildThreadObs() {
95 if (threadsObs) return threadsObs
96
97 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
98 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
99
100 // var initial
101 // try { initial = JSON.parse(localStorage.threadsState) }
102 // catch (_) { }
103
104 initial = {}
105
106 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
107
108 threadsObs(function (threadsState) {
109 if(threadsState.ended && threadsState.ended !== true)
110 console.error('threadObs error:', threadsState.ended)
111 })
112
113 // var timer
114 // //keep localStorage up to date
115 // threadsObs(function (threadsState) {
116 // if(timer) return
117 // timer = setTimeout(function () {
118 // timer = null
119 // threadsState.last = lastTimestamp
120 // console.log('save state')
121 // localStorage.threadsState = JSON.stringify(threadsState)
122 // }, 1000)
123 // })
124
125 return threadsObs
126 }
127 })
128}
129
130
131
132
133
134
135
136
137
138
139

Built with git-ssb-web