git ssb

2+

mixmix / ticktack



Tree: c150758bf6e1b53d32f0c670914cc96ff228b33b

Files: c150758bf6e1b53d32f0c670914cc96ff228b33b / state / obs.js

2216 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4const Next = require('pull-next')
5
6function isObject (o) {
7 return 'object' === typeof o
8}
9
10exports.gives = {
11 state: {obs: {threads: true}}
12}
13
14exports.needs = {
15 message: {sync: {unbox: 'first'}},
16 sbot: {pull: {log: 'first'}}
17}
18
19exports.create = function (api) {
20
21 var initial
22 try { initial = JSON.parse(localStorage.threadsState) }
23 catch (_) { }
24
25 var lastTimestamp = initial ? initial.last : Date.now()
26 var firstTimestamp = initial ? initial.last : Date.now()
27
28 function unbox () {
29 return pull(
30 pull.map(function (data) {
31 lastTimestamp = data.timestamp
32 if(isObject(data.value.content)) return data
33 return api.message.sync.unbox(data)
34 }),
35 pull.filter(Boolean)
36 )
37 }
38
39 var threadsObs = PullObv(
40 threadReduce,
41 pull(
42 Next(function () {
43 console.log('More', lastTimestamp)
44 return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp})
45 }),
46 pull.through(function (data) {
47 console.log(data.timestamp)
48 lastTimestamp = data.timestamp
49 }),
50 unbox()
51 ),
52 //value recovered from localStorage
53 initial
54 )
55
56 threadsObs(function (threadsState) {
57 if(threadsState.ended && threadsState.ended !== true)
58 console.error('threadObs error:', threadsState.ended)
59 })
60
61 var timer
62 //keep localStorage up to date
63 threadsObs(function (threadsState) {
64 console.log(threadsState)
65 clearTimeout(timer)
66 setTimeout(function () {
67 threadsState.last = lastTimestamp
68 localStorage.threadsState = JSON.stringify(threadsState)
69 }, 1000)
70 })
71
72 //stream live messages. this *should* work.
73 //there is no back pressure on new events
74 //only a show more on the top (currently)
75
76// pull(
77// Next(function () {
78// return api.sbot.pull.log({reverse: true, limit: 500, gte: firstTimestamp})
79// }),
80// pull.drain(function (data) {
81// firstTimestamp = data.timestamp
82// threadsObs.set(threadReduce(threadsObs.value, data))
83// })
84// )
85
86 return {state: {obs: {threads: function () {
87 return threadsObs
88 }}}}
89
90}
91
92
93
94
95
96
97

Built with git-ssb-web