git ssb

2+

mixmix / ticktack



Tree: 31dca712149ea746e5ee65adb698bb4ce4a82c4e

Files: 31dca712149ea746e5ee65adb698bb4ce4a82c4e / state / obs.js

3333 bytesRaw
1var PullObv = require('pull-obv')
2var threadReduce = require('ssb-reduce-stream')
3var pull = require('pull-stream')
4var Next = require('pull-next')
5
6var nest = require('depnest')
7
// Report whether `o` has typeof 'object'.
// Note: this is a plain typeof check, so it is also true for null and arrays.
function isObject (o) {
  var kind = typeof o
  return kind === 'object'
}
11
12exports.gives = nest({
13 'state.obs.threads': true,
14 'state.obs.channel': true
15})
16
17exports.needs = nest({
18 'message.sync.unbox': 'first',
19 'sbot.pull.log': 'first',
20 'feed.pull.channel': 'first'
21})
22
23exports.create = function (api) {
24 var threadsObs
25
26 function createStateObs (reduce, createStream, opts, initial) {
27 var lastTimestamp = opts.last || Date.now()
28 var firstTimestamp = opts.first || Date.now()
29
30 function unbox () {
31 return pull(
32 pull.map(function (data) {
33 if(isObject(data.value.content)) return data
34 return api.message.sync.unbox(data)
35 }),
36 pull.filter(Boolean)
37 )
38 }
39
40 var obs = PullObv(
41 reduce,
42 pull(
43 Next(function () {
44 return createStream({reverse: true, limit: 500, lt: lastTimestamp})
45 }),
46 pull.through(function (data) {
47 lastTimestamp = data.timestamp
48 }),
49 unbox()
50 ),
51 //value recovered from localStorage
52 initial
53 )
54
55 //stream live messages. this *should* work.
56 //there is no back pressure on new events
57 //only a show more on the top (currently)
58 pull(
59 Next(function () {
60 return createStream({limit: 500, gt: firstTimestamp, live: true})
61 }),
62 pull.drain(function (data) {
63 if(data.sync) return
64 firstTimestamp = data.timestamp
65 obs.set(reduce(threadsObs.value, data))
66 })
67 )
68
69 return obs
70 }
71
72
73 return nest({
74 'state.obs.channel': function (channel) {
75
76 return createStateObs(
77 threadReduce,
78 function (opts) {
79 return opts.reverse ?
80 api.feed.pull.channel(channel)(opts):
81 pull(api.sbot.pull.log(opts), pull.filter(function (data) {
82 if(data.sync) return false
83 return data.value.content.channel === channel
84 }))
85 },
86 {}
87 )
88
89 // var channelObs = PullObv(
90 // threadReduce,
91 // createChannelStream({reverse: true, limit: 1000})
92 // )
93
94
95 },
96 'state.obs.threads': function buildThreadObs() {
97 if(threadsObs) return threadsObs
98
99 // DISABLE localStorage cache. mainly disabling this to make debugging the other stuff
100 // easier. maybe re-enable this later? also, should this be for every channel too? not sure.
101
102 // var initial
103 // try { initial = JSON.parse(localStorage.threadsState) }
104 // catch (_) { }
105
106 initial = {}
107
108 threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {})
109
110 threadsObs(function (threadsState) {
111 if(threadsState.ended && threadsState.ended !== true)
112 console.error('threadObs error:', threadsState.ended)
113 })
114
115 // var timer
116 // //keep localStorage up to date
117 // threadsObs(function (threadsState) {
118 // if(timer) return
119 // timer = setTimeout(function () {
120 // timer = null
121 // threadsState.last = lastTimestamp
122 // console.log('save state')
123 // localStorage.threadsState = JSON.stringify(threadsState)
124 // }, 1000)
125 // })
126
127 return threadsObs
128 }
129 })
130}
131
132
133
134
135

Built with git-ssb-web