Commit 9e1f4df59ed811983747166e1f3b752a92e9b42a
implemeneted port waiting
Signed-off-by: wanderer <mjbecze@gmail.com>wanderer committed on 9/10/2017, 5:33:55 AM
Parent: 61eb05a360ac5413a3b250bbbfdf7b0a33560d93
Files changed
creationService.js | changed |
portManager.js | changed |
tests/index.js | changed |
creationService.js | ||
---|---|---|
@@ -78,8 +78,12 @@ | ||
78 | 78 | |
79 | 79 | return instance |
80 | 80 | } |
81 | 81 | |
82 | + get state () { | |
83 | + return {} | |
84 | + } | |
85 | + | |
82 | 86 | // get a hash from a POJO |
83 | 87 | _getHashFromObj (obj) { |
84 | 88 | return this.hypervisor.graph.flush(obj).then(obj => obj['/']) |
85 | 89 | } |
portManager.js | ||
---|---|---|
@@ -160,26 +160,21 @@ | ||
160 | 160 | this._unboundPorts.add(port2) |
161 | 161 | return [port1, port2] |
162 | 162 | } |
163 | 163 | |
164 | - // find and returns the next message that this instance currently knows about | |
165 | - _peekNextMessage (ports) { | |
166 | - ports = Object.values(ports) | |
167 | - if (ports.length) { | |
168 | - const port = ports.reduce(messageArbiter) | |
169 | - return port.messages[0] | |
170 | - } | |
171 | - } | |
172 | - | |
173 | 164 | /** |
174 | 165 | * Waits for the the next message if any |
175 | 166 | * @returns {Promise} |
176 | 167 | */ |
177 | 168 | async getNextMessage (ports = this.ports, timeout = Infinity) { |
178 | - let message = this._peekNextMessage(ports) | |
169 | + let message = peekNextMessage(ports) | |
179 | 170 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
180 | 171 | let saturated = false |
181 | 172 | |
173 | + if (Object.keys(this._waitingPorts).length) { | |
174 | + throw new Error('already getting next message') | |
175 | + } | |
176 | + | |
182 | 177 | this._waitingPorts = ports |
183 | 178 | |
184 | 179 | const findOldestMessage = async () => { |
185 | 180 | while (// end if we have a message older then slowest containers |
@@ -192,9 +187,9 @@ | ||
192 | 187 | let ticksToWait = message ? message._fromTicks : this.kernel.ticks |
193 | 188 | // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait |
194 | 189 | await Promise.race([ |
195 | 190 | this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { |
196 | - message = this._peekNextMessage(ports) | |
191 | + message = peekNextMessage(ports) | |
197 | 192 | }), |
198 | 193 | this._olderMessage(message).then(m => { |
199 | 194 | message = m |
200 | 195 | }) |
@@ -204,14 +199,16 @@ | ||
204 | 199 | } |
205 | 200 | |
206 | 201 | await Promise.race([ |
207 | 202 | this._whenSaturated(ports).then(() => { |
208 | - message = this._peekNextMessage(ports) | |
203 | + message = peekNextMessage(ports) | |
209 | 204 | saturated = true |
210 | 205 | }), |
211 | 206 | findOldestMessage() |
212 | 207 | ]) |
213 | 208 | |
209 | + this._waitingPorts = {} | |
210 | + | |
214 | 211 | return message |
215 | 212 | } |
216 | 213 | |
217 | 214 | // returns a promise that resolve when the ports are saturated |
@@ -252,8 +249,17 @@ | ||
252 | 249 | const values = Object.values(ports) |
253 | 250 | return values.length ? values.every(port => port.messages.length) : true |
254 | 251 | } |
255 | 252 | |
253 | +// find and returns the next message that this instance currently knows about | |
254 | +function peekNextMessage (ports) { | |
255 | + ports = Object.values(ports) | |
256 | + if (ports.length) { | |
257 | + const port = ports.reduce(messageArbiter) | |
258 | + return port.messages[0] | |
259 | + } | |
260 | +} | |
261 | + | |
256 | 262 | // decides which message to go first |
257 | 263 | function messageArbiter (portA, portB) { |
258 | 264 | const a = portA.messages[0] |
259 | 265 | const b = portB.messages[0] |
tests/index.js | ||
---|---|---|
@@ -1182,5 +1182,32 @@ | ||
1182 | 1182 | })) |
1183 | 1183 | |
1184 | 1184 | hypervisor.pin(root) |
1185 | 1185 | }) |
1186 | + | |
1187 | + tape('waiting on ports', async t => { | |
1188 | + t.plan(1) | |
1189 | + class TestVMContainer extends BaseContainer { | |
1190 | + async onCreation (m) { | |
1191 | + await this.kernel.ports.bind('test', m.ports[0]) | |
1192 | + this.kernel.ports.getNextMessage() | |
1193 | + try { | |
1194 | + await this.kernel.ports.getNextMessage() | |
1195 | + } catch (e) { | |
1196 | + t.pass('should throw if already trying to get a message') | |
1197 | + } | |
1198 | + } | |
1199 | + } | |
1200 | + | |
1201 | + const hypervisor = new Hypervisor(node.dag) | |
1202 | + hypervisor.registerContainer(TestVMContainer) | |
1203 | + | |
1204 | + const port = hypervisor.creationService.getPort() | |
1205 | + | |
1206 | + await hypervisor.send(port, new Message({ | |
1207 | + data: { | |
1208 | + type: TestVMContainer.typeId | |
1209 | + }, | |
1210 | + ports: [port] | |
1211 | + })) | |
1212 | + }) | |
1186 | 1213 | }) |
Built with git-ssb-web