git ssb

6+

Dominic / epidemic-broadcast-trees



Commit c79c1ed68cdb08e095da574b14e78174ef71b2b3

Merge branch 'request'

Dominic Tarr committed on 5/6/2017, 12:16:54 AM
Parent: ec16e1429ac292be5b2db5a83aa29a9f0ffffbf6
Parent: 9cee0d09a7c1c37afcf65080d167442c46a0b8fe

Files changed

example.jschanged
index.jschanged
test/stream.jschanged
example.jsView
@@ -30,9 +30,8 @@
3030 //and only one value per instance (instead of potentially many named events)
3131
3232
3333 var stream = createEbtStream(
34- vectorClock,
3534 //pass a get(id, seq, cb)
3635 function (id, seq, cb) {
3736 if(!chat.logs[id] || !chat.logs[id][seq-1])
3837 return cb(new Error('not found'))
@@ -42,9 +41,11 @@
4241 function (msg, cb) {
4342 chat.append(msg)
4443 cb()
4544 }
46- )
45 + ) ({
46 + seqs: vectorClock,
47 + })
4748
4849 chat.onAppend(stream.onAppend)
4950
5051 return stream
@@ -73,4 +74,6 @@
7374 exports.createChatModel = createChatModel
7475 exports.createStream = createStream
7576
7677
78 +
79 +
index.jsView
@@ -42,147 +42,178 @@
4242 function toEnd(err) {
4343 return err === true ? null : err
4444 }
4545
46-module.exports = function (seqs, get, append, onChange, callback) {
46 +//get, append are tied to the protocol
47 +//seqs, onChange, onRequest, callback are tied to the instance.
48 +module.exports = function (get, append) {
4749
48- var readyMsg = [], readyNote = {}
49- onChange = onChange || require('./bounce')(function () {
50- console.log(progress(states))
51- }, 1000)
50 + return function (opts, callback) {
51 + if('function' === typeof opts)
52 + callback = opts, opts = {}
5253
53- function maybeQueue(key, state) {
54- if('string' !== typeof key) throw new Error('key should be string')
55- if('object' !== typeof state)
56- throw new Error('state should be object')
54 + var readyMsg = [], readyNote = {}
55 + onChange = opts.onChange || require('./bounce')(function () {
56 + console.log(progress(states))
57 + }, 1000)
5758
58- if(isMessage(state.ready))
59- readyMsg.push(state)
60- else if(isNote(state.ready))
61- readyNote[key] = true
62- }
59 + //called if this feed is has not been requested
60 + var onRequest = opts.onRequest || function (id, seq) {
61 + stream.request(id, 0)
62 + }
6363
64- var states = {}, error
65- for(var k in seqs)
66- states[k] = S.init(seqs[k])
64 + function maybeQueue(key, state) {
65 + if('string' !== typeof key) throw new Error('key should be string')
66 + if('object' !== typeof state)
67 + throw new Error('state should be object')
6768
68- var next = Next()
69- function checkNote (k) {
70- if(isNote(states[k].effect)) {
71- get(k, states[k].effect, function (err, msg) {
72- if(msg) {
73- maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
74- if(states[k].ready) next()
75- }
76- })
69 + if(isMessage(state.ready))
70 + readyMsg.push(state)
71 + else if(isNote(state.ready))
72 + readyNote[key] = true
7773 }
78- }
79- for(var k in seqs)
80- readyNote[k] = true
8174
82- var stream
83- return stream = {
84- sink: function (read) {
85- read(null, function cb (err, data) {
86- //handle errors and aborts
87- if(err && !error) { //if this sink got an error before source was aborted.
88- callback(toEnd(error = err))
89- }
90- if(error) return read(error, function () {})
75 + var states = {}, error
9176
92- if(isMessage(data)) {
93- if(!states[data.author]) throw new Error('received strange author')
94- maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
95- if(isMessage(states[data.author].effect)) {//append this message
96- states[data.author].effect = null
97- // *** append MUST call onAppend before the callback ***
98- //for performance, append should verify + queue the append, but not write to database.
99- //also note, there may be other messages which have been received
100- //and we could theirfore do parallel calls to append, but would make this
101- //code quite complex.
102- append(data, function (err) {
103- onChange()
104- read(null, cb)
105- next()
106- })
77 + var next = Next()
78 + function checkNote (k) {
79 + if(isNote(states[k].effect)) {
80 + get(k, states[k].effect, function (err, msg) {
81 + if(msg) {
82 + maybeQueue(k, states[k] = S.gotMessage(states[k], msg))
83 + if(states[k].ready) next()
10784 }
108- else
109- read(null, cb)
85 + })
86 + }
87 + }
11088
111- next()
112- }
113- else {
114- var ready = false
89 + var stream = {
90 + sink: function (read) {
91 + read(null, function cb (err, data) {
92 + //handle errors and aborts
93 + if(err && !error) { //if this sink got an error before source was aborted.
94 + callback(toEnd(error = err))
95 + }
96 + if(error) return read(error, function () {})
11597
116- for(var k in data) {
117- //TEMP, just request back anything they ask for...
118- if(!states[k]) states[k] = S.init(0)
119- maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
120- if(states[k].ready != null)
121- ready = true
122- checkNote(k)
98 + if(isMessage(data)) {
99 + if(!states[data.author]) throw new Error('received strange author')
100 + maybeQueue(data.author, states[data.author] = S.receiveMessage(states[data.author], data))
101 + if(isMessage(states[data.author].effect)) {//append this message
102 + states[data.author].effect = null
103 + // *** append MUST call onAppend before the callback ***
104 + //for performance, append should verify + queue the append, but not write to database.
105 + //also note, there may be other messages which have been received
106 + //and we could theirfore do parallel calls to append, but would make this
107 + //code quite complex.
108 + append(data, function (err) {
109 + onChange()
110 + read(null, cb)
111 + next()
112 + })
113 + }
114 + else
115 + read(null, cb)
116 +
117 + next()
123118 }
119 + else {
120 + var ready = false
124121
125- if(ready) next()
126- onChange()
127- read(null, cb)
128- }
129- })
130- },
131- source: function (abort, cb) {
132- //if there are any states with a message to send, take the oldest one.
133- //else, collect all states with a note, and send as a bundle.
134- //handle errors and aborts
135- if(abort) {
136- if(!error) //if the source was aborted before the sink got an error
137- return callback(toEnd(error = abort))
138- else
139- error = abort
140- }
141- ;(function read () {
142- //this happens when the client
143- if(error) return cb(error)
122 + for(var k in data) {
123 + //if we havn't requested this yet, see if we want it.
124 + //if we _don't want it_ we should say, otherwise
125 + //they'll ask us again next time.
126 + if(!states[k]) onRequest(k, data[k])
144127
145- var state
146- if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
147- var msg = state.ready
148- maybeQueue(msg.author, state = S.read(state))
149- checkNote(msg.author)
150- onChange()
151- cb(null, msg)
152- }
153- else {
154- var notes = {}, n = 0
155-
156- for(k in readyNote) {
157- if(isNote(states[k].ready)) {
158- n ++
159- notes[k] = states[k].ready
160- states[k] = S.read(states[k])
128 + maybeQueue(k, states[k] = S.receiveNote(states[k], data[k]))
129 + if(states[k].ready != null)
130 + ready = true
161131 checkNote(k)
162132 }
133 +
134 + if(ready) next()
135 + onChange()
136 + read(null, cb)
163137 }
138 + })
139 + },
140 + source: function (abort, cb) {
141 + //if there are any states with a message to send, take the oldest one.
142 + //else, collect all states with a note, and send as a bundle.
143 + //handle errors and aborts
144 + if(abort) {
145 + if(!error) //if the source was aborted before the sink got an error
146 + return callback(toEnd(error = abort))
147 + else
148 + error = abort
149 + }
150 + ;(function read () {
151 + //this happens when the client
152 + if(error) return cb(error)
164153
165- readyNote = {}
154 + var state
155 + if(readyMsg.length && (state = oldest(readyMsg)) && isMessage(state.ready)) {
156 + var msg = state.ready
157 + maybeQueue(msg.author, state = S.read(state))
158 + checkNote(msg.author)
159 + onChange()
160 + cb(null, msg)
161 + }
162 + else {
163 + var notes = {}, n = 0
166164
167- onChange()
168- if(n) cb(null, notes)
169- else next(read)
165 + for(k in readyNote) {
166 + if(isNote(states[k].ready)) {
167 + n ++
168 + notes[k] = states[k].ready
169 + states[k] = S.read(states[k])
170 + checkNote(k)
171 + }
172 + }
173 +
174 + readyNote = {}
175 +
176 + onChange()
177 + if(n) cb(null, notes)
178 + else next(read)
179 + }
180 + })()
181 + },
182 + progress: function () {
183 + return progress(states)
184 + },
185 + onAppend: function (msg) {
186 + var k = msg.author
187 + //TMP, call a user provided function to decide how to handle this.
188 + if(!states[k]) maybeQueue(k, states[k] = S.init(msg.sequence))
189 + if(states[k]) {
190 + maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
191 + checkNote(k)
192 + next()
170193 }
171- })()
172- },
173- progress: function () {
174- return progress(states)
175- },
176- onAppend: function (msg) {
177- var k = msg.author
178- //TMP, call a user provided function to decide how to handle this.
179- if(!states[k]) maybeQueue(k, states[k] = S.init(msg.sequence))
180- if(states[k]) {
181- maybeQueue(k, states[k] = S.appendMessage(states[k], msg))
182- checkNote(k)
183- next()
184- }
194 + },
195 + request: function (id, seq) {
196 + if(!states[id]) {
197 + states[id] = S.init(seq)
198 + readyNote[id] = true
199 + }
200 + },
201 + states: states
185202 }
203 +
204 + if(opts.seqs) {
205 + for(var k in opts.seqs)
206 + stream.request(k, opts.seqs[k])
207 + }
208 + return stream
209 +
186210 }
187211 }
188212
213 +
214 +
215 +
216 +
217 +
218 +
219 +
test/stream.jsView
@@ -19,9 +19,8 @@
1919 if(k[0] != '_')
2020 states[k] = logs[k].length
2121
2222 var stream = Stream(
23- states,
2423 function get (id, seq, cb) {
2524 cb(null, logs[id][seq - 1])
2625 },
2726 function append (msg, cb) {
@@ -30,12 +29,14 @@
3029 onAppend(msg)
3130 cb()
3231 }
3332 else cb(new Error('could not append'))
33 + }
34 + ) ({
35 + seqs: states,
36 + onChange: console.log,
3437 },
35- console.log,
36- cb
37- )
38 + cb)
3839
3940 logs._append.push(stream.onAppend)
4041
4142 return stream
@@ -216,4 +217,7 @@
216217 })
217218
218219 })
219220
221 +
222 +
223 +

Built with git-ssb-web