Files: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda / index.js
6827 bytesRaw
1 | |
2 | var S = require('./state') |
3 | var u = require('./util') |
4 | var isNote = u.isNote |
5 | var isMessage = u.isMessage |
6 | var progress = require('./progress') |
7 | |
/**
 * Remove and return the queued state whose ready message is oldest.
 *
 * Walks `ready` once from the end. Entries whose `.ready` is no longer a
 * message are pruned in place; of the remaining entries, the one with the
 * smallest `ready.timestamp` is spliced out of the queue and returned.
 *
 * @param {Array} ready queue of states; mutated (stale entries and the
 *   returned entry are removed)
 * @param {Object} [states] unused; kept so existing call sites stay valid
 * @returns {Object|undefined} the chosen state, or undefined when no entry
 *   currently holds a message
 */
function oldest(ready, states) {
  //could do ready.sort but that is O(n*log(n)) (i think?) so iterate instead

  var min = -1 //index of the oldest message seen so far, -1 while none
  for(var i = ready.length - 1; i >= 0; i--) {
    if(!isMessage(ready[i].ready)) {
      //this item is not a ready message (any more), remove it from the queue
      ready.splice(i, 1)
      //we iterate backwards, so the current minimum (if any) sits above i
      //and has just shifted down by one slot.
      if(min !== -1) min--
    }
    else if(min === -1 || ready[i].ready.timestamp < ready[min].ready.timestamp)
      min = i
  }

  //remove the winner by its own index (the loop counter is -1 by now)
  if(min !== -1) return ready.splice(min, 1)[0]
}
27 | |
/**
 * Build a one-slot "park and wake" function.
 *
 * The returned `next(_fn)` behaves as follows:
 *  - nothing parked: `_fn` (possibly undefined) becomes the parked function.
 *  - something parked and `_fn` given: throws, only one waiter is allowed.
 *  - something parked and no `_fn`: the slot is cleared, then the parked
 *    function is called (cleared first so it may park itself again).
 *
 * @returns {Function} next
 */
function Next () {
  var waiting
  return function next (_fn) {
    //empty slot: just remember whatever we were given
    if(!waiting) {
      waiting = _fn
      return
    }

    if(_fn) throw new Error('already waiting! '+waiting.toString())

    //release the slot before running, so the waiter can re-register
    var resume = waiting
    waiting = null
    resume()
  }
}
42 | |
/**
 * Map a stream end signal to a callback argument: a plain `true` becomes
 * null, any other value is passed through unchanged.
 *
 * @param {*} err end/abort value
 * @returns {*} null when err is exactly true, otherwise err
 */
function toEnd(err) {
  if(err === true) return null
  return err
}
46 | |
47 | //get, append are tied to the protocol |
48 | //seqs, onChange, onRequest, callback are tied to the instance. |
49 | module.exports = function (get, append) { |
50 | |
51 | return function (opts, callback) { |
52 | if('function' === typeof opts) |
53 | callback = opts, opts = {} |
54 | |
55 | var readyMsg = [], readyNote = {} |
56 | var onChange = opts.onChange || require('./bounce')(function () { |
57 | console.log(progress(states)) |
58 | }, 1000) |
59 | |
60 | //called if this feed is has not been requested |
61 | var onRequest = opts.onRequest || function (id, seq) { |
62 | stream.request(id, 0) |
63 | } |
64 | |
65 | function maybeQueue(key, state) { |
66 | if('string' !== typeof key) throw new Error('key should be string') |
67 | if('object' !== typeof state) |
68 | throw new Error('state should be object') |
69 | |
70 | if(isMessage(state.ready)) |
71 | readyMsg.push(state) |
72 | else if(isNote(state.ready)) |
73 | readyNote[key] = true |
74 | } |
75 | |
76 | var states = {}, error |
77 | |
78 | var next = Next() |
79 | function checkNote (k) { |
80 | if(isNote(states[k].effect)) { |
81 | get(k, states[k].effect, function (err, msg) { |
82 | if(msg) { |
83 | maybeQueue(k, states[k] = S.gotMessage(states[k], msg)) |
84 | if(states[k].ready) next() |
85 | } |
86 | }) |
87 | } |
88 | } |
89 | |
90 | var stream = { |
91 | sink: function (read) { |
92 | read(null, function cb (err, data) { |
93 | //handle errors and aborts |
94 | if(err && !error) { //if this sink got an error before source was aborted. |
95 | callback(toEnd(error = err)) |
96 | } |
97 | if(error) return read(error, function () {}) |
98 | |
99 | if(isMessage(data)) { |
100 | maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data)) |
101 | if(isMessage(states[data.author].effect)) {//append this message |
102 | states[data.author].effect = null |
103 | // *** append MUST call onAppend before the callback *** |
104 | //for performance, append should verify + queue the append, but not write to database. |
105 | //also note, there may be other messages which have been received |
106 | //and we could theirfore do parallel calls to append, but would make this |
107 | //code quite complex. |
108 | append(data, function (err) { |
109 | onChange() |
110 | read(null, cb) |
111 | next() |
112 | }) |
113 | } |
114 | else |
115 | read(null, cb) |
116 | |
117 | next() |
118 | } |
119 | else { |
120 | var ready = false |
121 | |
122 | for(var k in data) { |
123 | //if we havn't requested this yet, see if we want it. |
124 | //if we _don't want it_ we should say, otherwise |
125 | //they'll ask us again next time. |
126 | if(!states[k]) { |
127 | states[k] = S.init(null) |
128 | onRequest(k, data[k]) |
129 | } |
130 | maybeQueue(k, states[k] = S.receiveNote(states[k], data[k])) |
131 | if(states[k].ready != null) |
132 | ready = true |
133 | checkNote(k) |
134 | } |
135 | |
136 | if(ready) next() |
137 | onChange() |
138 | read(null, cb) |
139 | } |
140 | }) |
141 | }, |
142 | source: function (abort, cb) { |
143 | //if there are any states with a message to send, take the oldest one. |
144 | //else, collect all states with a note, and send as a bundle. |
145 | //handle errors and aborts |
146 | if(abort) { |
147 | if(!error) //if the source was aborted before the sink got an error |
148 | return callback(toEnd(error = abort)) |
149 | else |
150 | error = abort |
151 | } |
152 | ;(function read () { |
153 | //this happens when the client |
154 | if(error) return cb(error) |
155 | |
156 | var state |
157 | if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) { |
158 | var msg = state.ready |
159 | maybeQueue(msg.author, state = S.read(state)) |
160 | checkNote(msg.author) |
161 | onChange() |
162 | cb(null, msg) |
163 | } |
164 | else { |
165 | var notes = {}, n = 0 |
166 | |
167 | for(k in readyNote) { |
168 | if(isNote(states[k].ready)) { |
169 | n ++ |
170 | notes[k] = states[k].ready |
171 | states[k] = S.read(states[k]) |
172 | checkNote(k) |
173 | } |
174 | } |
175 | |
176 | readyNote = {} |
177 | |
178 | onChange() |
179 | if(n) cb(null, notes) |
180 | else next(read) |
181 | } |
182 | })() |
183 | }, |
184 | progress: function () { |
185 | return progress(states) |
186 | }, |
187 | onAppend: function (msg) { |
188 | var k = msg.author |
189 | //TMP, call a user provided function to decide how to handle this. |
190 | // if(!states[k]) { |
191 | // console.log('ON_APPEND', msg.author, msg.sequence) |
192 | // maybeQueue(k, states[k] = S.init(msg.sequence)) |
193 | // } |
194 | // |
195 | if(states[k]) { |
196 | maybeQueue(k, states[k] = S.appendMessage(states[k], msg)) |
197 | checkNote(k) |
198 | next() |
199 | } |
200 | }, |
201 | //but what if you want to push in a bunch |
202 | //of items but not trigger next just yet? |
203 | request: function (id, seq, isSingle) { |
204 | //only allow updates if it's gonna change the state. |
205 | //this section should move into state.js |
206 | if(!states[id]) { |
207 | states[id] = S.init(seq) |
208 | readyNote[id] = true |
209 | } |
210 | else if( |
211 | states[id].local.seq == null || |
212 | states[id].local.seq == -1 || |
213 | (seq === -1 && states[id].local.seq != -1) |
214 | ) { |
215 | //MUST set the local state, otherwise receiveMessage won't work. |
216 | if(seq >= 0) states[id].local.seq = seq |
217 | states[id].ready = seq |
218 | readyNote[id] = true |
219 | if(isSingle !== false) { |
220 | next() |
221 | } |
222 | } |
223 | }, |
224 | states: states, |
225 | next: next |
226 | } |
227 | |
228 | if(opts.seqs) { |
229 | for(var k in opts.seqs) |
230 | stream.request(k, opts.seqs[k]) |
231 | } |
232 | return stream |
233 | |
234 | } |
235 | } |
236 | |
237 |
Built with git-ssb-web