Commit 3caa963721b6de8700ae2769edf1ce8f78bc0718
check for cyclic waits
wanderer committed on 5/7/2017, 10:37:55 AMParent: 1585b228b98ee11398442d6ae9697882ea7360dd
Files changed
index.js | changed |
kernel.js | changed |
portManager.js | changed |
tests/index.js | changed |
index.js | ||
---|---|---|
@@ -31,11 +31,11 @@ | ||
31 | 31 | } |
32 | 32 | |
33 | 33 | // given a port, wait untill its source contract has reached the threshold |
34 | 34 | // tick count |
35 | - async wait (port, threshold) { | |
35 | + async wait (port, threshold, fromPort) { | |
36 | 36 | let kernel = await this.getInstance(port) |
37 | - return kernel.wait(threshold) | |
37 | + return kernel.wait(threshold, fromPort) | |
38 | 38 | } |
39 | 39 | |
40 | 40 | async createInstance (type, state, entryPort, parentPort) { |
41 | 41 | const VM = this._VMs[type] |
@@ -60,9 +60,9 @@ | ||
60 | 60 | /** |
61 | 61 | * opts.entryPort |
62 | 62 | * opts.parentPort |
63 | 63 | */ |
64 | - createInstanceFromPort (entryPort, parentPort) { | |
64 | + async createInstanceFromPort (entryPort, parentPort) { | |
65 | 65 | const state = entryPort.link['/'] |
66 | 66 | return this.createInstance(entryPort.type, state, entryPort, parentPort) |
67 | 67 | } |
68 | 68 | |
@@ -74,9 +74,10 @@ | ||
74 | 74 | async generateID (port) { |
75 | 75 | if (!port || !port.id) { |
76 | 76 | return null |
77 | 77 | } |
78 | - let id = await this.graph.flush(port.id) | |
78 | + let id = Object.assign({}, port.id) | |
79 | + id = await this.graph.flush(id) | |
79 | 80 | id = id['/'] |
80 | 81 | if (Buffer.isBuffer(id)) { |
81 | 82 | id = multibase.encode('base58btc', id).toString() |
82 | 83 | } |
kernel.js | ||
---|---|---|
@@ -27,9 +27,12 @@ | ||
27 | 27 | }) |
28 | 28 | this.on('result', this._runNextMessage) |
29 | 29 | this.on('idle', () => { |
30 | 30 | while (!this._waitingQueue.isEmpty()) { |
31 | - this._waitingQueue.poll().resolve() | |
31 | + const waiter = this._waitingQueue.poll() | |
32 | + this.wait(waiter.ticks, waiter.from).then(ticks => { | |
33 | + waiter.resolve(ticks) | |
34 | + }) | |
32 | 35 | } |
33 | 36 | }) |
34 | 37 | } |
35 | 38 | |
@@ -50,8 +53,9 @@ | ||
50 | 53 | this.emit(vmState, message) |
51 | 54 | } |
52 | 55 | |
53 | 56 | async _runNextMessage () { |
57 | + // console.log('next message', this.ticks, this.entryPort) | |
54 | 58 | const message = await this.ports.getNextMessage() |
55 | 59 | // if the vm is paused and it gets a message; save that message for use when the VM is resumed |
56 | 60 | if (message && this.vmState === 'paused') { |
57 | 61 | this.ports._portMap(message._fromPort).unshfit(message) |
@@ -98,30 +102,35 @@ | ||
98 | 102 | exceptionError: e |
99 | 103 | } |
100 | 104 | clearObject(this.state) |
101 | 105 | Object.assign(this.state, oldState) |
106 | + console.log(e) | |
102 | 107 | } |
103 | 108 | |
104 | 109 | this.emit('result', result) |
105 | 110 | return result |
106 | 111 | } |
107 | 112 | |
108 | 113 | // returns a promise that resolves once the kernel hits the threshould tick |
109 | 114 | // count |
110 | - async wait (threshold) { | |
111 | - return new Promise((resolve, reject) => { | |
112 | - if (this.vmState === 'idle' || threshold <= this.ticks) { | |
113 | - resolve(this.ticks) | |
114 | - } else { | |
115 | + async wait (threshold, fromPort) { | |
116 | + if (threshold <= this.ticks) { | |
117 | + return this.ticks | |
118 | + } else if (this.vmState === 'idle') { | |
119 | + return this.ports.wait(threshold, fromPort) | |
120 | + } else { | |
121 | + return new Promise((resolve, reject) => { | |
115 | 122 | this._waitingQueue.add({ |
116 | 123 | threshold: threshold, |
117 | - resolve: resolve | |
124 | + resolve: resolve, | |
125 | + from: fromPort | |
118 | 126 | }) |
119 | - } | |
120 | - }) | |
127 | + }) | |
128 | + } | |
121 | 129 | } |
122 | 130 | |
123 | 131 | incrementTicks (count) { |
132 | + console.log('update ticks') | |
124 | 133 | this.ticks += count |
125 | 134 | while (!this._waitingQueue.isEmpty()) { |
126 | 135 | const waiter = this._waitingQueue.peek() |
127 | 136 | if (waiter.threshold > this.ticks) { |
@@ -134,9 +143,8 @@ | ||
134 | 143 | |
135 | 144 | async createPort (type, name) { |
136 | 145 | const VM = this.hypervisor._VMs[type] |
137 | 146 | const parentId = this.entryPort ? this.entryPort.id : null |
138 | - | |
139 | 147 | const portRef = { |
140 | 148 | 'messages': [], |
141 | 149 | 'id': { |
142 | 150 | '/': { |
@@ -151,14 +159,12 @@ | ||
151 | 159 | } |
152 | 160 | |
153 | 161 | // create the port instance |
154 | 162 | await this.ports.set(name, portRef) |
155 | - | |
156 | 163 | // incerment the nonce |
157 | 164 | const nonce = new BN(this.state.nonce) |
158 | 165 | nonce.iaddn(1) |
159 | 166 | this.state.nonce = nonce.toArray() |
160 | - | |
161 | 167 | return portRef |
162 | 168 | } |
163 | 169 | |
164 | 170 | async send (portRef, message) { |
@@ -167,8 +173,9 @@ | ||
167 | 173 | portInstance.hasSent = true |
168 | 174 | } catch (e) { |
169 | 175 | throw new Error('invalid port referance, which means the port that the port was either moved or destoried') |
170 | 176 | } |
177 | + | |
171 | 178 | const id = await this.hypervisor.generateID(this.entryPort) |
172 | 179 | message._fromPort = id |
173 | 180 | message._ticks = this.ticks |
174 | 181 |
portManager.js | ||
---|---|---|
@@ -66,24 +66,27 @@ | ||
66 | 66 | return this.ports[key] |
67 | 67 | } |
68 | 68 | |
69 | 69 | // waits till all ports have reached a threshold tick count |
70 | - async wait (threshold) { | |
70 | + async wait (threshold, fromPort) { | |
71 | + // console.log('wait', threshold, 'id', this.entryPort) | |
71 | 72 | // find the ports that have a smaller tick count then the threshold tick count |
72 | 73 | const unkownPorts = [...this._portMap].filter(([id, port]) => { |
73 | - return (port.hasSent || port.name === ENTRY) && port.ticks < threshold | |
74 | + return (port.hasSent || port.name === ENTRY) && | |
75 | + port.ticks < threshold && | |
76 | + fromPort !== port | |
74 | 77 | }) |
75 | 78 | |
76 | 79 | const promises = unkownPorts.map(async ([id, port]) => { |
77 | 80 | const portObj = port.name === ENTRY ? this.parentPort : this.ports[port.name] |
78 | 81 | // update the port's tick count |
79 | - port.ticks = await this.hypervisor.wait(portObj, threshold) | |
82 | + port.ticks = await this.hypervisor.wait(portObj, threshold, this.entryPort) | |
80 | 83 | }) |
81 | 84 | return Promise.all(promises) |
82 | 85 | } |
83 | 86 | |
84 | 87 | async getNextMessage () { |
85 | - await this.wait(this.kernel.ticks) | |
88 | + await this.wait(this.kernel.ticks, this.entryPort) | |
86 | 89 | const portMap = [...this._portMap].reduce(messageArbiter) |
87 | 90 | if (portMap) { |
88 | 91 | return portMap[1].shift() |
89 | 92 | } |
tests/index.js | ||
---|---|---|
@@ -22,9 +22,9 @@ | ||
22 | 22 | console.log(err) |
23 | 23 | }) |
24 | 24 | |
25 | 25 | node.on('start', () => { |
26 | - tape('basic', async t => { | |
26 | + tape.only('basic', async t => { | |
27 | 27 | const message = new Message() |
28 | 28 | const expectedState = { |
29 | 29 | '/': 'zdpuAntkdU7yBJojcBT5Q9wBhrK56NmLnwpHPKaEGMFnAXpv7' |
30 | 30 | } |
@@ -47,9 +47,9 @@ | ||
47 | 47 | t.deepEquals(stateRoot, expectedState, 'expected root!') |
48 | 48 | t.end() |
49 | 49 | }) |
50 | 50 | |
51 | - tape.only('one child contract', async t => { | |
51 | + tape('one child contract', async t => { | |
52 | 52 | let message = new Message() |
53 | 53 | const expectedState = { '/': 'zdpuAqtY43BMaTCB5nTt7kooeKAWibqGs44Uwy9jJQHjTnHRK' } |
54 | 54 | let hasResolved = false |
55 | 55 | |
@@ -68,9 +68,13 @@ | ||
68 | 68 | |
69 | 69 | class testVMContainer extends BaseContainer { |
70 | 70 | async run (m) { |
71 | 71 | const port = await this.kernel.createPort('test2', 'child') |
72 | - await this.kernel.send(port, m) | |
72 | + try { | |
73 | + await this.kernel.send(port, m) | |
74 | + } catch (e) { | |
75 | + console.log(e) | |
76 | + } | |
73 | 77 | this.kernel.incrementTicks(1) |
74 | 78 | } |
75 | 79 | } |
76 | 80 |
Built with git-ssb-web