git ssb

6+

Dominic / epidemic-broadcast-trees



Tree: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda

Files: 86a0d059a6618c5b0e01793ac5a710c66c9b4dda / index.js

6827 bytesRaw
1'use strict'
2var S = require('./state')
3var u = require('./util')
4var isNote = u.isNote
5var isMessage = u.isMessage
6var progress = require('./progress')
7
/**
 * Remove and return the queued state whose ready message has the smallest
 * timestamp. Entries whose `ready` is no longer a message are pruned from the
 * queue as a side effect.
 *
 * A single linear scan is used instead of sorting the queue.
 *
 * @param {Array} ready - queue of states (mutated in place); each entry is
 *   expected to carry its pending item in `.ready`.
 * @param {Object} [states] - unused; kept so existing callers keep working.
 * @returns {Object|undefined} the state holding the oldest ready message, or
 *   undefined when no entry holds a ready message.
 */
function oldest(ready, states) {
  var min = null
  //walk backwards so that splicing out stale entries never skips an element
  for (var i = ready.length - 1; i >= 0; i--) {
    if (!isMessage(ready[i].ready)) {
      //this item is not a ready message (any more), remove it from the queue
      ready.splice(i, 1)
      //the best candidate always sits at a higher index than i, so the
      //removal just shifted it down by one position
      if (min != null) min--
    }
    else if (min == null || ready[i].ready.timestamp < ready[min].ready.timestamp)
      min = i
  }

  //remove the selected entry itself (not whatever the loop index ended on)
  if (min != null) return ready.splice(min, 1)[0]
}
27
/**
 * Create a one-slot "wake me up" latch.
 *
 * The returned `next` function behaves as follows:
 *  - next(fn) with nothing parked: parks `fn` to be run later.
 *  - next()   with something parked: clears the slot, then runs the parked fn.
 *  - next(fn) with something parked: throws, only one waiter is allowed.
 *  - next()   with nothing parked: does nothing.
 */
function Next () {
  var parked
  return function next (_fn) {
    //nothing is waiting: remember the argument (possibly undefined) and leave
    if(!parked) {
      parked = _fn
      return
    }

    //a second waiter may not register while one is already parked
    if(_fn) throw new Error('already waiting! '+parked.toString())

    //clear the slot before invoking, so the waiter may park itself again
    var wake = parked
    parked = null
    wake()
  }
}
42
/**
 * Normalise an end/abort value for the completion callback: the literal
 * `true` is reported as "no error" (null); every other value is passed
 * through unchanged.
 */
function toEnd(err) {
  if(err === true) return null
  return err
}
46
47//get, append are tied to the protocol
48//seqs, onChange, onRequest, callback are tied to the instance.
// Factory for replication streams.
//  get(id, note, cb)  - called by checkNote when a state's `effect` is a note;
//                       cb receives (err, msg).
//  append(msg, cb)    - called when a received message should be appended.
// Returns a function (opts, callback) that builds one duplex stream object
// ({sink, source, progress, onAppend, request, states, next}).
//  opts.onChange  - optional; invoked after state updates (default logs progress).
//  opts.onRequest - optional; invoked as (id, seq) for feeds not yet in `states`.
//  opts.seqs      - optional map of id -> seq, each passed to stream.request.
//  callback       - invoked with toEnd(err) the first time either side ends.
49module.exports = function (get, append) {
50
51 return function (opts, callback) {
 //allow (callback) as the only argument
52 if('function' === typeof opts)
53 callback = opts, opts = {}
54
 //readyMsg: queue of states holding a message to send
 //readyNote: set of feed ids (as object keys) that may hold a note to send
55 var readyMsg = [], readyNote = {}
 //default onChange logs progress through ./bounce with an argument of 1000
 //(presumably a delay in ms -- confirm against ./bounce)
56 var onChange = opts.onChange || require('./bounce')(function () {
57 console.log(progress(states))
58 }, 1000)
59
60 //called if this feed has not been requested
 //the default ignores `seq` and always requests the feed from 0
61 var onRequest = opts.onRequest || function (id, seq) {
62 stream.request(id, 0)
63 }
64
 //validate arguments, then record that `state` has something ready to send:
 //messages go onto the readyMsg queue, notes are flagged in readyNote
65 function maybeQueue(key, state) {
66 if('string' !== typeof key) throw new Error('key should be string')
67 if('object' !== typeof state)
68 throw new Error('state should be object')
69
70 if(isMessage(state.ready))
71 readyMsg.push(state)
72 else if(isNote(state.ready))
73 readyNote[key] = true
74 }
75
 //states: per-feed state keyed by feed id; error: first end/abort value seen
76 var states = {}, error
77
 //next(fn) parks the source's reader; next() wakes it
78 var next = Next()
 //if the state for feed k has a note as its effect, fetch the message via
 //get() and feed it back into the state; an `err` from get is ignored
79 function checkNote (k) {
80 if(isNote(states[k].effect)) {
81 get(k, states[k].effect, function (err, msg) {
82 if(msg) {
83 maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
84 if(states[k].ready) next()
85 }
86 })
87 }
88 }
89
90 var stream = {
 //sink: consumes data from the remote side via `read`
91 sink: function (read) {
92 read(null, function cb (err, data) {
93 //handle errors and aborts
94 if(err && !error) { //if this sink got an error before source was aborted.
95 callback(toEnd(error = err))
96 }
 //once an error is recorded, pass it to `read` and stop processing
97 if(error) return read(error, function () {})
98
99 if(isMessage(data)) {
100 maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
101 if(isMessage(states[data.author].effect)) {//append this message
102 states[data.author].effect = null
103 // *** append MUST call onAppend before the callback ***
104 //for performance, append should verify + queue the append, but not write to database.
105 //also note, there may be other messages which have been received
106 //and we could therefore do parallel calls to append, but would make this
107 //code quite complex.
 //NOTE(review): the `err` passed to this callback is not inspected
108 append(data, function (err) {
109 onChange()
110 read(null, cb)
111 next()
112 })
113 }
114 else
115 read(null, cb)
116
 //wake the source in case the received message made something ready
117 next()
118 }
119 else {
 //anything that is not a message is iterated as a map of feed id -> note
120 var ready = false
121
122 for(var k in data) {
123 //if we haven't requested this yet, see if we want it.
124 //if we _don't want it_ we should say, otherwise
125 //they'll ask us again next time.
126 if(!states[k]) {
127 states[k] = S.init(null)
128 onRequest(k, data[k])
129 }
130 maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
131 if(states[k].ready != null)
132 ready = true
133 checkNote(k)
134 }
135
136 if(ready) next()
137 onChange()
138 read(null, cb)
139 }
140 })
141 },
 //source: produces data for the remote side; cb receives (end, data)
142 source: function (abort, cb) {
143 //if there are any states with a message to send, take the oldest one.
144 //else, collect all states with a note, and send as a bundle.
145 //handle errors and aborts
146 if(abort) {
147 if(!error) //if the source was aborted before the sink got an error
 //NOTE(review): this path returns without calling cb -- confirm intended
148 return callback(toEnd(error = abort))
149 else
150 error = abort
151 }
152 ;(function read () {
153 //an error or abort has been recorded: end the source with it
154 if(error) return cb(error)
155
156 var state
157 if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
158 var msg = state.ready
 //NOTE(review): only the local `state` is reassigned here, states[msg.author]
 //is not -- presumably S.read returns the same object; confirm in ./state
159 maybeQueue(msg.author, state = S.read(state))
160 checkNote(msg.author)
161 onChange()
162 cb(null, msg)
163 }
164 else {
 //notes: bundle of id -> note to send; n: number of notes collected
165 var notes = {}, n = 0
166
 //`k` is not declared inside read(); it resolves to the `var k` hoisted
 //from the opts.seqs loop at the bottom of the enclosing function
167 for(k in readyNote) {
168 if(isNote(states[k].ready)) {
169 n ++
170 notes[k] = states[k].ready
171 states[k] = S.read(states[k])
172 checkNote(k)
173 }
174 }
175
 //every flagged id has been examined; start a fresh set
176 readyNote = {}
177
178 onChange()
 //send the bundle if non-empty, otherwise park read() until next() is called
179 if(n) cb(null, notes)
180 else next(read)
181 }
182 })()
183 },
 //progress: returns progress(states) for the current states
184 progress: function () {
185 return progress(states)
186 },
 //onAppend: notify this stream that `msg` was appended locally; only feeds
 //already present in `states` are updated
187 onAppend: function (msg) {
188 var k = msg.author
189 //TMP, call a user provided function to decide how to handle this.
190// if(!states[k]) {
191// console.log('ON_APPEND', msg.author, msg.sequence)
192// maybeQueue(k, states[k] = S.init(msg.sequence))
193// }
194//
195 if(states[k]) {
196 maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
197 checkNote(k)
198 next()
199 }
200 },
201 //but what if you want to push in a bunch
202 //of items but not trigger next just yet?
 //request: start or update replication of feed `id` at `seq`;
 //pass isSingle === false to update an existing state without calling next()
203 request: function (id, seq, isSingle) {
204 //only allow updates if it's gonna change the state.
205 //this section should move into state.js
 //a new feed is initialised and flagged, but next() is not called here
206 if(!states[id]) {
207 states[id] = S.init(seq)
208 readyNote[id] = true
209 }
 //existing feed: update only when the local seq is null or -1, or when seq is -1
210 else if(
211 states[id].local.seq == null ||
212 states[id].local.seq == -1 ||
213 (seq === -1 && states[id].local.seq != -1)
214 ) {
215 //MUST set the local state, otherwise receiveMessage won't work.
216 if(seq >= 0) states[id].local.seq = seq
217 states[id].ready = seq
218 readyNote[id] = true
219 if(isSingle !== false) {
220 next()
221 }
222 }
223 },
 //exposed so callers can inspect the states and wake the source directly
224 states: states,
225 next: next
226 }
227
 //request every feed listed in opts.seqs up front
228 if(opts.seqs) {
229 for(var k in opts.seqs)
230 stream.request(k, opts.seqs[k])
231 }
232 return stream
233
234 }
235}
236
237

Built with git-ssb-web