Files: da8b90d365d449e737c606b1ba23cc612834778e / index.js
7041 bytesRaw
1 | |
2 | var S = require('./state') |
3 | var u = require('./util') |
4 | var isNote = u.isNote |
5 | var isNotes = u.isNotes |
6 | var isMessage = u.isMessage |
7 | var progress = require('./progress') |
8 | |
/**
 * Removes and returns the queued state whose ready message has the lowest
 * timestamp.
 *
 * As a side effect, every entry whose `ready` value is no longer a message
 * (as judged by `isMessage`) is dropped from `ready`.
 *
 * @param {Array} ready - queue of state objects; mutated in place.
 * @param {Object} [states] - unused; kept so existing call sites keep working.
 * @returns {Object|undefined} the state holding the oldest ready message, or
 *   undefined when the queue contains no ready message.
 */
function oldest(ready, states) {
  //a single pass is enough to find the minimum, so avoid sorting the queue

  var min = -1
  //iterate backwards so that removing ready[i] never skips an entry
  for(var i = ready.length - 1; i >= 0; i--) {
    if(!isMessage(ready[i].ready)) {
      //this item is not a ready message (any more), remove it from the queue
      ready.splice(i, 1)
      //the current candidate always sits above i, so it just moved down one slot
      if(min !== -1) min--
    }
    else if(min === -1 || ready[i].ready.timestamp < ready[min].ready.timestamp)
      min = i
  }

  if(min === -1) return

  //take the selected entry itself out of the queue and hand it back
  return ready.splice(min, 1)[0]
}
28 | |
/**
 * Creates a one-slot rendezvous between a waiter and a signaller.
 *
 * The returned `next` function has two modes:
 *  - `next(fn)` parks `fn` as the single waiter; it throws if a waiter is
 *    already parked.
 *  - `next()` releases the parked waiter (if any) by calling it once.
 *    A signal sent while nobody is waiting is not remembered.
 *
 * @returns {Function} the `next` function described above.
 */
function Next () {
  var waiting
  return function next (cb) {
    //nobody is parked: remember the caller (or nothing, for a bare signal)
    if(!waiting) {
      waiting = cb
      return
    }

    //only one waiter may be parked at a time
    if(cb) throw new Error('already waiting! '+waiting.toString())

    //clear the slot before resuming so the waiter may park itself again
    var resume = waiting
    waiting = null
    resume()
  }
}
43 | |
/**
 * Converts a stream end signal into a callback-style error value.
 *
 * @param {*} err - `true` for a normal end, anything else is passed through.
 * @returns {*} null when `err` is exactly `true`, otherwise `err` unchanged.
 */
function toEnd(err) {
  if(err === true) return null
  return err
}
47 | |
48 | //get, append are tied to the protocol |
49 | //seqs, onChange, onRequest, callback are tied to the instance. |
50 | module.exports = function (get, append) { |
51 | |
52 | return function (opts, callback) { |
53 | if('function' === typeof opts) |
54 | callback = opts, opts = {} |
55 | |
56 | var readyMsg = [], readyNote = {} |
57 | var onChange = opts.onChange || require('./bounce')(function () { |
58 | console.log(progress(states)) |
59 | }, 1000) |
60 | |
61 | //called if this feed is has not been requested |
62 | var onRequest = opts.onRequest || function (id, seq) { |
63 | stream.request(id, 0) |
64 | } |
65 | |
66 | function maybeQueue(key, state) { |
67 | if('string' !== typeof key) throw new Error('key should be string') |
68 | if('object' !== typeof state) |
69 | throw new Error('state should be object') |
70 | |
71 | if(isMessage(state.ready)) |
72 | readyMsg.push(state) |
73 | else if(isNote(state.ready)) |
74 | readyNote[key] = true |
75 | } |
76 | |
77 | var states = {}, error |
78 | |
79 | var next = Next() |
80 | function checkNote (k) { |
81 | if(isNote(states[k].effect)) { |
82 | get(k, states[k].effect, function (err, msg) { |
83 | if(msg) { |
84 | maybeQueue(k, states[k] = S.gotMessage(states[k], msg)) |
85 | if(states[k].ready) next() |
86 | } |
87 | }) |
88 | } |
89 | } |
90 | |
91 | var stream = { |
92 | sink: function (read) { |
93 | read(null, function cb (err, data) { |
94 | //handle errors and aborts |
95 | if(err && !error) { //if this sink got an error before source was aborted. |
96 | callback(toEnd(error = err)) |
97 | } |
98 | if(error) return read(error, function () {}) |
99 | |
100 | if(isMessage(data)) { |
101 | maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data)) |
102 | if(isMessage(states[data.author].effect)) {//append this message |
103 | states[data.author].effect = null |
104 | // *** append MUST call onAppend before the callback *** |
105 | //for performance, append should verify + queue the append, but not write to database. |
106 | //also note, there may be other messages which have been received |
107 | //and we could theirfore do parallel calls to append, but would make this |
108 | //code quite complex. |
109 | append(data, function (err) { |
110 | onChange() |
111 | read(null, cb) |
112 | next() |
113 | }) |
114 | } |
115 | else |
116 | read(null, cb) |
117 | |
118 | next() |
119 | } |
120 | else if (isNotes(data)) { |
121 | var ready = false |
122 | |
123 | for(var k in data) { |
124 | //if we havn't requested this yet, see if we want it. |
125 | //if we _don't want it_ we should say, otherwise |
126 | //they'll ask us again next time. |
127 | if(!states[k]) { |
128 | states[k] = S.init(null) |
129 | onRequest(k, data[k]) |
130 | } |
131 | maybeQueue(k, states[k] = S.receiveNote(states[k], data[k])) |
132 | if(states[k].ready != null) |
133 | ready = true |
134 | checkNote(k) |
135 | } |
136 | |
137 | if(ready) next() |
138 | onChange() |
139 | read(null, cb) |
140 | } |
141 | else |
142 | cb(new TypeError('invalid data')) |
143 | }) |
144 | }, |
145 | source: function (abort, cb) { |
146 | //if there are any states with a message to send, take the oldest one. |
147 | //else, collect all states with a note, and send as a bundle. |
148 | //handle errors and aborts |
149 | if(abort) { |
150 | if(!error) //if the source was aborted before the sink got an error |
151 | return callback(toEnd(error = abort)) |
152 | else |
153 | error = abort |
154 | } |
155 | ;(function read () { |
156 | //this happens when the client |
157 | if(error) return cb(error) |
158 | |
159 | var state |
160 | if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) { |
161 | var msg = state.ready |
162 | maybeQueue(msg.author, state = S.read(state)) |
163 | checkNote(msg.author) |
164 | onChange() |
165 | cb(null, msg) |
166 | } |
167 | else { |
168 | var notes = {}, n = 0 |
169 | |
170 | for(k in readyNote) { |
171 | if(isNote(states[k].ready)) { |
172 | n ++ |
173 | notes[k] = states[k].ready |
174 | states[k] = S.read(states[k]) |
175 | checkNote(k) |
176 | } |
177 | } |
178 | |
179 | readyNote = {} |
180 | |
181 | onChange() |
182 | if(n) cb(null, notes) |
183 | else next(read) |
184 | } |
185 | })() |
186 | }, |
187 | progress: function () { |
188 | return progress(states) |
189 | }, |
190 | onAppend: function (msg) { |
191 | var k = msg.author |
192 | //TMP, call a user provided function to decide how to handle this. |
193 | // if(!states[k]) { |
194 | // console.log('ON_APPEND', msg.author, msg.sequence) |
195 | // maybeQueue(k, states[k] = S.init(msg.sequence)) |
196 | // } |
197 | // |
198 | if(states[k]) { |
199 | maybeQueue(k, states[k] = S.appendMessage(states[k], msg)) |
200 | checkNote(k) |
201 | next() |
202 | } |
203 | }, |
204 | //but what if you want to push in a bunch |
205 | //of items but not trigger next just yet? |
206 | request: function (id, seq, isSingle) { |
207 | //only allow updates if it's gonna change the state. |
208 | //this section should move into state.js |
209 | if(!Number.isInteger(seq)) |
210 | throw new Error('ebt.stream.request(seq): seq must be integer') |
211 | if(!states[id]) { |
212 | states[id] = S.init(seq) |
213 | readyNote[id] = true |
214 | } |
215 | else if( |
216 | states[id].local.seq == null || |
217 | states[id].local.seq == -1 || |
218 | (seq === -1 && states[id].local.seq != -1) |
219 | ) { |
220 | //MUST set the local state, otherwise receiveMessage won't work. |
221 | if(seq >= 0) states[id].local.seq = seq |
222 | states[id].ready = seq |
223 | readyNote[id] = true |
224 | if(isSingle !== false) { |
225 | next() |
226 | } |
227 | } |
228 | }, |
229 | states: states, |
230 | next: next |
231 | } |
232 | |
233 | if(opts.seqs) { |
234 | for(var k in opts.seqs) |
235 | stream.request(k, opts.seqs[k]) |
236 | } |
237 | return stream |
238 | |
239 | } |
240 | } |
241 | |
242 |
Built with git-ssb-web