git ssb

2+

mixmix / ticktack



Commit c150758bf6e1b53d32f0c670914cc96ff228b33b

refactor threadsObs out into state topic

Dominic Tarr committed on 8/11/2017, 11:07:11 AM
Parent: d12d963d1dc2340bf7726e17d111c407fda58c7d

Files changed

index.jschanged
state/obs.jsadded
index.jsView
@@ -2,8 +2,9 @@
22 app: require('./app'),
33 blob: require('./blob'),
44 router: require('./router'),
55 styles: require('./styles'),
6- translations: require('./translations/sync')
6+ translations: require('./translations/sync'),
7+ state: require('./state/obs')
78 }
89
910 module.exports = ticktack
state/obs.jsView
@@ -1,0 +1,96 @@
1+var PullObv = require('pull-obv')
2+var threadReduce = require('ssb-reduce-stream')
3+var pull = require('pull-stream')
4+const Next = require('pull-next')
5+
6+function isObject (o) {
7+ return 'object' === typeof o
8+}
9+
10+exports.gives = {
11+ state: {obs: {threads: true}}
12+}
13+
14+exports.needs = {
15+ message: {sync: {unbox: 'first'}},
16+ sbot: {pull: {log: 'first'}}
17+}
18+
19+exports.create = function (api) {
20+
21+ var initial
22+ try { initial = JSON.parse(localStorage.threadsState) }
23+ catch (_) { }
24+
25+ var lastTimestamp = initial ? initial.last : Date.now()
26+ var firstTimestamp = initial ? initial.last : Date.now()
27+
28+ function unbox () {
29+ return pull(
30+ pull.map(function (data) {
31+ lastTimestamp = data.timestamp
32+ if(isObject(data.value.content)) return data
33+ return api.message.sync.unbox(data)
34+ }),
35+ pull.filter(Boolean)
36+ )
37+ }
38+
39+ var threadsObs = PullObv(
40+ threadReduce,
41+ pull(
42+ Next(function () {
43+ console.log('More', lastTimestamp)
44+ return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp})
45+ }),
46+ pull.through(function (data) {
47+ console.log(data.timestamp)
48+ lastTimestamp = data.timestamp
49+ }),
50+ unbox()
51+ ),
52+ //value recovered from localStorage
53+ initial
54+ )
55+
56+ threadsObs(function (threadsState) {
57+ if(threadsState.ended && threadsState.ended !== true)
58+ console.error('threadObs error:', threadsState.ended)
59+ })
60+
61+ var timer
62+ //keep localStorage up to date
63+ threadsObs(function (threadsState) {
64+ console.log(threadsState)
65+ clearTimeout(timer)
66+ setTimeout(function () {
67+ threadsState.last = lastTimestamp
68+ localStorage.threadsState = JSON.stringify(threadsState)
69+ }, 1000)
70+ })
71+
72+ //stream live messages. this *should* work.
73+ //there is no back pressure on new events
74+ //only a show more on the top (currently)
75+
76+// pull(
77+// Next(function () {
78+// return api.sbot.pull.log({reverse: true, limit: 500, gte: firstTimestamp})
79+// }),
80+// pull.drain(function (data) {
81+// firstTimestamp = data.timestamp
82+// threadsObs.set(threadReduce(threadsObs.value, data))
83+// })
84+// )
85+
86+ return {state: {obs: {threads: function () {
87+ return threadsObs
88+ }}}}
89+
90+}
91+
92+
93+
94+
95+
96+

Built with git-ssb-web