git ssb

2+

mixmix / ticktack



Tree: ccad428636797b9b8b88aedf21978ba1e991bb60

Files: ccad428636797b9b8b88aedf21978ba1e991bb60 / state / obs.js

2677 bytes · Raw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
/**
 * Tells whether `o` is a non-null value whose typeof is 'object'.
 *
 * `typeof null` is also 'object', so null is excluded explicitly;
 * otherwise a null value would be reported as an object.
 * Arrays still count as objects.
 *
 * @param {*} o - value to test
 * @returns {boolean} true when `o` is a non-null object
 */
function isObject (o) {
  return o !== null && 'object' === typeof o
}
11
12exports.gives = nest('state.obs.threads', true)
13
14exports.needs = nest({
15 'message.sync.unbox': 'first',
16 'sbot.pull.log': 'first'
17})
18
19exports.create = function (api) {
20 var threadsObs
21
22 return nest('state.obs.threads', function buildThreadObs() {
23 if(threadsObs) return threadsObs
24
25// var initial
26// try { initial = JSON.parse(localStorage.threadsState) }
27// catch (_) { }
28//
29
30 initial = {}
31
32 function createStateObs (threadReduce, createStream, initial) {
33 var lastTimestamp = initial ? initial.last : Date.now()
34 var firstTimestamp = initial ? initial.first || Date.now() : Date.now()
35
36 function unbox () {
37 return pull(
38 pull.map(function (data) {
39 lastTimestamp = data.timestamp
40 if(isObject(data.value.content)) return data
41 return api.message.sync.unbox(data)
42 }),
43 pull.filter(Boolean)
44 )
45 }
46
47 var threadsObs = PullObv(
48 threadReduce,
49 pull(
50 Next(function () {
51 return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp})
52 }),
53 pull.through(function (data) {
54 lastTimestamp = data.timestamp
55 }),
56 unbox()
57 ),
58 //value recovered from localStorage
59 initial
60 )
61
62 //stream live messages. this *should* work.
63 //there is no back pressure on new events
64 //only a show more on the top (currently)
65 pull(
66 Next(function () {
67 return api.sbot.pull.log({limit: 500, gt: firstTimestamp, live: true})
68 }),
69 pull.drain(function (data) {
70 if(data.sync) return
71 firstTimestamp = data.timestamp
72 threadsObs.set(threadReduce(threadsObs.value, data))
73 })
74 )
75
76 return threadsObs
77 }
78
79 threadsObs = createStateObs(threadReduce, null, initial)
80
81 threadsObs(function (threadsState) {
82 if(threadsState.ended && threadsState.ended !== true)
83 console.error('threadObs error:', threadsState.ended)
84 })
85
86// var timer
87// //keep localStorage up to date
88// threadsObs(function (threadsState) {
89// if(timer) return
90// timer = setTimeout(function () {
91// timer = null
92// threadsState.last = lastTimestamp
93// console.log('save state')
94// localStorage.threadsState = JSON.stringify(threadsState)
95// }, 1000)
96// })
97//
98
99 return threadsObs
100 })
101}
102
103
104
105

Built with git-ssb-web