git ssb

2+

mixmix / ticktack



Tree: 4dbb2a6693e61517c65a6eb549c9f0d8acf437b7

Files: 4dbb2a6693e61517c65a6eb549c9f0d8acf437b7 / state / obs.js

2600 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
/**
 * Tell whether a value is a non-null object (plain object, array, etc.).
 *
 * `typeof null` is also 'object', so null is excluded explicitly; otherwise
 * a message whose content is null would be mistaken for already-decoded
 * content by callers that use this check.
 *
 * @param {*} o - value to test
 * @returns {boolean} true when `o` is an object and not null
 */
function isObject (o) {
  return o !== null && 'object' === typeof o
}
11
12exports.gives = nest('state.obs.threads', true)
13
14exports.needs = nest({
15 'message.sync.unbox': 'first',
16 'sbot.pull.log': 'first'
17})
18
19exports.create = function (api) {
20 var threadsObs
21
22 function createStateObs (threadReduce, createStream, initial) {
23 var lastTimestamp = initial ? initial.last : Date.now()
24 var firstTimestamp = initial ? initial.first || Date.now() : Date.now()
25
26 function unbox () {
27 return pull(
28 pull.map(function (data) {
29// lastTimestamp = data.timestamp
30 if(isObject(data.value.content)) return data
31 return api.message.sync.unbox(data)
32 }),
33 pull.filter(Boolean)
34 )
35 }
36
37 var threadsObs = PullObv(
38 threadReduce,
39 pull(
40 Next(function () {
41 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
42 }),
43 pull.through(function (data) {
44 lastTimestamp = data.timestamp
45 }),
46 unbox()
47 ),
48 //value recovered from localStorage
49 initial
50 )
51
52 //stream live messages. this *should* work.
53 //there is no back pressure on new events
54 //only a show more on the top (currently)
55 pull(
56 Next(function () {
57 return createStream({limit: 500, gt: firstTimestamp, live: true})
58 }),
59 pull.drain(function (data) {
60 if(data.sync) return
61 firstTimestamp = data.timestamp
62 threadsObs.set(threadReduce(threadsObs.value, data))
63 })
64 )
65
66 return threadsObs
67 }
68
69
70 return nest('state.obs.threads', function buildThreadObs() {
71 if(threadsObs) return threadsObs
72
73// var initial
74// try { initial = JSON.parse(localStorage.threadsState) }
75// catch (_) { }
76//
77
78 initial = {}
79
80 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial)
81
82 threadsObs(function (threadsState) {
83 if(threadsState.ended && threadsState.ended !== true)
84 console.error('threadObs error:', threadsState.ended)
85 })
86
87// var timer
88// //keep localStorage up to date
89// threadsObs(function (threadsState) {
90// if(timer) return
91// timer = setTimeout(function () {
92// timer = null
93// threadsState.last = lastTimestamp
94// console.log('save state')
95// localStorage.threadsState = JSON.stringify(threadsState)
96// }, 1000)
97// })
98//
99
100 return threadsObs
101 })
102}
103
104
105
106
107

Built with git-ssb-web