Commit 89f71075d1cca296946814be2c184d128e17d3d1
move function to own scope
Dominic Tarr committed on 8/14/2017, 12:36:45 PMParent: ccad428636797b9b8b88aedf21978ba1e991bb60
Files changed
state/obs.js | changed |
state/obs.js | ||
---|---|---|
@@ -18,8 +18,58 @@ | ||
18 | 18 | |
19 | 19 | exports.create = function (api) { |
20 | 20 | var threadsObs |
21 | 21 | |
22 | + function createStateObs (threadReduce, createStream, opts, initial) { | |
23 | +// var lastTimestamp = initial ? initial.last : Date.now() | |
24 | +// var firstTimestamp = initial ? initial.first || Date.now() : Date.now() | |
25 | + | |
26 | + var lastTimestamp = opts.last || Date.now() | |
27 | + var firstTimestamp = opts.first || Date.now() | |
28 | + | |
29 | + function unbox () { | |
30 | + return pull( | |
31 | + pull.map(function (data) { | |
32 | + if(isObject(data.value.content)) return data | |
33 | + return api.message.sync.unbox(data) | |
34 | + }), | |
35 | + pull.filter(Boolean) | |
36 | + ) | |
37 | + } | |
38 | + | |
39 | + var threadsObs = PullObv( | |
40 | + threadReduce, | |
41 | + pull( | |
42 | + Next(function () { | |
43 | + return createStream({reverse: true, limit: 500, lt: lastTimestamp}) | |
44 | + }), | |
45 | + pull.through(function (data) { | |
46 | + lastTimestamp = data.timestamp | |
47 | + }), | |
48 | + unbox() | |
49 | + ), | |
50 | + //value recovered from localStorage | |
51 | + initial | |
52 | + ) | |
53 | + | |
54 | + //stream live messages. this *should* work. | |
55 | + //there is no back pressure on new events | |
56 | + //only a show more on the top (currently) | |
57 | + pull( | |
58 | + Next(function () { | |
59 | + return createStream({limit: 500, gt: firstTimestamp, live: true}) | |
60 | + }), | |
61 | + pull.drain(function (data) { | |
62 | + if(data.sync) return | |
63 | + firstTimestamp = data.timestamp | |
64 | + threadsObs.set(threadReduce(threadsObs.value, data)) | |
65 | + }) | |
66 | + ) | |
67 | + | |
68 | + return threadsObs | |
69 | + } | |
70 | + | |
71 | + | |
22 | 72 | return nest('state.obs.threads', function buildThreadObs() { |
23 | 73 | if(threadsObs) return threadsObs |
24 | 74 | |
25 | 75 | // var initial |
@@ -28,57 +78,10 @@ | ||
28 | 78 | // |
29 | 79 | |
30 | 80 | initial = {} |
31 | 81 | |
32 | - function createStateObs (threadReduce, createStream, initial) { | |
33 | - var lastTimestamp = initial ? initial.last : Date.now() | |
34 | - var firstTimestamp = initial ? initial.first || Date.now() : Date.now() | |
82 | + threadsObs = createStateObs(threadReduce, api.sbot.pull.log, initial, {}) | |
35 | 83 | |
36 | - function unbox () { | |
37 | - return pull( | |
38 | - pull.map(function (data) { | |
39 | - lastTimestamp = data.timestamp | |
40 | - if(isObject(data.value.content)) return data | |
41 | - return api.message.sync.unbox(data) | |
42 | - }), | |
43 | - pull.filter(Boolean) | |
44 | - ) | |
45 | - } | |
46 | - | |
47 | - var threadsObs = PullObv( | |
48 | - threadReduce, | |
49 | - pull( | |
50 | - Next(function () { | |
51 | - return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp}) | |
52 | - }), | |
53 | - pull.through(function (data) { | |
54 | - lastTimestamp = data.timestamp | |
55 | - }), | |
56 | - unbox() | |
57 | - ), | |
58 | - //value recovered from localStorage | |
59 | - initial | |
60 | - ) | |
61 | - | |
62 | - //stream live messages. this *should* work. | |
63 | - //there is no back pressure on new events | |
64 | - //only a show more on the top (currently) | |
65 | - pull( | |
66 | - Next(function () { | |
67 | - return api.sbot.pull.log({limit: 500, gt: firstTimestamp, live: true}) | |
68 | - }), | |
69 | - pull.drain(function (data) { | |
70 | - if(data.sync) return | |
71 | - firstTimestamp = data.timestamp | |
72 | - threadsObs.set(threadReduce(threadsObs.value, data)) | |
73 | - }) | |
74 | - ) | |
75 | - | |
76 | - return threadsObs | |
77 | - } | |
78 | - | |
79 | - threadsObs = createStateObs(threadReduce, null, initial) | |
80 | - | |
81 | 84 | threadsObs(function (threadsState) { |
82 | 85 | if(threadsState.ended && threadsState.ended !== true) |
83 | 86 | console.error('threadObs error:', threadsState.ended) |
84 | 87 | }) |
@@ -101,4 +104,6 @@ | ||
101 | 104 | } |
102 | 105 | |
103 | 106 | |
104 | 107 | |
108 | + | |
109 | + |
Built with git-ssb-web