git ssb

2+

mixmix / ticktack



Tree: 89f71075d1cca296946814be2c184d128e17d3d1

Files: 89f71075d1cca296946814be2c184d128e17d3d1 / state / obs.js

2671 bytes · Raw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
/**
 * Report whether `o` is a non-null object.
 *
 * `typeof null` is also 'object', so null is excluded explicitly;
 * otherwise a null value would be mistaken for a real object.
 *
 * @param {*} o - value to test
 * @returns {boolean} true for objects (including arrays), false for
 *   null, undefined and primitives such as strings.
 */
function isObject (o) {
  return o !== null && typeof o === 'object'
}
11
// What this module provides: a single entry at `state.obs.threads`.
var givenPaths = nest('state.obs.threads', true)

// What this module requires from elsewhere. Both entries are requested
// with the 'first' strategy and are used in `exports.create` as
// `api.message.sync.unbox` and `api.sbot.pull.log`.
var neededPaths = nest({
  'message.sync.unbox': 'first',
  'sbot.pull.log': 'first'
})

exports.gives = givenPaths
exports.needs = neededPaths
18
19exports.create = function (api) {
20 var threadsObs
21
22 function createStateObs (threadReduce, createStream, opts, initial) {
23// var lastTimestamp = initial ? initial.last : Date.now()
24// var firstTimestamp = initial ? initial.first || Date.now() : Date.now()
25
26 var lastTimestamp = opts.last || Date.now()
27 var firstTimestamp = opts.first || Date.now()
28
29 function unbox () {
30 return pull(
31 pull.map(function (data) {
32 if(isObject(data.value.content)) return data
33 return api.message.sync.unbox(data)
34 }),
35 pull.filter(Boolean)
36 )
37 }
38
39 var threadsObs = PullObv(
40 threadReduce,
41 pull(
42 Next(function () {
43 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
44 }),
45 pull.through(function (data) {
46 lastTimestamp = data.timestamp
47 }),
48 unbox()
49 ),
50 //value recovered from localStorage
51 initial
52 )
53
54 //stream live messages. this *should* work.
55 //there is no back pressure on new events
56 //only a show more on the top (currently)
57 pull(
58 Next(function () {
59 return createStream({limit: 500, gt: firstTimestamp, live: true})
60 }),
61 pull.drain(function (data) {
62 if(data.sync) return
63 firstTimestamp = data.timestamp
64 threadsObs.set(threadReduce(threadsObs.value, data))
65 })
66 )
67
68 return threadsObs
69 }
70
71
72 return nest('state.obs.threads', function buildThreadObs() {
73 if(threadsObs) return threadsObs
74
75// var initial
76// try { initial = JSON.parse(localStorage.threadsState) }
77// catch (_) { }
78//
79
80 initial = {}
81
82 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
83
84 threadsObs(function (threadsState) {
85 if(threadsState.ended && threadsState.ended !== true)
86 console.error('threadObs error:', threadsState.ended)
87 })
88
89// var timer
90// //keep localStorage up to date
91// threadsObs(function (threadsState) {
92// if(timer) return
93// timer = setTimeout(function () {
94// timer = null
95// threadsState.last = lastTimestamp
96// console.log('save state')
97// localStorage.threadsState = JSON.stringify(threadsState)
98// }, 1000)
99// })
100//
101
102 return threadsObs
103 })
104}
105
106
107
108
109
110

Built with git-ssb-web