Files: 61537ea49c1a1f22f2d8f0ef263fdbef5f2d5e7d / index.js
6369 bytesRaw
1 | var S = require('./state') |
2 | var u = require('./util') |
3 | var isNote = u.isNote |
4 | var isMessage = u.isMessage |
5 | var progress = require('./progress') |
6 | |
/**
 * Remove and return the queued state whose ready message has the
 * smallest timestamp.
 *
 * Entries whose `.ready` is no longer a message (per `isMessage`) are
 * dropped from `ready` as a side effect of the scan.
 *
 * @param {Array} ready  queue of state objects; mutated in place.
 * @param {Object} [states]  unused; kept so existing call sites still match.
 * @returns {Object|undefined} the removed state, or undefined when no
 *   entry holds a ready message.
 */
function oldest(ready, states) {
  //could do ready.sort but that is O(n*log(n)) (i think?) so faster to iterate

  var min = null
  for(var i = ready.length - 1; i >= 0; i--) {
    if(!isMessage(ready[i].ready)) {
      //this item is not a ready message (any more), remove it from the queue
      ready.splice(i, 1)
      //we iterate backwards, so the current minimum sits at a higher index
      //and has just moved down one slot.
      if(min != null) min--
    }
    else if(min == null || ready[i].ready.timestamp < ready[min].ready.timestamp)
      min = i
  }

  if(min != null) {
    var state = ready[min]
    //remove the chosen entry (not index `i`, which is -1 after the loop)
    ready.splice(min, 1)
    return state
  }

}
26 | |
/**
 * Create a one-slot rendezvous function.
 *
 * The returned `next(_fn)`:
 *  - with nothing parked, parks `_fn` (which may be undefined);
 *  - with something parked and no argument, clears the slot and calls
 *    the parked function;
 *  - with something parked and a new `_fn`, throws.
 *
 * @returns {function(Function=): void}
 */
function Next () {
  var waiting
  return function next (_fn) {
    //empty slot: just remember the (possibly undefined) function
    if(!waiting) {
      waiting = _fn
      return
    }

    if(_fn) throw new Error('already waiting! '+waiting.toString())

    //clear the slot before calling, so the callee may park again
    var resume = waiting
    waiting = null
    resume()
  }
}
41 | |
/**
 * Map the end marker `true` to null; pass any other value through
 * unchanged.
 *
 * @param {*} err
 * @returns {*} null when `err === true`, otherwise `err`.
 */
function toEnd(err) {
  if(err === true) return null
  return err
}
45 | |
46 | //get, append are tied to the protocol |
47 | //seqs, onChange, onRequest, callback are tied to the instance. |
48 | module.exports = function (get, append) { |
49 | |
50 | return function (opts, callback) { |
51 | if('function' === typeof opts) |
52 | callback = opts, opts = {} |
53 | |
54 | var readyMsg = [], readyNote = {} |
55 | onChange = opts.onChange || require('./bounce')(function () { |
56 | console.log(progress(states)) |
57 | }, 1000) |
58 | |
59 | //called if this feed is has not been requested |
60 | var onRequest = opts.onRequest || function (id, seq) { |
61 | stream.request(id, 0) |
62 | } |
63 | |
64 | function maybeQueue(key, state) { |
65 | if('string' !== typeof key) throw new Error('key should be string') |
66 | if('object' !== typeof state) |
67 | throw new Error('state should be object') |
68 | |
69 | if(isMessage(state.ready)) |
70 | readyMsg.push(state) |
71 | else if(isNote(state.ready)) |
72 | readyNote[key] = true |
73 | } |
74 | |
75 | var states = {}, error |
76 | |
77 | var next = Next() |
78 | function checkNote (k) { |
79 | if(isNote(states[k].effect)) { |
80 | get(k, states[k].effect, function (err, msg) { |
81 | if(msg) { |
82 | maybeQueue(k, states[k] = S.gotMessage(states[k], msg)) |
83 | if(states[k].ready) next() |
84 | } |
85 | }) |
86 | } |
87 | } |
88 | |
89 | var stream = { |
90 | sink: function (read) { |
91 | read(null, function cb (err, data) { |
92 | //handle errors and aborts |
93 | if(err && !error) { //if this sink got an error before source was aborted. |
94 | callback(toEnd(error = err)) |
95 | } |
96 | if(error) return read(error, function () {}) |
97 | |
98 | if(isMessage(data)) { |
99 | maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data)) |
100 | if(isMessage(states[data.author].effect)) {//append this message |
101 | states[data.author].effect = null |
102 | // *** append MUST call onAppend before the callback *** |
103 | //for performance, append should verify + queue the append, but not write to database. |
104 | //also note, there may be other messages which have been received |
105 | //and we could theirfore do parallel calls to append, but would make this |
106 | //code quite complex. |
107 | append(data, function (err) { |
108 | onChange() |
109 | read(null, cb) |
110 | next() |
111 | }) |
112 | } |
113 | else |
114 | read(null, cb) |
115 | |
116 | next() |
117 | } |
118 | else { |
119 | var ready = false |
120 | |
121 | for(var k in data) { |
122 | //if we havn't requested this yet, see if we want it. |
123 | //if we _don't want it_ we should say, otherwise |
124 | //they'll ask us again next time. |
125 | if(!states[k]) { |
126 | states[k] = S.init(null) |
127 | onRequest(k, data[k]) |
128 | } |
129 | maybeQueue(k, states[k] = S.receiveNote(states[k], data[k])) |
130 | if(states[k].ready != null) |
131 | ready = true |
132 | checkNote(k) |
133 | } |
134 | |
135 | if(ready) next() |
136 | onChange() |
137 | read(null, cb) |
138 | } |
139 | }) |
140 | }, |
141 | source: function (abort, cb) { |
142 | //if there are any states with a message to send, take the oldest one. |
143 | //else, collect all states with a note, and send as a bundle. |
144 | //handle errors and aborts |
145 | if(abort) { |
146 | if(!error) //if the source was aborted before the sink got an error |
147 | return callback(toEnd(error = abort)) |
148 | else |
149 | error = abort |
150 | } |
151 | ;(function read () { |
152 | //this happens when the client |
153 | if(error) return cb(error) |
154 | |
155 | var state |
156 | if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) { |
157 | var msg = state.ready |
158 | maybeQueue(msg.author, state = S.read(state)) |
159 | checkNote(msg.author) |
160 | onChange() |
161 | cb(null, msg) |
162 | } |
163 | else { |
164 | var notes = {}, n = 0 |
165 | |
166 | for(k in readyNote) { |
167 | if(isNote(states[k].ready)) { |
168 | n ++ |
169 | notes[k] = states[k].ready |
170 | states[k] = S.read(states[k]) |
171 | checkNote(k) |
172 | } |
173 | } |
174 | |
175 | readyNote = {} |
176 | |
177 | onChange() |
178 | if(n) cb(null, notes) |
179 | else next(read) |
180 | } |
181 | })() |
182 | }, |
183 | progress: function () { |
184 | return progress(states) |
185 | }, |
186 | onAppend: function (msg) { |
187 | var k = msg.author |
188 | //TMP, call a user provided function to decide how to handle this. |
189 | if(!states[k]) maybeQueue(k, states[k] = S.init(msg.sequence)) |
190 | if(states[k]) { |
191 | maybeQueue(k, states[k] = S.appendMessage(states[k], msg)) |
192 | checkNote(k) |
193 | next() |
194 | } |
195 | }, |
196 | request: function (id, seq) { |
197 | //only allow updates if it's gonna change the state. |
198 | if(!states[id]) { |
199 | states[id] = S.init(seq) |
200 | readyNote[id] = true |
201 | } |
202 | else if( |
203 | states[id].local.seq == null || |
204 | states[id].local.seq == -1 || |
205 | (seq === -1 && states[id].local.seq != -1) |
206 | ) { |
207 | states[id].ready = seq |
208 | readyNote[id] = true |
209 | next() |
210 | } |
211 | }, |
212 | states: states |
213 | } |
214 | |
215 | if(opts.seqs) { |
216 | for(var k in opts.seqs) |
217 | stream.request(k, opts.seqs[k]) |
218 | } |
219 | return stream |
220 | |
221 | } |
222 | } |
223 | |
224 |
Built with git-ssb-web