Commit d30608cc6c438ea4527a506e061d183c30099168
tested saturation
wanderer committed on 6/27/2017, 10:22:50 PMParent: 710c6eda4be5c2e63702e608a3b03af9c647ee2e
Files changed
exoInterface.js | changed |
scheduler.js | changed |
tests/index.js | changed |
exoInterface.js | ||
---|---|---|
@@ -47,51 +47,45 @@ | ||
47 | 47 | // waits for the next message |
48 | 48 | async _runNextMessage () { |
49 | 49 | // check if the ports are saturated, if so we don't have to wait on the |
50 | 50 | // scheduler |
51 | - try { | |
52 | - let message = this.ports.peekNextMessage() | |
53 | - let saturated = this.ports.isSaturated() | |
54 | - let oldestTime = this.hypervisor.scheduler.smallest() | |
51 | + let message = this.ports.peekNextMessage() | |
52 | + let saturated = this.ports.isSaturated() | |
53 | + let oldestTime = this.hypervisor.scheduler.smallest() | |
55 | 54 | |
56 | - // this.hypervisor.scheduler.print() | |
57 | - while (!saturated && | |
58 | - !((message && oldestTime >= message._fromTicks) || | |
59 | - (!message && (oldestTime === this.ticks || !this.hypervisor.scheduler._running.size)))) { | |
60 | - const ticksToWait = message ? message._fromTicks : this.ticks | |
55 | + while (!saturated && | |
56 | + !(message && oldestTime >= message._fromTicks || | |
57 | + !message && oldestTime === this.ticks)) { | |
58 | + const ticksToWait = message ? message._fromTicks : this.ticks | |
61 | 59 | |
62 | - await Promise.race([ | |
63 | - this.hypervisor.scheduler.wait(ticksToWait, this.id).then(m => { | |
64 | - // this.hypervisor.scheduler.print() | |
65 | - message = this.ports.peekNextMessage() | |
66 | - }), | |
67 | - this.ports.olderMessage(message).then(m => { | |
68 | - message = m | |
69 | - }), | |
70 | - this.ports.whenSaturated().then(() => { | |
71 | - saturated = true | |
72 | - }) | |
73 | - ]) | |
60 | + await Promise.race([ | |
61 | + this.hypervisor.scheduler.wait(ticksToWait, this.id).then(m => { | |
62 | + message = this.ports.peekNextMessage() | |
63 | + }), | |
64 | + this.ports.olderMessage(message).then(m => { | |
65 | + message = m | |
66 | + }), | |
67 | + this.ports.whenSaturated().then(() => { | |
68 | + saturated = true | |
69 | + message = this.ports.peekNextMessage() | |
70 | + }) | |
71 | + ]) | |
74 | 72 | |
75 | - oldestTime = this.hypervisor.scheduler.smallest() | |
76 | - saturated = this.ports.isSaturated() | |
77 | - } | |
73 | + oldestTime = this.hypervisor.scheduler.smallest() | |
74 | + saturated = this.ports.isSaturated() | |
75 | + } | |
78 | 76 | |
79 | - if (!message) { | |
80 | - // if no more messages then shut down | |
81 | - this.hypervisor.scheduler.done(this) | |
82 | - return | |
83 | - } | |
84 | - | |
77 | + if (!message) { | |
78 | + // if no more messages then shut down | |
79 | + this.hypervisor.scheduler.done(this) | |
80 | + } else { | |
85 | 81 | message.fromPort.messages.shift() |
86 | 82 | if (message._fromTicks > this.ticks) { |
87 | 83 | this.ticks = message._fromTicks |
88 | 84 | } |
89 | 85 | this.hypervisor.scheduler.update(this) |
90 | 86 | // run the next message |
91 | 87 | this.run(message) |
92 | - } catch (e) { | |
93 | - console.log(e) | |
94 | 88 | } |
95 | 89 | } |
96 | 90 | |
97 | 91 | /** |
scheduler.js | ||
---|---|---|
@@ -34,9 +34,8 @@ | ||
34 | 34 | _update (instance) { |
35 | 35 | this._running.add(instance.id) |
36 | 36 | this.instances.delete(instance.id) |
37 | 37 | const instanceArray = [...this.instances] |
38 | - // console.log(instanceArray) | |
39 | 38 | binarySearchInsert(instanceArray, instancesComparator, [instance.id, instance]) |
40 | 39 | this.instances = new Map(instanceArray) |
41 | 40 | } |
42 | 41 | |
@@ -51,19 +50,15 @@ | ||
51 | 50 | } |
52 | 51 | |
53 | 52 | wait (ticks = Infinity, id) { |
54 | 53 | this._running.delete(id) |
55 | - if (!this.locks.size && ticks <= this.smallest()) { | |
56 | - return Promise.resolve() | |
57 | - } else { | |
58 | - return new Promise((resolve, reject) => { | |
59 | - binarySearchInsert(this._waits, comparator, { | |
60 | - ticks: ticks, | |
61 | - resolve: resolve | |
62 | - }) | |
63 | - this._checkWaits() | |
54 | + return new Promise((resolve, reject) => { | |
55 | + binarySearchInsert(this._waits, comparator, { | |
56 | + ticks: ticks, | |
57 | + resolve: resolve | |
64 | 58 | }) |
65 | - } | |
59 | + this._checkWaits() | |
60 | + }) | |
66 | 61 | } |
67 | 62 | |
68 | 63 | smallest () { |
69 | 64 | return this.instances.size ? [...this.instances][0][1].ticks : 0 |
@@ -77,30 +72,23 @@ | ||
77 | 72 | this._waits.forEach(wait => wait.resolve()) |
78 | 73 | this._waits = [] |
79 | 74 | } else if (!this._running.size) { |
80 | 75 | const smallest = this._waits[0].ticks |
81 | - const toUpdate = [] | |
82 | 76 | for (let instance of this.instances) { |
83 | 77 | instance = instance[1] |
84 | - const ticks = instance.ticks | |
85 | - if (ticks > smallest) { | |
78 | + if (instance.ticks > smallest) { | |
86 | 79 | break |
87 | 80 | } else { |
88 | - toUpdate.push(instance) | |
81 | + instance.ticks = smallest | |
82 | + this._update(instance) | |
89 | 83 | } |
90 | 84 | } |
91 | - toUpdate.forEach(instance => { | |
92 | - instance.ticks = smallest | |
93 | - this._update(instance) | |
94 | - }) | |
95 | - this._checkWaits() | |
85 | + return this._checkWaits() | |
96 | 86 | } else { |
97 | 87 | const smallest = this.smallest() |
98 | 88 | for (const index in this._waits) { |
99 | 89 | const wait = this._waits[index] |
100 | 90 | if (wait.ticks <= smallest) { |
101 | - // this.print() | |
102 | - // console.log('resolve', wait.ticks) | |
103 | 91 | wait.resolve() |
104 | 92 | } else { |
105 | 93 | this._waits.splice(0, index) |
106 | 94 | break |
tests/index.js | ||
---|---|---|
@@ -182,18 +182,76 @@ | ||
182 | 182 | runs++ |
183 | 183 | const one = this.exInterface.ports.create('first') |
184 | 184 | const two = this.exInterface.ports.create('second') |
185 | 185 | |
186 | + this.exInterface.ports.bind('two', two) | |
186 | 187 | this.exInterface.ports.bind('one', one) |
187 | - this.exInterface.ports.bind('two', two) | |
188 | 188 | |
189 | 189 | await Promise.all([ |
190 | 190 | this.exInterface.send(one, this.exInterface.createMessage()), |
191 | 191 | this.exInterface.send(two, this.exInterface.createMessage()) |
192 | 192 | ]) |
193 | 193 | } else if (runs === 1) { |
194 | 194 | runs++ |
195 | + t.equals(m.data, 'first', 'should recive the first message') | |
196 | + } else if (runs === 2) { | |
195 | 197 | t.equals(m.data, 'second', 'should recived the second message') |
198 | + } | |
199 | + } | |
200 | + } | |
201 | + | |
202 | + class First extends BaseContainer { | |
203 | + run (m) { | |
204 | + this.exInterface.incrementTicks(2) | |
205 | + return this.exInterface.send(m.fromPort, this.exInterface.createMessage({ | |
206 | + data: 'first' | |
207 | + })) | |
208 | + } | |
209 | + } | |
210 | + | |
211 | + class Second extends BaseContainer { | |
212 | + run (m) { | |
213 | + this.exInterface.incrementTicks(3) | |
214 | + return this.exInterface.send(m.fromPort, this.exInterface.createMessage({ | |
215 | + data: 'second' | |
216 | + })) | |
217 | + } | |
218 | + } | |
219 | + | |
220 | + const hypervisor = new Hypervisor(node.dag) | |
221 | + | |
222 | + hypervisor.registerContainer('root', Root) | |
223 | + hypervisor.registerContainer('first', First) | |
224 | + hypervisor.registerContainer('second', Second) | |
225 | + | |
226 | + const root = await hypervisor.createInstance('root') | |
227 | + const port = root.ports.create('root') | |
228 | + root.ports.bind('first', port) | |
229 | + | |
230 | + root.send(port, root.createMessage()) | |
231 | + }) | |
232 | + | |
233 | + tape('message should arrive in the correct oder if sent in order', async t => { | |
234 | + t.plan(2) | |
235 | + let runs = 0 | |
236 | + | |
237 | + class Root extends BaseContainer { | |
238 | + run (m) { | |
239 | + if (!runs) { | |
240 | + runs++ | |
241 | + const one = this.exInterface.ports.create('first') | |
242 | + const two = this.exInterface.ports.create('second') | |
243 | + | |
244 | + this.exInterface.ports.bind('one', one) | |
245 | + this.exInterface.ports.bind('two', two) | |
246 | + | |
247 | + Promise.all([ | |
248 | + this.exInterface.send(one, this.exInterface.createMessage()), | |
249 | + this.exInterface.send(two, this.exInterface.createMessage()) | |
250 | + ]) | |
251 | + } else if (runs === 1) { | |
252 | + runs++ | |
253 | + t.equals(m.data, 'second', 'should recived the second message') | |
196 | 254 | } else if (runs === 2) { |
197 | 255 | t.equals(m.data, 'first', 'should recive the first message') |
198 | 256 | } |
199 | 257 | } |
@@ -283,8 +341,84 @@ | ||
283 | 341 | |
284 | 342 | root.send(port, root.createMessage()) |
285 | 343 | }) |
286 | 344 | |
345 | + tape('saturation', async t => { | |
346 | + t.plan(2) | |
347 | + let runs = 0 | |
348 | + | |
349 | + class Root extends BaseContainer { | |
350 | + run (m) { | |
351 | + if (!runs) { | |
352 | + runs++ | |
353 | + const one = this.exInterface.ports.create('first') | |
354 | + const two = this.exInterface.ports.create('second') | |
355 | + | |
356 | + this.exInterface.ports.bind('two', two) | |
357 | + this.exInterface.ports.bind('one', one) | |
358 | + | |
359 | + Promise.all([ | |
360 | + this.exInterface.send(one, this.exInterface.createMessage()), | |
361 | + this.exInterface.send(two, this.exInterface.createMessage()) | |
362 | + ]) | |
363 | + this.exInterface.incrementTicks(6) | |
364 | + } else if (runs === 1) { | |
365 | + runs++ | |
366 | + t.equals(m.data, 'first', 'should recive the first message') | |
367 | + } else if (runs === 2) { | |
368 | + t.equals(m.data, 'second', 'should recived the second message') | |
369 | + } | |
370 | + } | |
371 | + } | |
372 | + | |
373 | + class First extends BaseContainer { | |
374 | + run (m) { | |
375 | + this.exInterface.incrementTicks(2) | |
376 | + this.exInterface.send(m.fromPort, this.exInterface.createMessage({ | |
377 | + data: 'first' | |
378 | + })) | |
379 | + } | |
380 | + } | |
381 | + | |
382 | + class Second extends BaseContainer { | |
383 | + run (m) { | |
384 | + // this.exInterface.incrementTicks(3) | |
385 | + this.exInterface.incrementTicks(3) | |
386 | + this.exInterface.send(m.fromPort, this.exInterface.createMessage({ | |
387 | + data: 'second' | |
388 | + })) | |
389 | + } | |
390 | + } | |
391 | + | |
392 | + class Waiter extends BaseContainer { | |
393 | + initailize () { | |
394 | + return new Promise((resolve, reject) => { | |
395 | + setTimeout(() => { | |
396 | + resolve() | |
397 | + }, 200) | |
398 | + }) | |
399 | + } | |
400 | + } | |
401 | + | |
402 | + const hypervisor = new Hypervisor(node.dag) | |
403 | + | |
404 | + hypervisor.registerContainer('root', Root) | |
405 | + hypervisor.registerContainer('first', First) | |
406 | + hypervisor.registerContainer('second', Second) | |
407 | + hypervisor.registerContainer('waiter', Waiter) | |
408 | + | |
409 | + const root = await hypervisor.createInstance('root') | |
410 | + const port = root.ports.create('root') | |
411 | + root.ports.bind('first', port) | |
412 | + const port1 = root.ports.create('waiter') | |
413 | + root.ports.bind('sencond', port1) | |
414 | + | |
415 | + await root.send(port, root.createMessage()) | |
416 | + root.incrementTicks(7) | |
417 | + | |
418 | + root.send(port, root.createMessage()) | |
419 | + }) | |
420 | + | |
287 | 421 | tape('message should arrive in the correct order, even in a tie of ticks', async t => { |
288 | 422 | t.plan(2) |
289 | 423 | |
290 | 424 | let runs = 0 |
@@ -576,9 +710,8 @@ | ||
576 | 710 | root.send(port, root.createMessage()) |
577 | 711 | const sr = await hypervisor.createStateRoot() |
578 | 712 | t.deepEquals(sr, expectedSr, 'should produce the corret state root') |
579 | 713 | // await hypervisor.graph.tree(sr, Infinity) |
580 | - // console.log(JSON.stringify(sr, null, 2)) | |
581 | 714 | |
582 | 715 | t.end() |
583 | 716 | }) |
584 | 717 |
Built with git-ssb-web