git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: da8b90d365d449e737c606b1ba23cc612834778e

Files: da8b90d365d449e737c606b1ba23cc612834778e / index.js

7041 bytesRaw
1'use strict'
2var S = require('./state')
3var u = require('./util')
4var isNote = u.isNote
5var isNotes = u.isNotes
6var isMessage = u.isMessage
7var progress = require('./progress')
8
//Remove and return the queued state whose ready message has the smallest
//timestamp. Entries whose `ready` is no longer a message are pruned from
//`ready` (the queue array is mutated in place).
//
//  ready  - array of state objects, each expected to carry a `.ready` field
//  states - unused; kept so existing call sites keep working
//
//Returns the selected state object, or undefined when no queued entry
//holds a ready message.
function oldest(ready, states) {
  //could do ready.sort but that is O(n*log(n)) (i think?) so faster to iterate

  var min = null
  for(var i = ready.length - 1; i >= 0; i--) {
    if(!isMessage(ready[i].ready)) {
      //this item is not a ready message (any more), remove it from the queue
      ready.splice(i, 1)
      //we iterate downwards, so `min` (if set) is always above `i`;
      //removing index i shifts the chosen entry down by one slot.
      if(min != null) min --
    }
    else if(min == null || ready[i].ready.timestamp < ready[min].ready.timestamp)
      min = i
  }

  if(min != null) {
    //take the selected entry out of the queue and hand it to the caller
    return ready.splice(min, 1)[0]
  }
}
28
//Build a one-slot "wake me up" latch.
//
//The returned function behaves in two ways:
//  next(fn) - park `fn` while nothing is parked; throws if something is
//             already parked.
//  next()   - if a function is parked, clear the slot and call it once;
//             otherwise do nothing.
function Next () {
  var waiting
  return function next (_fn) {
    //nothing parked yet: remember whatever we were given (possibly nothing)
    if(!waiting) {
      waiting = _fn
      return
    }

    //someone is parked already, a second waiter is a programming error
    if(_fn) throw new Error('already waiting! '+waiting.toString())

    //release the slot before invoking, so the waiter may park itself again
    var parked = waiting
    waiting = null
    parked()
  }
}
43
//Convert a stream end value into a callback error argument:
//exactly `true` becomes null, every other value is passed through untouched.
function toEnd(err) {
  if(err === true) return null
  return err
}
47
48//get, append are tied to the protocol
49//seqs, onChange, onRequest, callback are tied to the instance.
50module.exports = function (get, append) {
51
52 return function (opts, callback) {
53 if('function' === typeof opts)
54 callback = opts, opts = {}
55
56 var readyMsg = [], readyNote = {}
57 var onChange = opts.onChange || require('./bounce')(function () {
58 console.log(progress(states))
59 }, 1000)
60
61 //called if this feed is has not been requested
62 var onRequest = opts.onRequest || function (id, seq) {
63 stream.request(id, 0)
64 }
65
66 function maybeQueue(key, state) {
67 if('string' !== typeof key) throw new Error('key should be string')
68 if('object' !== typeof state)
69 throw new Error('state should be object')
70
71 if(isMessage(state.ready))
72 readyMsg.push(state)
73 else if(isNote(state.ready))
74 readyNote[key] = true
75 }
76
77 var states = {}, error
78
79 var next = Next()
80 function checkNote (k) {
81 if(isNote(states[k].effect)) {
82 get(k, states[k].effect, function (err, msg) {
83 if(msg) {
84 maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
85 if(states[k].ready) next()
86 }
87 })
88 }
89 }
90
91 var stream = {
92 sink: function (read) {
93 read(null, function cb (err, data) {
94 //handle errors and aborts
95 if(err && !error) { //if this sink got an error before source was aborted.
96 callback(toEnd(error = err))
97 }
98 if(error) return read(error, function () {})
99
100 if(isMessage(data)) {
101 maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
102 if(isMessage(states[data.author].effect)) {//append this message
103 states[data.author].effect = null
104 // *** append MUST call onAppend before the callback ***
105 //for performance, append should verify + queue the append, but not write to database.
106 //also note, there may be other messages which have been received
107 //and we could theirfore do parallel calls to append, but would make this
108 //code quite complex.
109 append(data, function (err) {
110 onChange()
111 read(null, cb)
112 next()
113 })
114 }
115 else
116 read(null, cb)
117
118 next()
119 }
120 else if (isNotes(data)) {
121 var ready = false
122
123 for(var k in data) {
124 //if we havn't requested this yet, see if we want it.
125 //if we _don't want it_ we should say, otherwise
126 //they'll ask us again next time.
127 if(!states[k]) {
128 states[k] = S.init(null)
129 onRequest(k, data[k])
130 }
131 maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
132 if(states[k].ready != null)
133 ready = true
134 checkNote(k)
135 }
136
137 if(ready) next()
138 onChange()
139 read(null, cb)
140 }
141 else
142 cb(new TypeError('invalid data'))
143 })
144 },
145 source: function (abort, cb) {
146 //if there are any states with a message to send, take the oldest one.
147 //else, collect all states with a note, and send as a bundle.
148 //handle errors and aborts
149 if(abort) {
150 if(!error) //if the source was aborted before the sink got an error
151 return callback(toEnd(error = abort))
152 else
153 error = abort
154 }
155 ;(function read () {
156 //this happens when the client
157 if(error) return cb(error)
158
159 var state
160 if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
161 var msg = state.ready
162 maybeQueue(msg.author, state = S.read(state))
163 checkNote(msg.author)
164 onChange()
165 cb(null, msg)
166 }
167 else {
168 var notes = {}, n = 0
169
170 for(k in readyNote) {
171 if(isNote(states[k].ready)) {
172 n ++
173 notes[k] = states[k].ready
174 states[k] = S.read(states[k])
175 checkNote(k)
176 }
177 }
178
179 readyNote = {}
180
181 onChange()
182 if(n) cb(null, notes)
183 else next(read)
184 }
185 })()
186 },
187 progress: function () {
188 return progress(states)
189 },
190 onAppend: function (msg) {
191 var k = msg.author
192 //TMP, call a user provided function to decide how to handle this.
193// if(!states[k]) {
194// console.log('ON_APPEND', msg.author, msg.sequence)
195// maybeQueue(k, states[k] = S.init(msg.sequence))
196// }
197//
198 if(states[k]) {
199 maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
200 checkNote(k)
201 next()
202 }
203 },
204 //but what if you want to push in a bunch
205 //of items but not trigger next just yet?
206 request: function (id, seq, isSingle) {
207 //only allow updates if it's gonna change the state.
208 //this section should move into state.js
209 if(!Number.isInteger(seq))
210 throw new Error('ebt.stream.request(seq): seq must be integer')
211 if(!states[id]) {
212 states[id] = S.init(seq)
213 readyNote[id] = true
214 }
215 else if(
216 states[id].local.seq == null ||
217 states[id].local.seq == -1 ||
218 (seq === -1 && states[id].local.seq != -1)
219 ) {
220 //MUST set the local state, otherwise receiveMessage won't work.
221 if(seq >= 0) states[id].local.seq = seq
222 states[id].ready = seq
223 readyNote[id] = true
224 if(isSingle !== false) {
225 next()
226 }
227 }
228 },
229 states: states,
230 next: next
231 }
232
233 if(opts.seqs) {
234 for(var k in opts.seqs)
235 stream.request(k, opts.seqs[k])
236 }
237 return stream
238
239 }
240}
241
242

Built with git-ssb-web