Commit c77c7618fb3abd31a3d1b797af55b9f21a318173
add setTimeout to throttle actor shutdown
Signed-off-by: wanderer <mjbecze@gmail.com>wanderer committed on 1/4/2018, 8:39:54 AM
Parent: 818c7e2b325f1dac05933b54857443ddadc56c93
Files changed
actor.js | changed |
inbox.js | changed |
scheduler.js | changed |
tests/index.js | changed |
actor.js | ||
---|---|---|
@@ -1,7 +1,6 @@ | ||
1 | 1 | const Pipe = require('buffer-pipe') |
2 | 2 | const Cap = require('primea-capability') |
3 | -const Message = require('primea-message') | |
4 | 3 | const leb128 = require('leb128').unsigned |
5 | 4 | const Inbox = require('./inbox.js') |
6 | 5 | |
7 | 6 | module.exports = class Actor { |
@@ -64,21 +63,20 @@ | ||
64 | 63 | |
65 | 64 | // waits for the next message |
66 | 65 | async _startMessageLoop () { |
67 | 66 | // this ensure we only every have one loop running at a time |
68 | - while (1) { | |
69 | - const message = await this.inbox.nextMessage(0, true) | |
70 | - if (!message) break | |
71 | - | |
72 | - // run the next message | |
67 | + while (!this.inbox.isEmpty) { | |
68 | + const message = await this.inbox.nextMessage(0) | |
73 | 69 | await this.runMessage(message) |
74 | 70 | } |
75 | 71 | this.running = false |
76 | 72 | // wait for state ops to finish |
77 | 73 | await this.state.done() |
78 | - if (!this.running) { | |
79 | - this.container.onIdle() | |
80 | - } | |
74 | + setTimeout(() => { | |
75 | + if (!this.running) { | |
76 | + this.container.onIdle() | |
77 | + } | |
78 | + }, 0) | |
81 | 79 | } |
82 | 80 | |
83 | 81 | serializeMetaData () { |
84 | 82 | return Actor.serializeMetaData(this.type, this.nonce) |
inbox.js | ||
---|---|---|
@@ -29,8 +29,12 @@ | ||
29 | 29 | this._oldestMessageResolve = resolve |
30 | 30 | }) |
31 | 31 | } |
32 | 32 | |
33 | + get isEmpty () { | |
34 | + return !this._queue.length | |
35 | + } | |
36 | + | |
33 | 37 | /** |
34 | 38 | * queues a message |
35 | 39 | * @param {Message} message |
36 | 40 | */ |
@@ -69,25 +73,16 @@ | ||
69 | 73 | * Waits for the the next message if any |
70 | 74 | * @param {Integer} timeout |
71 | 75 | * @returns {Promise} |
72 | 76 | */ |
73 | - nextMessage (timeout, getCurrent = false) { | |
77 | + async nextMessage (timeout) { | |
74 | 78 | if (!this._gettingNextMessage) { |
75 | - this._gettingNextMessage = this._nextMessage(timeout) | |
76 | - this._gettingNextMessage.then(() => { | |
77 | - this._gettingNextMessage = false | |
78 | - }) | |
79 | - } else if (!getCurrent) { | |
79 | + this._gettingNextMessage = true | |
80 | + } else { | |
80 | 81 | throw new Error('already waiting for next message') |
81 | 82 | } |
82 | - return this._gettingNextMessage | |
83 | - } | |
84 | 83 | |
85 | - async _nextMessage (timeout) { | |
86 | 84 | let message = this._getOldestMessage() |
87 | - if (message === undefined && timeout === 0) { | |
88 | - return | |
89 | - } | |
90 | 85 | |
91 | 86 | timeout += this.actor.ticks |
92 | 87 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
93 | 88 | |
@@ -122,8 +117,9 @@ | ||
122 | 117 | }) |
123 | 118 | ]) |
124 | 119 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id) |
125 | 120 | } |
121 | + this._gettingNextMessage = false | |
126 | 122 | return this._deQueueMessage() |
127 | 123 | } |
128 | 124 | |
129 | 125 | // returns a promise that resolve when a message older then the given message |
scheduler.js | ||
---|---|---|
@@ -12,9 +12,8 @@ | ||
12 | 12 | */ |
13 | 13 | constructor () { |
14 | 14 | this._waits = [] |
15 | 15 | this._running = new Set() |
16 | - this._checkingWaits = false | |
17 | 16 | this.instances = new SortedMap(comparator) |
18 | 17 | } |
19 | 18 | |
20 | 19 | /** |
@@ -109,10 +108,8 @@ | ||
109 | 108 | // if there are no instances, clear any remaining waits |
110 | 109 | if (!this.instances.size) { |
111 | 110 | this._waits.forEach(wait => wait.resolve()) |
112 | 111 | this._waits = [] |
113 | - this._checkingWaits = false | |
114 | - | |
115 | 112 | return |
116 | 113 | } |
117 | 114 | |
118 | 115 | // find the old container, see if any of the waits can be resolved |
@@ -139,12 +136,8 @@ | ||
139 | 136 | } |
140 | 137 | instance.ticks = oldest |
141 | 138 | this._update(instance) |
142 | 139 | } |
143 | - this._checkingWaits = false | |
144 | - | |
145 | 140 | return this._checkWaits() |
146 | 141 | } |
147 | - | |
148 | - this._checkingWaits = false | |
149 | 142 | } |
150 | 143 | } |
tests/index.js | ||
---|---|---|
@@ -436,56 +436,8 @@ | ||
436 | 436 | const stateRoot = await hypervisor.createStateRoot() |
437 | 437 | t.deepEquals(stateRoot, expectedState, 'expected root!') |
438 | 438 | }) |
439 | 439 | |
440 | -tape('return while waiting for tag', async t => { | |
441 | - t.plan(4) | |
442 | - const expectedState = { | |
443 | - '/': Buffer.from('b8eb399087a990e30373e954b627a9512c9af40b', 'hex') | |
444 | - } | |
445 | - | |
446 | - const tree = new RadixTree({ | |
447 | - db: db | |
448 | - }) | |
449 | - | |
450 | - class testVMContainerA extends BaseContainer { | |
451 | - async onMessage (m) { | |
452 | - if (m.tag === 1) { | |
453 | - t.true(m, 'should recive second message') | |
454 | - } else { | |
455 | - t.true(m, 'should recive first message') | |
456 | - const rCap = this.actor.mintCap(1) | |
457 | - const message = new Message({caps: [rCap]}) | |
458 | - this.actor.send(m.caps[0], message) | |
459 | - this.actor.inbox.nextTaggedMessage([1], 44) | |
460 | - } | |
461 | - } | |
462 | - } | |
463 | - | |
464 | - class testVMContainerB extends BaseContainer { | |
465 | - onMessage (m) { | |
466 | - t.true(m, 'should recive a message') | |
467 | - this.actor.send(m.caps[0], new Message()) | |
468 | - } | |
469 | - | |
470 | - static get typeId () { | |
471 | - return 8 | |
472 | - } | |
473 | - } | |
474 | - | |
475 | - const hypervisor = new Hypervisor(tree) | |
476 | - hypervisor.registerContainer(testVMContainerA) | |
477 | - hypervisor.registerContainer(testVMContainerB) | |
478 | - | |
479 | - let capA = await hypervisor.createActor(testVMContainerA.typeId, new Message()) | |
480 | - let capB = await hypervisor.createActor(testVMContainerB.typeId, new Message()) | |
481 | - | |
482 | - hypervisor.send(capA, new Message({caps: [capB]})) | |
483 | - | |
484 | - const stateRoot = await hypervisor.createStateRoot() | |
485 | - t.deepEquals(stateRoot, expectedState, 'expected root!') | |
486 | -}) | |
487 | - | |
488 | 440 | tape('trying to listen for caps more then once', async t => { |
489 | 441 | t.plan(4) |
490 | 442 | const expectedState = { |
491 | 443 | '/': Buffer.from('b8eb399087a990e30373e954b627a9512c9af40b', 'hex') |
@@ -497,9 +449,8 @@ | ||
497 | 449 | |
498 | 450 | class testVMContainerA extends BaseContainer { |
499 | 451 | async onMessage (m) { |
500 | 452 | t.true(m, 'should recive first message') |
501 | - const rCap = this.actor.mintCap(1) | |
502 | 453 | const message = new Message({data: 'first'}) |
503 | 454 | this.actor.send(m.caps[0], message) |
504 | 455 | const promise = this.actor.inbox.nextTaggedMessage([1], 44) |
505 | 456 | try { |
Built with git-ssb-web