git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: 61537ea49c1a1f22f2d8f0ef263fdbef5f2d5e7d

Files: 61537ea49c1a1f22f2d8f0ef263fdbef5f2d5e7d / index.js

6369 bytesRaw
1var S = require('./state')
2var u = require('./util')
3var isNote = u.isNote
4var isMessage = u.isMessage
5var progress = require('./progress')
6
7function oldest(ready, states) {
8 //could do ready.sort but that is O(n*log(n)) (i think?) so faster to iterate
9
10 var min = null
11 for(var i = ready.length - 1; i >= 0; i--) {
12 if(!isMessage(ready[i].ready))
13 ready.splice(i, 1) //this item is not a ready message (any more) remove from queue)
14 else if(min == null)
15 min = i
16 else if (ready[i].ready.timestamp < ready[i].ready.timestamp) min = i
17 }
18
19 if(min != null) {
20 var state = ready[min]
21 ready.splice(i, 1)
22 return state
23 }
24
25}
26
27function Next () {
28 var fn
29 return function next (_fn) {
30 if(fn) {
31 if(_fn) throw new Error('already waiting! '+fn.toString())
32 else {
33 _fn = fn; fn = null; _fn()
34 }
35 }
36 else {
37 fn = _fn
38 }
39 }
40}
41
42function toEnd(err) {
43 return err === true ? null : err
44}
45
46//get, append are tied to the protocol
47//seqs, onChange, onRequest, callback are tied to the instance.
48module.exports = function (get, append) {
49
50 return function (opts, callback) {
51 if('function' === typeof opts)
52 callback = opts, opts = {}
53
54 var readyMsg = [], readyNote = {}
55 onChange = opts.onChange || require('./bounce')(function () {
56 console.log(progress(states))
57 }, 1000)
58
59 //called if this feed is has not been requested
60 var onRequest = opts.onRequest || function (id, seq) {
61 stream.request(id, 0)
62 }
63
64 function maybeQueue(key, state) {
65 if('string' !== typeof key) throw new Error('key should be string')
66 if('object' !== typeof state)
67 throw new Error('state should be object')
68
69 if(isMessage(state.ready))
70 readyMsg.push(state)
71 else if(isNote(state.ready))
72 readyNote[key] = true
73 }
74
75 var states = {}, error
76
77 var next = Next()
78 function checkNote (k) {
79 if(isNote(states[k].effect)) {
80 get(k, states[k].effect, function (err, msg) {
81 if(msg) {
82 maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
83 if(states[k].ready) next()
84 }
85 })
86 }
87 }
88
89 var stream = {
90 sink: function (read) {
91 read(null, function cb (err, data) {
92 //handle errors and aborts
93 if(err && !error) { //if this sink got an error before source was aborted.
94 callback(toEnd(error = err))
95 }
96 if(error) return read(error, function () {})
97
98 if(isMessage(data)) {
99 maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
100 if(isMessage(states[data.author].effect)) {//append this message
101 states[data.author].effect = null
102 // *** append MUST call onAppend before the callback ***
103 //for performance, append should verify + queue the append, but not write to database.
104 //also note, there may be other messages which have been received
105 //and we could theirfore do parallel calls to append, but would make this
106 //code quite complex.
107 append(data, function (err) {
108 onChange()
109 read(null, cb)
110 next()
111 })
112 }
113 else
114 read(null, cb)
115
116 next()
117 }
118 else {
119 var ready = false
120
121 for(var k in data) {
122 //if we havn't requested this yet, see if we want it.
123 //if we _don't want it_ we should say, otherwise
124 //they'll ask us again next time.
125 if(!states[k]) {
126 states[k] = S.init(null)
127 onRequest(k, data[k])
128 }
129 maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
130 if(states[k].ready != null)
131 ready = true
132 checkNote(k)
133 }
134
135 if(ready) next()
136 onChange()
137 read(null, cb)
138 }
139 })
140 },
141 source: function (abort, cb) {
142 //if there are any states with a message to send, take the oldest one.
143 //else, collect all states with a note, and send as a bundle.
144 //handle errors and aborts
145 if(abort) {
146 if(!error) //if the source was aborted before the sink got an error
147 return callback(toEnd(error = abort))
148 else
149 error = abort
150 }
151 ;(function read () {
152 //this happens when the client
153 if(error) return cb(error)
154
155 var state
156 if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
157 var msg = state.ready
158 maybeQueue(msg.author, state = S.read(state))
159 checkNote(msg.author)
160 onChange()
161 cb(null, msg)
162 }
163 else {
164 var notes = {}, n = 0
165
166 for(k in readyNote) {
167 if(isNote(states[k].ready)) {
168 n ++
169 notes[k] = states[k].ready
170 states[k] = S.read(states[k])
171 checkNote(k)
172 }
173 }
174
175 readyNote = {}
176
177 onChange()
178 if(n) cb(null, notes)
179 else next(read)
180 }
181 })()
182 },
183 progress: function () {
184 return progress(states)
185 },
186 onAppend: function (msg) {
187 var k = msg.author
188 //TMP, call a user provided function to decide how to handle this.
189 if(!states[k]) maybeQueue(k, states[k] = S.init(msg.sequence))
190 if(states[k]) {
191 maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
192 checkNote(k)
193 next()
194 }
195 },
196 request: function (id, seq) {
197 //only allow updates if it's gonna change the state.
198 if(!states[id]) {
199 states[id] = S.init(seq)
200 readyNote[id] = true
201 }
202 else if(
203 states[id].local.seq == null ||
204 states[id].local.seq == -1 ||
205 (seq === -1 && states[id].local.seq != -1)
206 ) {
207 states[id].ready = seq
208 readyNote[id] = true
209 next()
210 }
211 },
212 states: states
213 }
214
215 if(opts.seqs) {
216 for(var k in opts.seqs)
217 stream.request(k, opts.seqs[k])
218 }
219 return stream
220
221 }
222}
223
224

Built with git-ssb-web