Commit f2c989b4dce29265b4c17a2cde93c68a0b742edc
added checks for loading instances
wanderer committed on 6/28/2017, 1:53:04 AMParent: d30608cc6c438ea4527a506e061d183c30099168
Files changed
exoInterface.js | changed |
index.js | changed |
portManager.js | changed |
tests/index.js | changed |
exoInterface.js | ||
---|---|---|
@@ -18,15 +18,20 @@ | ||
18 | 18 | this.container = new opts.container.Constructor(this, opts.container.args) |
19 | 19 | |
20 | 20 | this.ticks = 0 |
21 | 21 | this.containerState = 'idle' |
22 | + this._pendingSends = new Map() | |
22 | 23 | |
23 | 24 | // create the port manager |
24 | 25 | this.ports = new PortManager(Object.assign({ |
25 | 26 | exInterface: this |
26 | 27 | }, opts)) |
27 | 28 | } |
28 | 29 | |
30 | + _addWork (promise) { | |
31 | + this._outStandingWork = Promise.all([this._outStandingWork, promise]) | |
32 | + } | |
33 | + | |
29 | 34 | /** |
30 | 35 | * adds a message to this containers message queue |
31 | 36 | * @param {string} portName |
32 | 37 | * @param {object} message |
index.js | ||
---|---|---|
@@ -16,8 +16,9 @@ | ||
16 | 16 | this.scheduler = new Scheduler() |
17 | 17 | this.state = state |
18 | 18 | this._containerTypes = {} |
19 | 19 | this._nodesToCheck = new Set() |
20 | + this._loadingInstances = new Map() | |
20 | 21 | } |
21 | 22 | |
22 | 23 | /** |
23 | 24 | * add a potaintail node in the state graph to check for garbage collection |
@@ -71,16 +72,22 @@ | ||
71 | 72 | * gets an existsing container instances |
72 | 73 | * @param {string} id - the containers ID |
73 | 74 | * @returns {Promise} |
74 | 75 | */ |
75 | - async getInstance (id) { | |
76 | - let instance = this.scheduler.getInstance(id) | |
76 | + getInstance (id) { | |
77 | + let instance = this.scheduler.getInstance(id) || this._loadingInstances.get(id) | |
77 | 78 | if (instance) { |
79 | + // console.log('have instance', id) | |
78 | 80 | return instance |
79 | 81 | } else { |
80 | 82 | const lock = this.scheduler.getLock() |
81 | - instance = await this._loadInstance(id, lock) | |
82 | - return instance | |
83 | + const promise = this._loadInstance(id, lock) | |
84 | + promise.then(() => { | |
85 | + this._loadingInstances.delete(id) | |
86 | + }) | |
87 | + | |
88 | + this._loadingInstances.set(id, promise) | |
89 | + return promise | |
83 | 90 | } |
84 | 91 | } |
85 | 92 | |
86 | 93 | /** |
portManager.js | ||
---|---|---|
@@ -145,15 +145,16 @@ | ||
145 | 145 | this._oldestMessageResolve = resolve |
146 | 146 | }) |
147 | 147 | this._messageTickThreshold = Infinity |
148 | 148 | } |
149 | + | |
150 | + if (this.isSaturated()) { | |
151 | + this._saturationResolve() | |
152 | + this._saturationPromise = new Promise((resolve, reject) => { | |
153 | + this._saturationResolve = resolve | |
154 | + }) | |
155 | + } | |
149 | 156 | } |
150 | - if (this.isSaturated()) { | |
151 | - this._saturationResolve() | |
152 | - this._saturationPromise = new Promise((resolve, reject) => { | |
153 | - this._saturationResolve = resolve | |
154 | - }) | |
155 | - } | |
156 | 157 | } |
157 | 158 | |
158 | 159 | /** |
159 | 160 | * gets a port given it's name |
@@ -185,9 +186,10 @@ | ||
185 | 186 | |
186 | 187 | // create a new channel for the container |
187 | 188 | const ports = this.createChannel() |
188 | 189 | this._unboundPorts.delete(ports[1]) |
189 | - this.hypervisor.createInstance(type, data, [ports[1]], id) | |
190 | + const promise = this.hypervisor.createInstance(type, data, [ports[1]], id) | |
191 | + this.exInterface._addWork(promise) | |
190 | 192 | |
191 | 193 | return ports[0] |
192 | 194 | } |
193 | 195 |
tests/index.js | ||
---|---|---|
@@ -176,21 +176,19 @@ | ||
176 | 176 | t.plan(2) |
177 | 177 | let runs = 0 |
178 | 178 | |
179 | 179 | class Root extends BaseContainer { |
180 | - async run (m) { | |
180 | + run (m) { | |
181 | 181 | if (!runs) { |
182 | 182 | runs++ |
183 | 183 | const one = this.exInterface.ports.create('first') |
184 | 184 | const two = this.exInterface.ports.create('second') |
185 | 185 | |
186 | 186 | this.exInterface.ports.bind('two', two) |
187 | 187 | this.exInterface.ports.bind('one', one) |
188 | 188 | |
189 | - await Promise.all([ | |
190 | - this.exInterface.send(one, this.exInterface.createMessage()), | |
191 | - this.exInterface.send(two, this.exInterface.createMessage()) | |
192 | - ]) | |
189 | + this.exInterface.send(one, this.exInterface.createMessage()) | |
190 | + this.exInterface.send(two, this.exInterface.createMessage()) | |
193 | 191 | } else if (runs === 1) { |
194 | 192 | runs++ |
195 | 193 | t.equals(m.data, 'first', 'should recive the first message') |
196 | 194 | } else if (runs === 2) { |
@@ -535,8 +533,52 @@ | ||
535 | 533 | root.ports.bind('first', port) |
536 | 534 | root.send(port, root.createMessage()) |
537 | 535 | }) |
538 | 536 | |
537 | + tape('send to the same container at the same time', async t => { | |
538 | + t.plan(2) | |
539 | + | |
540 | + let runs = 0 | |
541 | + let instance | |
542 | + | |
543 | + class Root extends BaseContainer { | |
544 | + run (m) { | |
545 | + let one = this.exInterface.ports.get('one') | |
546 | + if (!one) { | |
547 | + one = this.exInterface.ports.create('first') | |
548 | + this.exInterface.ports.bind('one', one) | |
549 | + } else { | |
550 | + this.exInterface.send(one, this.exInterface.createMessage()) | |
551 | + this.exInterface.send(one, this.exInterface.createMessage()) | |
552 | + } | |
553 | + } | |
554 | + } | |
555 | + | |
556 | + class First extends BaseContainer { | |
557 | + run (m) { | |
558 | + ++runs | |
559 | + if (runs === 2) { | |
560 | + t.equals(instance, this, 'should have same instances') | |
561 | + } else { | |
562 | + instance = this | |
563 | + } | |
564 | + } | |
565 | + } | |
566 | + | |
567 | + const hypervisor = new Hypervisor(node.dag) | |
568 | + | |
569 | + hypervisor.registerContainer('root', Root) | |
570 | + hypervisor.registerContainer('first', First) | |
571 | + | |
572 | + const root = await hypervisor.createInstance('root') | |
573 | + const port = root.ports.create('root') | |
574 | + root.ports.bind('first', port) | |
575 | + root.send(port, root.createMessage()) | |
576 | + await hypervisor.createStateRoot() | |
577 | + root.send(port, root.createMessage()) | |
578 | + await hypervisor.createStateRoot() | |
579 | + t.equals(runs, 2) | |
580 | + }) | |
539 | 581 | tape('checking ports', async t => { |
540 | 582 | t.plan(4) |
541 | 583 | const hypervisor = new Hypervisor(node.dag) |
542 | 584 | hypervisor.registerContainer('base', BaseContainer) |
Built with git-ssb-web