portManager.jsView |
---|
1 | 1 | const DeleteMessage = require('./deleteMessage') |
2 | 2 | |
3 | 3 | |
4 | | -function messageArbiter (nameA, nameB) { |
5 | | - const a = this.ports[nameA].messages[0] |
6 | | - const b = this.ports[nameB].messages[0] |
| 4 | +function messageArbiter (portA, portB) { |
| 5 | + const a = portA.messages[0] |
| 6 | + const b = portB.messages[0] |
7 | 7 | |
8 | 8 | if (!a) { |
9 | | - return nameB |
| 9 | + return portB |
10 | 10 | } else if (!b) { |
11 | | - return nameA |
| 11 | + return portA |
12 | 12 | } |
13 | 13 | |
14 | 14 | |
15 | 15 | if (a._fromTicks !== b._fromTicks) { |
16 | | - return a._fromTicks < b._fromTicks ? nameA : nameB |
| 16 | + return a._fromTicks < b._fromTicks ? portA : portB |
17 | 17 | } else { |
18 | 18 | |
19 | | - return nameA |
| 19 | + return portA |
20 | 20 | } |
21 | 21 | } |
22 | 22 | |
23 | 23 | module.exports = class PortManager { |
142 | 142 | |
143 | 143 | const numOfMsg = port.messages.push(message) |
144 | 144 | |
145 | 145 | if (numOfMsg === 1) { |
146 | | - if (this._isSaturated()) { |
| 146 | + if (this._isSaturated(this.ports)) { |
147 | 147 | this._saturationResolve() |
148 | 148 | this._saturationPromise = new Promise((resolve, reject) => { |
149 | 149 | this._saturationResolve = resolve |
150 | 150 | }) |
177 | 177 | this._unboundPorts.add(port2) |
178 | 178 | return [port1, port2] |
179 | 179 | } |
180 | 180 | |
181 | | - |
182 | | - _peekNextMessage () { |
183 | | - const names = Object.keys(this.ports) |
184 | | - if (names.length) { |
185 | | - const portName = names.reduce(messageArbiter.bind(this)) |
186 | | - const port = this.ports[portName] |
| 181 | + |
| 182 | + _peekNextMessage (ports) { |
| 183 | + ports = Object.values(ports) |
| 184 | + if (ports.length) { |
| 185 | + const port = ports.reduce(messageArbiter) |
187 | 186 | return port.messages[0] |
188 | 187 | } |
189 | 188 | } |
190 | 189 | |
191 | 190 | |
192 | 191 | * Waits for the the next message if any |
193 | 192 | * @returns {Promise} |
194 | 193 | */ |
195 | | - async getNextMessage () { |
196 | | - let message = this._peekNextMessage() |
197 | | - let saturated = this._isSaturated() |
| 194 | + async getNextMessage (ports = this.ports, timeout = Infinity) { |
| 195 | + let message = this._peekNextMessage(ports) |
198 | 196 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
199 | 197 | |
200 | | - while (!saturated && |
201 | | - |
202 | | - !((message && oldestTime >= message._fromTicks) || |
203 | | - |
204 | | - (!message && oldestTime === this.kernel.ticks))) { |
205 | | - const ticksToWait = message ? message._fromTicks : this.kernel.ticks |
| 198 | + await Promise.race([ |
| 199 | + this._whenSaturated().then(() => { |
| 200 | + message = this._peekNextMessage(ports) |
| 201 | + }), |
| 202 | + new Promise(async (resolve, reject) => { |
| 203 | + while ( |
| 204 | + !((message && oldestTime >= message._fromTicks) || |
| 205 | + |
| 206 | + (!message && oldestTime === this.kernel.ticks))) { |
| 207 | + let ticksToWait = message ? message._fromTicks : this.kernel.ticks |
| 208 | + |
206 | 209 | |
207 | | - await Promise.race([ |
208 | | - this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { |
209 | | - message = this._peekNextMessage() |
210 | | - }), |
211 | | - this._olderMessage(message).then(m => { |
212 | | - message = m |
213 | | - }), |
214 | | - this._whenSaturated().then(() => { |
215 | | - saturated = true |
216 | | - message = this._peekNextMessage() |
217 | | - }) |
218 | | - ]) |
| 210 | + await Promise.race([ |
| 211 | + this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { |
| 212 | + message = this._peekNextMessage(ports) |
| 213 | + }), |
| 214 | + this._olderMessage(message).then(m => { |
| 215 | + message = m |
| 216 | + }) |
| 217 | + ]) |
219 | 218 | |
220 | | - oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
221 | | - } |
222 | | - |
| 219 | + oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
| 220 | + } |
| 221 | + resolve() |
| 222 | + }) |
| 223 | + ]) |
223 | 224 | return message |
224 | 225 | } |
225 | 226 | |
226 | 227 | |
227 | | - _isSaturated () { |
228 | | - const keys = Object.keys(this.ports) |
229 | | - return keys.length ? keys.every(name => this.ports[name].messages.length) : 0 |
| 228 | + _isSaturated (ports) { |
| 229 | + const values = Object.values(ports) |
| 230 | + return values.length ? values.every(port => port.messages.length) : false |
230 | 231 | } |
231 | 232 | |
232 | 233 | |
233 | 234 | _whenSaturated () { |
234 | | - return this._saturationPromise |
| 235 | + if (this._isSaturated(this.ports)) { |
| 236 | + return Promise.resolve() |
| 237 | + } else { |
| 238 | + return this._saturationPromise |
| 239 | + } |
235 | 240 | } |
236 | 241 | |
237 | 242 | |
238 | 243 | |