Commit ccad428636797b9b8b88aedf21978ba1e991bb60
refactor common thread obs into function
Dominic Tarr committed on 8/14/2017, 12:31:29 PMParent: 9586c1b4818398af637ad0672a2db42e5487ab29
Files changed
state/obs.js | changed |
state/obs.js | ||
---|---|---|
@@ -27,51 +27,57 @@ | ||
27 | 27 | // catch (_) { } |
28 | 28 | // |
29 | 29 | |
30 | 30 | initial = {} |
31 | - var lastTimestamp = initial ? initial.last : Date.now() | |
32 | - var firstTimestamp = initial ? initial.first || Date.now() : Date.now() | |
33 | 31 | |
34 | - function unbox () { | |
35 | - return pull( | |
36 | - pull.map(function (data) { | |
37 | - lastTimestamp = data.timestamp | |
38 | - if(isObject(data.value.content)) return data | |
39 | - return api.message.sync.unbox(data) | |
40 | - }), | |
41 | - pull.filter(Boolean) | |
32 | + function createStateObs (threadReduce, createStream, initial) { | |
33 | + var lastTimestamp = initial ? initial.last : Date.now() | |
34 | + var firstTimestamp = initial ? initial.first || Date.now() : Date.now() | |
35 | + | |
36 | + function unbox () { | |
37 | + return pull( | |
38 | + pull.map(function (data) { | |
39 | + lastTimestamp = data.timestamp | |
40 | + if(isObject(data.value.content)) return data | |
41 | + return api.message.sync.unbox(data) | |
42 | + }), | |
43 | + pull.filter(Boolean) | |
44 | + ) | |
45 | + } | |
46 | + | |
47 | + var threadsObs = PullObv( | |
48 | + threadReduce, | |
49 | + pull( | |
50 | + Next(function () { | |
51 | + return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp}) | |
52 | + }), | |
53 | + pull.through(function (data) { | |
54 | + lastTimestamp = data.timestamp | |
55 | + }), | |
56 | + unbox() | |
57 | + ), | |
58 | + //value recovered from localStorage | |
59 | + initial | |
42 | 60 | ) |
43 | - } | |
44 | 61 | |
45 | - threadsObs = PullObv( | |
46 | - threadReduce, | |
62 | + //stream live messages. this *should* work. | |
63 | + //there is no back pressure on new events | |
64 | + //only a show more on the top (currently) | |
47 | 65 | pull( |
48 | 66 | Next(function () { |
49 | - return api.sbot.pull.log({reverse: true, limit: 500, lt: lastTimestamp}) | |
67 | + return api.sbot.pull.log({limit: 500, gt: firstTimestamp, live: true}) | |
50 | 68 | }), |
51 | - pull.through(function (data) { | |
52 | - lastTimestamp = data.timestamp | |
53 | - }), | |
54 | - unbox() | |
55 | - ), | |
56 | - //value recovered from localStorage | |
57 | - initial | |
58 | - ) | |
69 | + pull.drain(function (data) { | |
70 | + if(data.sync) return | |
71 | + firstTimestamp = data.timestamp | |
72 | + threadsObs.set(threadReduce(threadsObs.value, data)) | |
73 | + }) | |
74 | + ) | |
59 | 75 | |
60 | - //stream live messages. this *should* work. | |
61 | - //there is no back pressure on new events | |
62 | - //only a show more on the top (currently) | |
63 | - pull( | |
64 | - Next(function () { | |
65 | - return api.sbot.pull.log({limit: 500, gt: firstTimestamp, live: true}) | |
66 | - }), | |
67 | - pull.drain(function (data) { | |
68 | - if(data.sync) return | |
69 | - firstTimestamp = data.timestamp | |
70 | - threadsObs.set(threadReduce(threadsObs.value, data)) | |
71 | - }) | |
72 | - ) | |
76 | + return threadsObs | |
77 | + } | |
73 | 78 | |
79 | + threadsObs = createStateObs(threadReduce, null, initial) | |
74 | 80 | |
75 | 81 | threadsObs(function (threadsState) { |
76 | 82 | if(threadsState.ended && threadsState.ended !== true) |
77 | 83 | console.error('threadObs error:', threadsState.ended) |
Built with git-ssb-web