Commit 5d73da450ff3f8fae6bf5844ed02da668a06e011
fix scheduler lock condition
Signed-off-by: wanderer <mjbecze@gmail.com>wanderer committed on 9/9/2017, 9:44:41 PM
Parent: 2d384c9d532ddc23ea6089a835bb4b338df7bd85
Files changed
portManager.js | changed |
scheduler.js | changed |
tests/index.js | changed |
portManager.js | ||
---|---|---|
@@ -193,42 +193,47 @@ | ||
193 | 193 | */ |
194 | 194 | async getNextMessage (ports = this.ports, timeout = Infinity) { |
195 | 195 | let message = this._peekNextMessage(ports) |
196 | 196 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
197 | + let saturated = false | |
197 | 198 | |
199 | + const findOldestMessage = async () => { | |
200 | + while (// end if we have a message older then slowest containers | |
201 | + !((message && oldestTime >= message._fromTicks) || | |
202 | + // end if there are no messages and this container is the oldest contaner | |
203 | + (!message && oldestTime === this.kernel.ticks))) { | |
204 | + if (saturated) { | |
205 | + break | |
206 | + } | |
207 | + let ticksToWait = message ? message._fromTicks : this.kernel.ticks | |
208 | + // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait | |
209 | + await Promise.race([ | |
210 | + this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { | |
211 | + message = this._peekNextMessage(ports) | |
212 | + }), | |
213 | + this._olderMessage(message).then(m => { | |
214 | + message = m | |
215 | + }) | |
216 | + ]) | |
217 | + oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() | |
218 | + } | |
219 | + } | |
220 | + | |
198 | 221 | await Promise.race([ |
199 | 222 | this._whenSaturated().then(() => { |
200 | 223 | message = this._peekNextMessage(ports) |
224 | + saturated = true | |
201 | 225 | }), |
202 | - new Promise(async (resolve, reject) => { | |
203 | - while (// end if we have a message older then slowest containers | |
204 | - !((message && oldestTime >= message._fromTicks) || | |
205 | - // end if there are no messages and this container is the oldest contaner | |
206 | - (!message && oldestTime === this.kernel.ticks))) { | |
207 | - let ticksToWait = message ? message._fromTicks : this.kernel.ticks | |
208 | - // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait | |
226 | + findOldestMessage() | |
227 | + ]) | |
209 | 228 | |
210 | - await Promise.race([ | |
211 | - this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { | |
212 | - message = this._peekNextMessage(ports) | |
213 | - }), | |
214 | - this._olderMessage(message).then(m => { | |
215 | - message = m | |
216 | - }) | |
217 | - ]) | |
218 | - | |
219 | - oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() | |
220 | - } | |
221 | - resolve() | |
222 | - }) | |
223 | - ]) | |
224 | 229 | return message |
225 | 230 | } |
226 | 231 | |
227 | 232 | // tests wether or not all the ports have a message |
228 | 233 | _isSaturated (ports) { |
229 | 234 | const values = Object.values(ports) |
230 | - return values.length ? values.every(port => port.messages.length) : false | |
235 | + return values.length === 0 ? true : values.every(port => port.messages.length !== 0) | |
231 | 236 | } |
232 | 237 | |
233 | 238 | // returns a promise that resolve when the ports are saturated |
234 | 239 | _whenSaturated () { |
scheduler.js | ||
---|---|---|
@@ -27,8 +27,9 @@ | ||
27 | 27 | * updates an instance with a new tick count |
28 | 28 | * @param {Object} instance - a container instance |
29 | 29 | */ |
30 | 30 | update (instance) { |
31 | + this._waits = this._waits.filter(wait => wait.id !== instance.id) | |
31 | 32 | this._update(instance) |
32 | 33 | this._running.add(instance.id) |
33 | 34 | this._checkWaits() |
34 | 35 | } |
tests/index.js | ||
---|---|---|
@@ -397,9 +397,9 @@ | ||
397 | 397 | ports: [portRef4] |
398 | 398 | }) |
399 | 399 | |
400 | 400 | this.kernel.incrementTicks(6) |
401 | - return Promise.all([ | |
401 | + await Promise.all([ | |
402 | 402 | this.kernel.send(creationPort, message1), |
403 | 403 | this.kernel.send(creationPort, message2), |
404 | 404 | this.kernel.send(portRef1, this.kernel.createMessage()), |
405 | 405 | this.kernel.send(portRef3, this.kernel.createMessage()), |
@@ -470,8 +470,9 @@ | ||
470 | 470 | } |
471 | 471 | })) |
472 | 472 | |
473 | 473 | hypervisor.pin(root) |
474 | + root = await hypervisor.getInstance(root.id) | |
474 | 475 | |
475 | 476 | const [portRef1, portRef2] = root.ports.createChannel() |
476 | 477 | const [portRef3, portRef4] = root.ports.createChannel() |
477 | 478 |
Built with git-ssb-web