Commit c331410c6a6f9cd1457d0321b06291624febe799
Merge pull request #125 from primea/loop
Loopwanderer authored on 7/6/2017, 8:16:25 PM
GitHub committed on 7/6/2017, 8:16:25 PM
Parent: 016c6df49758e5db4d0f3caaf955ad714e137afc
Parent: 381ecaab16e8fc78457b956f74843ae172ca686e
Files changed
index.js | changed |
kernel.js | changed |
package.json | changed |
tests/index.js | changed |
index.js | ||
---|---|---|
@@ -41,8 +41,19 @@ | ||
41 | 41 | return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`) |
42 | 42 | } |
43 | 43 | } |
44 | 44 | |
45 | + async send (port, message) { | |
46 | + if (port.destId) { | |
47 | + const id = port.destId | |
48 | + const instance = await this.getInstance(id) | |
49 | + return instance.queue(port.destName, message) | |
50 | + } else { | |
51 | + // port is unbound | |
52 | + port.destPort.messages.push(message) | |
53 | + } | |
54 | + } | |
55 | + | |
45 | 56 | // loads an instance of a container from the state |
46 | 57 | async _loadInstance (id) { |
47 | 58 | const state = await this.graph.get(this.state, id) |
48 | 59 | const container = this._containerTypes[state.type] |
kernel.js | ||
---|---|---|
@@ -34,37 +34,38 @@ | ||
34 | 34 | * @param {object} message |
35 | 35 | */ |
36 | 36 | queue (portName, message) { |
37 | 37 | this.ports.queue(portName, message) |
38 | - if (this.containerState !== 'running') { | |
39 | - this.containerState = 'running' | |
40 | - return this._runNextMessage() | |
41 | - } | |
38 | + return this._startMessageLoop() | |
42 | 39 | } |
43 | 40 | |
44 | - initialize (message) { | |
45 | - this.containerState = 'running' | |
46 | - return this.run(message, 'initialize') | |
41 | + async initialize (message) { | |
42 | + await this.run(message, 'initialize') | |
43 | + return this._startMessageLoop() | |
47 | 44 | } |
48 | 45 | |
49 | 46 | // waits for the next message |
50 | - async _runNextMessage () { | |
51 | - // check if the ports are saturated, if so we don't have to wait on the | |
52 | - // scheduler | |
53 | - const message = await this.ports.getNextMessage() | |
47 | + async _startMessageLoop () { | |
48 | + // this ensure we only every have one loop running at a time | |
49 | + if (this.containerState !== 'running') { | |
50 | + this.containerState = 'running' | |
54 | 51 | |
55 | - if (message) { | |
56 | - message.fromPort.messages.shift() | |
57 | - // if the message we recived had more ticks then we currently have the | |
58 | - // update it | |
59 | - if (message._fromTicks > this.ticks) { | |
60 | - this.ticks = message._fromTicks | |
61 | - this.hypervisor.scheduler.update(this) | |
52 | + while (1) { | |
53 | + const message = await this.ports.getNextMessage() | |
54 | + if (!message) break | |
55 | + | |
56 | + // dequqe message | |
57 | + message.fromPort.messages.shift() | |
58 | + // if the message we recived had more ticks then we currently have the | |
59 | + // update it | |
60 | + if (message._fromTicks > this.ticks) { | |
61 | + this.ticks = message._fromTicks | |
62 | + this.hypervisor.scheduler.update(this) | |
63 | + } | |
64 | + // run the next message | |
65 | + await this.run(message) | |
62 | 66 | } |
63 | - // run the next message | |
64 | - return this.run(message) | |
65 | - } else { | |
66 | - // if no more messages then shut down | |
67 | + // no more messages; shut down | |
67 | 68 | this.hypervisor.scheduler.done(this.id) |
68 | 69 | } |
69 | 70 | } |
70 | 71 | |
@@ -80,9 +81,8 @@ | ||
80 | 81 | const responsePort = message.responsePort |
81 | 82 | delete message.responsePort |
82 | 83 | |
83 | 84 | this.ports.addReceivedPorts(message) |
84 | - message._hops++ | |
85 | 85 | |
86 | 86 | if (message.constructor === DeleteMessage) { |
87 | 87 | this.ports._delete(message.fromName) |
88 | 88 | } else { |
@@ -99,13 +99,11 @@ | ||
99 | 99 | if (responsePort) { |
100 | 100 | this.send(responsePort, new Message({ |
101 | 101 | data: result |
102 | 102 | })) |
103 | - this.ports._unboundPorts.add(responsePort) | |
104 | 103 | } |
105 | 104 | |
106 | 105 | this.ports.clearUnboundedPorts() |
107 | - return this._runNextMessage() | |
108 | 106 | } |
109 | 107 | |
110 | 108 | getResponsePort (message) { |
111 | 109 | if (message.responsePort) { |
@@ -164,23 +162,16 @@ | ||
164 | 162 | * sends a message to a given port |
165 | 163 | * @param {Object} portRef - the port |
166 | 164 | * @param {Message} message - the message |
167 | 165 | */ |
168 | - async send (port, message) { | |
166 | + send (port, message) { | |
167 | + message._hops++ | |
169 | 168 | // set the port that the message came from |
170 | 169 | message._fromTicks = this.ticks |
171 | 170 | this.ports.removeSentPorts(message) |
172 | 171 | |
173 | 172 | // if (this.currentMessage !== message && !message.responsePort) { |
174 | 173 | // this.currentMessage._addSubMessage(message) |
175 | 174 | // } |
176 | - | |
177 | - if (port.destId) { | |
178 | - const id = port.destId | |
179 | - const instance = await this.hypervisor.getInstance(id) | |
180 | - return instance.queue(port.destName, message) | |
181 | - } else { | |
182 | - // port is unbound | |
183 | - port.destPort.messages.push(message) | |
184 | - } | |
175 | + return this.hypervisor.send(port, message) | |
185 | 176 | } |
186 | 177 | } |
package.json | ||
---|---|---|
@@ -1,7 +1,7 @@ | ||
1 | 1 | { |
2 | 2 | "name": "primea-hypervisor", |
3 | - "version": "0.0.1", | |
3 | + "version": "0.0.2", | |
4 | 4 | "description": "this is a JS implemention of the primea hypervisor", |
5 | 5 | "scripts": { |
6 | 6 | "coverage": "node --harmony ./node_modules/istanbul/lib/cli.js cover ./tests/index.js", |
7 | 7 | "coveralls": "npm run coverage && coveralls <coverage/lcov.info", |
tests/index.js | ||
---|---|---|
@@ -21,9 +21,9 @@ | ||
21 | 21 | } |
22 | 22 | |
23 | 23 | node.on('ready', () => { |
24 | 24 | tape('basic', async t => { |
25 | - t.plan(2) | |
25 | + t.plan(3) | |
26 | 26 | let message |
27 | 27 | const expectedState = { |
28 | 28 | '/': 'zdpuB1wc9Pb6jUzfNt4nAxAEUxB7kNhg4vbq7YLcEyBUb6iAB' |
29 | 29 | } |
@@ -52,8 +52,9 @@ | ||
52 | 52 | rootContainer.send(portRef1, message) |
53 | 53 | |
54 | 54 | const stateRoot = await hypervisor.createStateRoot(Infinity) |
55 | 55 | t.deepEquals(stateRoot, expectedState, 'expected root!') |
56 | + t.equals(hypervisor.scheduler.oldest(), 0) | |
56 | 57 | }) |
57 | 58 | |
58 | 59 | tape('basic - do not store containers with no ports bound', async t => { |
59 | 60 | t.plan(1) |
Built with git-ssb-web