git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: c79c1ed68cdb08e095da574b14e78174ef71b2b3

Files: c79c1ed68cdb08e095da574b14e78174ef71b2b3 / index.js

6066 bytesRaw
1var S = require('./state')
2var u = require('./util')
3var isNote = u.isNote
4var isMessage = u.isMessage
5var progress = require('./progress')
6
7function oldest(ready, states) {
8 //could do ready.sort but that is O(n*log(n)) (i think?) so faster to iterate
9
10 var min = null
11 for(var i = ready.length - 1; i >= 0; i--) {
12 if(!isMessage(ready[i].ready))
13 ready.splice(i, 1) //this item is not a ready message (any more) remove from queue)
14 else if(min == null)
15 min = i
16 else if (ready[i].ready.timestamp < ready[i].ready.timestamp) min = i
17 }
18
19 if(min != null) {
20 var state = ready[min]
21 ready.splice(i, 1)
22 return state
23 }
24
25}
26
27function Next () {
28 var fn
29 return function next (_fn) {
30 if(fn) {
31 if(_fn) throw new Error('already waiting! '+fn.toString())
32 else {
33 _fn = fn; fn = null; _fn()
34 }
35 }
36 else {
37 fn = _fn
38 }
39 }
40}
41
42function toEnd(err) {
43 return err === true ? null : err
44}
45
46//get, append are tied to the protocol
47//seqs, onChange, onRequest, callback are tied to the instance.
48module.exports = function (get, append) {
49
50 return function (opts, callback) {
51 if('function' === typeof opts)
52 callback = opts, opts = {}
53
54 var readyMsg = [], readyNote = {}
55 onChange = opts.onChange || require('./bounce')(function () {
56 console.log(progress(states))
57 }, 1000)
58
59 //called if this feed is has not been requested
60 var onRequest = opts.onRequest || function (id, seq) {
61 stream.request(id, 0)
62 }
63
64 function maybeQueue(key, state) {
65 if('string' !== typeof key) throw new Error('key should be string')
66 if('object' !== typeof state)
67 throw new Error('state should be object')
68
69 if(isMessage(state.ready))
70 readyMsg.push(state)
71 else if(isNote(state.ready))
72 readyNote[key] = true
73 }
74
75 var states = {}, error
76
77 var next = Next()
78 function checkNote (k) {
79 if(isNote(states[k].effect)) {
80 get(k, states[k].effect, function (err, msg) {
81 if(msg) {
82 maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
83 if(states[k].ready) next()
84 }
85 })
86 }
87 }
88
89 var stream = {
90 sink: function (read) {
91 read(null, function cb (err, data) {
92 //handle errors and aborts
93 if(err && !error) { //if this sink got an error before source was aborted.
94 callback(toEnd(error = err))
95 }
96 if(error) return read(error, function () {})
97
98 if(isMessage(data)) {
99 if(!states[data.author]) throw new Error('received strange author')
100 maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
101 if(isMessage(states[data.author].effect)) {//append this message
102 states[data.author].effect = null
103 // *** append MUST call onAppend before the callback ***
104 //for performance, append should verify + queue the append, but not write to database.
105 //also note, there may be other messages which have been received
106 //and we could theirfore do parallel calls to append, but would make this
107 //code quite complex.
108 append(data, function (err) {
109 onChange()
110 read(null, cb)
111 next()
112 })
113 }
114 else
115 read(null, cb)
116
117 next()
118 }
119 else {
120 var ready = false
121
122 for(var k in data) {
123 //if we havn't requested this yet, see if we want it.
124 //if we _don't want it_ we should say, otherwise
125 //they'll ask us again next time.
126 if(!states[k]) onRequest(k, data[k])
127
128 maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
129 if(states[k].ready != null)
130 ready = true
131 checkNote(k)
132 }
133
134 if(ready) next()
135 onChange()
136 read(null, cb)
137 }
138 })
139 },
140 source: function (abort, cb) {
141 //if there are any states with a message to send, take the oldest one.
142 //else, collect all states with a note, and send as a bundle.
143 //handle errors and aborts
144 if(abort) {
145 if(!error) //if the source was aborted before the sink got an error
146 return callback(toEnd(error = abort))
147 else
148 error = abort
149 }
150 ;(function read () {
151 //this happens when the client
152 if(error) return cb(error)
153
154 var state
155 if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
156 var msg = state.ready
157 maybeQueue(msg.author, state = S.read(state))
158 checkNote(msg.author)
159 onChange()
160 cb(null, msg)
161 }
162 else {
163 var notes = {}, n = 0
164
165 for(k in readyNote) {
166 if(isNote(states[k].ready)) {
167 n ++
168 notes[k] = states[k].ready
169 states[k] = S.read(states[k])
170 checkNote(k)
171 }
172 }
173
174 readyNote = {}
175
176 onChange()
177 if(n) cb(null, notes)
178 else next(read)
179 }
180 })()
181 },
182 progress: function () {
183 return progress(states)
184 },
185 onAppend: function (msg) {
186 var k = msg.author
187 //TMP, call a user provided function to decide how to handle this.
188 if(!states[k]) maybeQueue(k, states[k] = S.init(msg.sequence))
189 if(states[k]) {
190 maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
191 checkNote(k)
192 next()
193 }
194 },
195 request: function (id, seq) {
196 if(!states[id]) {
197 states[id] = S.init(seq)
198 readyNote[id] = true
199 }
200 },
201 states: states
202 }
203
204 if(opts.seqs) {
205 for(var k in opts.seqs)
206 stream.request(k, opts.seqs[k])
207 }
208 return stream
209
210 }
211}
212
213
214
215
216
217
218
219
220

Built with git-ssb-web