Commit 61eb05a360ac5413a3b250bbbfdf7b0a33560d93
add copy functio to creation service
Signed-off-by: wanderer <mjbecze@gmail.com>wanderer committed on 9/10/2017, 4:41:34 AM
Parent: 5d73da450ff3f8fae6bf5844ed02da668a06e011
Files changed
creationService.js | changed |
portManager.js | changed |
tests/index.js | changed |
creationService.js | ||
---|---|---|
@@ -1,5 +1,7 @@ | ||
1 | 1 | const chunk = require('chunk') |
2 | +const Message = require('primea-message') | |
3 | +const DeleteMessage = require('./deleteMessage.js') | |
2 | 4 | |
3 | 5 | const MAX_DATA_BYTES = 65533 |
4 | 6 | |
5 | 7 | module.exports = class CreationService { |
@@ -15,8 +17,12 @@ | ||
15 | 17 | const creator = this.scheduler.getInstance(message.fromId) |
16 | 18 | id = creator.generateNextId() |
17 | 19 | } |
18 | 20 | return this.createInstance(message, id) |
21 | + } else if (message.responsePort && !(message instanceof DeleteMessage)) { | |
22 | + this.send(message.responsePort, new Message({ | |
23 | + ports: [this.getPort()] | |
24 | + })) | |
19 | 25 | } |
20 | 26 | } |
21 | 27 | |
22 | 28 | getPort () { |
@@ -25,15 +31,15 @@ | ||
25 | 31 | destId: 0 |
26 | 32 | } |
27 | 33 | } |
28 | 34 | |
29 | - // send (port, message) { | |
30 | - // message._hops++ | |
31 | - // message._fromTicks = this.ticks | |
32 | - // message.fromId = this.id | |
35 | + send (port, message) { | |
36 | + message._hops++ | |
37 | + message._fromTicks = this.ticks | |
38 | + message.fromId = this.id | |
33 | 39 | |
34 | - // return this.hypervisor.send(port, message) | |
35 | - // } | |
40 | + return this.hypervisor.send(port, message) | |
41 | + } | |
36 | 42 | |
37 | 43 | /** |
38 | 44 | * creates an new container instances and save it in the state |
39 | 45 | * @returns {Promise} |
portManager.js | ||
---|---|---|
@@ -1,26 +1,7 @@ | ||
1 | 1 | const DeleteMessage = require('./deleteMessage') |
2 | 2 | |
3 | -// decides which message to go first | |
4 | -function messageArbiter (portA, portB) { | |
5 | - const a = portA.messages[0] | |
6 | - const b = portB.messages[0] | |
7 | 3 | |
8 | - if (!a) { | |
9 | - return portB | |
10 | - } else if (!b) { | |
11 | - return portA | |
12 | - } | |
13 | - | |
14 | - // order by number of ticks if messages have different number of ticks | |
15 | - if (a._fromTicks !== b._fromTicks) { | |
16 | - return a._fromTicks < b._fromTicks ? portA : portB | |
17 | - } else { | |
18 | - // insertion order | |
19 | - return portA | |
20 | - } | |
21 | -} | |
22 | - | |
23 | 4 | module.exports = class PortManager { |
24 | 5 | /** |
25 | 6 | * The port manager manages the the ports. This inculdes creation, deletion |
26 | 7 | * fetching and waiting on ports |
@@ -31,8 +12,10 @@ | ||
31 | 12 | */ |
32 | 13 | constructor (opts) { |
33 | 14 | Object.assign(this, opts) |
34 | 15 | this.ports = this.state.ports |
16 | + | |
17 | + this._waitingPorts = {} | |
35 | 18 | // tracks unbounded ports that we have |
36 | 19 | this._unboundPorts = new Set() |
37 | 20 | this._saturationPromise = new Promise((resolve, reject) => { |
38 | 21 | this._saturationResolve = resolve |
@@ -142,9 +125,9 @@ | ||
142 | 125 | |
143 | 126 | const numOfMsg = port.messages.push(message) |
144 | 127 | |
145 | 128 | if (numOfMsg === 1) { |
146 | - if (this._isSaturated(this.ports)) { | |
129 | + if (isSaturated(this._waitingPorts)) { | |
147 | 130 | this._saturationResolve() |
148 | 131 | this._saturationPromise = new Promise((resolve, reject) => { |
149 | 132 | this._saturationResolve = resolve |
150 | 133 | }) |
@@ -195,8 +178,10 @@ | ||
195 | 178 | let message = this._peekNextMessage(ports) |
196 | 179 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
197 | 180 | let saturated = false |
198 | 181 | |
182 | + this._waitingPorts = ports | |
183 | + | |
199 | 184 | const findOldestMessage = async () => { |
200 | 185 | while (// end if we have a message older then slowest containers |
201 | 186 | !((message && oldestTime >= message._fromTicks) || |
202 | 187 | // end if there are no messages and this container is the oldest contaner |
@@ -218,9 +203,9 @@ | ||
218 | 203 | } |
219 | 204 | } |
220 | 205 | |
221 | 206 | await Promise.race([ |
222 | - this._whenSaturated().then(() => { | |
207 | + this._whenSaturated(ports).then(() => { | |
223 | 208 | message = this._peekNextMessage(ports) |
224 | 209 | saturated = true |
225 | 210 | }), |
226 | 211 | findOldestMessage() |
@@ -228,17 +213,11 @@ | ||
228 | 213 | |
229 | 214 | return message |
230 | 215 | } |
231 | 216 | |
232 | - // tests wether or not all the ports have a message | |
233 | - _isSaturated (ports) { | |
234 | - const values = Object.values(ports) | |
235 | - return values.length === 0 ? true : values.every(port => port.messages.length !== 0) | |
236 | - } | |
237 | - | |
238 | 217 | // returns a promise that resolve when the ports are saturated |
239 | - _whenSaturated () { | |
240 | - if (this._isSaturated(this.ports)) { | |
218 | + _whenSaturated (ports) { | |
219 | + if (isSaturated(ports)) { | |
241 | 220 | return Promise.resolve() |
242 | 221 | } else { |
243 | 222 | return this._saturationPromise |
244 | 223 | } |
@@ -266,4 +245,30 @@ | ||
266 | 245 | } |
267 | 246 | } |
268 | 247 | } |
269 | 248 | } |
249 | + | |
250 | +// tests wether or not all the ports have a message | |
251 | +function isSaturated (ports) { | |
252 | + const values = Object.values(ports) | |
253 | + return values.length ? values.every(port => port.messages.length) : true | |
254 | +} | |
255 | + | |
256 | +// decides which message to go first | |
257 | +function messageArbiter (portA, portB) { | |
258 | + const a = portA.messages[0] | |
259 | + const b = portB.messages[0] | |
260 | + | |
261 | + if (!a) { | |
262 | + return portB | |
263 | + } else if (!b) { | |
264 | + return portA | |
265 | + } | |
266 | + | |
267 | + // order by number of ticks if messages have different number of ticks | |
268 | + if (a._fromTicks !== b._fromTicks) { | |
269 | + return a._fromTicks < b._fromTicks ? portA : portB | |
270 | + } else { | |
271 | + // insertion order | |
272 | + return portA | |
273 | + } | |
274 | +} |
tests/index.js | ||
---|---|---|
@@ -1147,5 +1147,40 @@ | ||
1147 | 1147 | '/': 'zdpuAonuhk7ZhdghJh4saaUCskY5mXZ6M9BcV9iAhCanAQx9i' |
1148 | 1148 | } |
1149 | 1149 | t.deepEquals(stateRoot, expectedSR) |
1150 | 1150 | }) |
1151 | + | |
1152 | + tape('creation service - port copy', async t => { | |
1153 | + t.plan(2) | |
1154 | + class TestVMContainer extends BaseContainer { | |
1155 | + onCreation (m) { | |
1156 | + const creationPort = m.ports[0] | |
1157 | + | |
1158 | + const message = this.kernel.createMessage() | |
1159 | + const responePort = this.kernel.getResponsePort(message) | |
1160 | + | |
1161 | + return Promise.all([ | |
1162 | + this.kernel.ports.bind('response', responePort), | |
1163 | + this.kernel.send(creationPort, message) | |
1164 | + ]) | |
1165 | + } | |
1166 | + onMessage (m) { | |
1167 | + t.equal(m.fromName, 'response') | |
1168 | + t.equal(m.ports.length, 1) | |
1169 | + } | |
1170 | + } | |
1171 | + | |
1172 | + const hypervisor = new Hypervisor(node.dag) | |
1173 | + hypervisor.registerContainer(TestVMContainer) | |
1174 | + | |
1175 | + const port = hypervisor.creationService.getPort() | |
1176 | + | |
1177 | + const root = await hypervisor.send(port, new Message({ | |
1178 | + data: { | |
1179 | + type: TestVMContainer.typeId | |
1180 | + }, | |
1181 | + ports: [port] | |
1182 | + })) | |
1183 | + | |
1184 | + hypervisor.pin(root) | |
1185 | + }) | |
1151 | 1186 | }) |
Built with git-ssb-web