Files: f5dde01a771f059ca6c79e815d0d184dbc80523e / state / obs.js
2216 bytesRaw
1 | var PullObv = require('pull-obv') |
2 | var threadReduce = require('ssb-reduce-stream') |
3 | var pull = require('pull-stream') |
4 | const Next = require('pull-next') |
5 | |
6 | function isObject (o) { |
7 | return 'object' === typeof o |
8 | } |
9 | |
10 | exports.gives = { |
11 | state: {obs: {threads: true}} |
12 | } |
13 | |
14 | exports.needs = { |
15 | message: {sync: {unbox: 'first'}}, |
16 | sbot: {pull: {log: 'first'}} |
17 | } |
18 | |
19 | exports.create = function (api) { |
20 | |
21 | var initial |
22 | try { initial = JSON.parse(localStorage.threadsState) } |
23 | catch (_) { } |
24 | |
25 | var lastTimestamp = initial ? initial.last : Date.now() |
26 | var firstTimestamp = initial ? initial.last : Date.now() |
27 | |
28 | function unbox () { |
29 | return pull( |
30 | pull.map(function (data) { |
31 | lastTimestamp = data.timestamp |
32 | if(isObject(data.value.content)) return data |
33 | return api.message.sync.unbox(data) |
34 | }), |
35 | pull.filter(Boolean) |
36 | ) |
37 | } |
38 | |
39 | var threadsObs = PullObv( |
40 | threadReduce, |
41 | pull( |
42 | Next(function () { |
43 | console.log('More', lastTimestamp) |
44 | return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp}) |
45 | }), |
46 | pull.through(function (data) { |
47 | console.log(data.timestamp) |
48 | lastTimestamp = data.timestamp |
49 | }), |
50 | unbox() |
51 | ), |
52 | //value recovered from localStorage |
53 | initial |
54 | ) |
55 | |
56 | threadsObs(function (threadsState) { |
57 | if(threadsState.ended && threadsState.ended !== true) |
58 | console.error('threadObs error:', threadsState.ended) |
59 | }) |
60 | |
61 | var timer |
62 | //keep localStorage up to date |
63 | threadsObs(function (threadsState) { |
64 | console.log(threadsState) |
65 | clearTimeout(timer) |
66 | setTimeout(function () { |
67 | threadsState.last = lastTimestamp |
68 | localStorage.threadsState = JSON.stringify(threadsState) |
69 | }, 1000) |
70 | }) |
71 | |
72 | //stream live messages. this *should* work. |
73 | //there is no back pressure on new events |
74 | //only a show more on the top (currently) |
75 | |
76 | // pull( |
77 | // Next(function () { |
78 | // return api.sbot.pull.log({reverse: true, limit: 500, gte: firstTimestamp}) |
79 | // }), |
80 | // pull.drain(function (data) { |
81 | // firstTimestamp = data.timestamp |
82 | // threadsObs.set(threadReduce(threadsObs.value, data)) |
83 | // }) |
84 | // ) |
85 | |
86 | return {state: {obs: {threads: function () { |
87 | return threadsObs |
88 | }}}} |
89 | |
90 | } |
91 | |
92 | |
93 | |
94 | |
95 | |
96 | |
97 |
Built with git-ssb-web