git ssb

2+

mixmix / ticktack



Tree: 444eaa30adcfd10079189eda3f5f4b4ccfce3f61

Files: 444eaa30adcfd10079189eda3f5f4b4ccfce3f61 / state / obs.js

2333 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4const Next = require('pull-next')
5
6var nest = require('depnest')
7
8function isObject (o) {
9 return 'object' === typeof o
10}
11
12exports.gives = nest('state.obs.threads', true)
13
14//{
15// state: {obs: {threads: true}}
16//}
17
18exports.needs = nest({
19 'message.sync.unbox': 'first',
20 'sbot.pull.log': 'first'
21})
22
23exports.create = function (api) {
24 var threadsObs
25
26 return nest('state.obs.threads', function buildThreadObs() {
27 if(threadsObs) return threadsObs
28
29 var initial
30 try { initial = JSON.parse(localStorage.threadsState) }
31 catch (_) { }
32
33 var lastTimestamp = initial ? initial.last : Date.now()
34 var firstTimestamp = initial ? initial.last : Date.now()
35
36 function unbox () {
37 return pull(
38 pull.map(function (data) {
39 lastTimestamp = data.timestamp
40 if(isObject(data.value.content)) return data
41 return api.message.sync.unbox(data)
42 }),
43 pull.filter(Boolean)
44 )
45 }
46
47 threadsObs = PullObv(
48 threadReduce,
49 pull(
50 Next(function () {
51 return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp})
52 }),
53 pull.through(function (data) {
54 lastTimestamp = data.timestamp
55 }),
56 unbox()
57 ),
58 //value recovered from localStorage
59 initial
60 )
61
62 threadsObs(function (threadsState) {
63 if(threadsState.ended && threadsState.ended !== true)
64 console.error('threadObs error:', threadsState.ended)
65 })
66
67 var timer
68 //keep localStorage up to date
69 threadsObs(function (threadsState) {
70 clearTimeout(timer)
71 setTimeout(function () {
72 threadsState.last = lastTimestamp
73 localStorage.threadsState = JSON.stringify(threadsState)
74 }, 1000)
75 })
76
77 //stream live messages. this *should* work.
78 //there is no back pressure on new events
79 //only a show more on the top (currently)
80
81 pull(
82 Next(function () {
83 return api.sbot.pull.log({reverse: true, limit: 500, gte: firstTimestamp})
84 }),
85 pull.drain(function (data) {
86 firstTimestamp = data.timestamp
87 threadsObs.set(threadReduce(threadsObs.value, data))
88 })
89 )
90 return threadsObs
91 })
92)
93
94}
95
96
97
98
99
100
101

Built with git-ssb-web