Commit a728a4416ce9e9ab6ba38dc79dde2c7a294feb7e
wait only untill we have the number of ticks of the oldest message
wanderer committed on 5/22/2017, 11:31:06 PMParent: 5b9026817a9c02b306d206d7225548e34e50bb16
Files changed
exoInterface.js | changed |
index.js | changed |
port.js | changed |
portManager.js | changed |
tests/index.js | changed |
exoInterface.js | ||
---|---|---|
@@ -103,10 +103,12 @@ | ||
103 | 103 | this.emit('result', result) |
104 | 104 | return result |
105 | 105 | } |
106 | 106 | |
107 | - // returns a promise that resolves once the kernel hits the threshould tick | |
108 | - // count | |
107 | + /** | |
108 | + * returns a promise that resolves once the kernel hits the threshould tick count | |
109 | + * | |
110 | + */ | |
109 | 111 | wait (threshold, fromPort) { |
110 | 112 | if (threshold <= this.ticks) { |
111 | 113 | return this.ticks |
112 | 114 | } else if (this.containerState === 'idle') { |
@@ -140,9 +142,9 @@ | ||
140 | 142 | // set the port that the message came from |
141 | 143 | message._fromPort = this.entryPort |
142 | 144 | message._fromPortTicks = this.ticks |
143 | 145 | |
144 | - const container = await this.getContainer(portRef) | |
146 | + const container = await this.getInstance(portRef) | |
145 | 147 | container.queue(message) |
146 | 148 | |
147 | 149 | const waiter = this._waitingMap.get(portRef) |
148 | 150 | if (waiter) { |
@@ -150,8 +152,8 @@ | ||
150 | 152 | this._waitingMap.delete(portRef) |
151 | 153 | } |
152 | 154 | } |
153 | 155 | |
154 | - getContainer (portRef) { | |
155 | - return this.hypervisor.getOrCreateInstance(portRef, this.entryPort) | |
156 | + getInstance (portRef) { | |
157 | + return this.hypervisor.getInstanceByPort(portRef, this.entryPort) | |
156 | 158 | } |
157 | 159 | } |
index.js | ||
---|---|---|
@@ -8,17 +8,22 @@ | ||
8 | 8 | * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api) |
9 | 9 | */ |
10 | 10 | constructor (dag) { |
11 | 11 | this.graph = new Graph(dag) |
12 | - this._runningContainers = new Map() | |
12 | + this._containerInstances = new Map() | |
13 | 13 | this._containerTypes = {} |
14 | 14 | } |
15 | 15 | |
16 | - async getByPath (root, path) { | |
16 | + /** | |
17 | + * get a container by its path | |
18 | + * @param {Object} root - the root container to start searching from | |
19 | + * @param {String} path - the path to travers | |
20 | + */ | |
21 | + async getInstanceByPath (root, path) { | |
17 | 22 | path = path.split('/') |
18 | 23 | for (const name of path) { |
19 | 24 | const portRef = root.ports.get(name) |
20 | - root = await this.getOrCreateInstance(portRef, root.entryPort) | |
25 | + root = await this.getInstanceByPort(portRef, root.entryPort) | |
21 | 26 | } |
22 | 27 | return root |
23 | 28 | } |
24 | 29 | |
@@ -26,16 +31,16 @@ | ||
26 | 31 | * get a contrainer instance given its entry port and its mounting port |
27 | 32 | * @param {Object} port the entry port for the container |
28 | 33 | * @param {Object} parentPort the entry port of the parent container |
29 | 34 | */ |
30 | - async getOrCreateInstance (port, parentPort) { | |
31 | - let instance = this._runningContainers.get(port) | |
35 | + async getInstanceByPort (port, parentPort) { | |
36 | + let instance = this._containerInstances.get(port) | |
32 | 37 | // if there is no container running crceate one |
33 | 38 | if (!instance) { |
34 | 39 | instance = await this.createInstance(port.type, port.link, port, parentPort) |
35 | 40 | instance.on('idle', () => { |
36 | 41 | // once the container is done shut it down |
37 | - this._runningContainers.delete(port) | |
42 | + this._containerInstances.delete(port) | |
38 | 43 | }) |
39 | 44 | } |
40 | 45 | return instance |
41 | 46 | } |
@@ -48,9 +53,9 @@ | ||
48 | 53 | * @param {Object} fromPort the entryPort of the container requesting the |
49 | 54 | * wait. Used internally so that waits don't become cyclic |
50 | 55 | */ |
51 | 56 | async wait (port, threshold, fromPort) { |
52 | - let instance = this._runningContainers.get(port) | |
57 | + let instance = this._containerInstances.get(port) | |
53 | 58 | if (instance) { |
54 | 59 | return instance.wait(threshold, fromPort) |
55 | 60 | } else { |
56 | 61 | return threshold |
@@ -83,9 +88,9 @@ | ||
83 | 88 | Container: Container |
84 | 89 | }) |
85 | 90 | |
86 | 91 | // save the newly created instance |
87 | - this._runningContainers.set(entryPort, exoInterface) | |
92 | + this._containerInstances.set(entryPort, exoInterface) | |
88 | 93 | await exoInterface.start() |
89 | 94 | return exoInterface |
90 | 95 | } |
91 | 96 |
port.js | ||
---|---|---|
@@ -16,5 +16,9 @@ | ||
16 | 16 | |
17 | 17 | shift () { |
18 | 18 | return this._queue.shift() |
19 | 19 | } |
20 | + | |
21 | + get size () { | |
22 | + return this._queue.length | |
23 | + } | |
20 | 24 | } |
portManager.js | ||
---|---|---|
@@ -15,10 +15,10 @@ | ||
15 | 15 | return pairA |
16 | 16 | } |
17 | 17 | |
18 | 18 | // order by number of ticks if messages have different number of ticks |
19 | - if (a._fromPortTicks !== b._fromPortTicks) { | |
20 | - return a._fromPortTicks < b._fromPortTicks ? pairA : pairB | |
19 | + if (portA.ticks !== portB.ticks) { | |
20 | + return portA.ticks < portB.ticks ? pairA : pairB | |
21 | 21 | } else if (a.priority !== b.priority) { |
22 | 22 | // decide by priority |
23 | 23 | return a.priority > b.priority ? pairA : pairB |
24 | 24 | } else { |
@@ -118,9 +118,16 @@ | ||
118 | 118 | } |
119 | 119 | |
120 | 120 | async getNextMessage () { |
121 | 121 | if (this._portMap.size) { |
122 | - await this.wait(this.exoInterface.ticks) | |
122 | + // find the oldest message | |
123 | + const ticks = [...this._portMap].map(([name, port]) => { | |
124 | + return port.size ? port.ticks : this.exoInterface.ticks | |
125 | + }).reduce((ticksA, ticksB) => { | |
126 | + return ticksA < ticksB ? ticksA : ticksB | |
127 | + }) | |
128 | + | |
129 | + await this.wait(ticks) | |
123 | 130 | const portMap = [...this._portMap].reduce(messageArbiter) |
124 | 131 | return portMap[1].shift() |
125 | 132 | } |
126 | 133 | } |
tests/index.js | ||
---|---|---|
@@ -662,17 +662,17 @@ | ||
662 | 662 | const root = await hypervisor.createInstance('base') |
663 | 663 | let port = root.ports.create('base') |
664 | 664 | root.ports.bind(port, 'first') |
665 | 665 | |
666 | - const first = await root.getContainer(port) | |
666 | + const first = await root.getInstance(port) | |
667 | 667 | port = first.ports.create('base') |
668 | 668 | first.ports.bind(port, 'second') |
669 | 669 | |
670 | - const second = await first.getContainer(port) | |
670 | + const second = await first.getInstance(port) | |
671 | 671 | port = second.ports.create('base') |
672 | 672 | second.ports.bind(port, 'third') |
673 | 673 | |
674 | - const third = await second.getContainer(port) | |
675 | - const foundThird = await hypervisor.getByPath(root, 'first/second/third') | |
674 | + const third = await second.getInstance(port) | |
675 | + const foundThird = await hypervisor.getInstanceByPath(root, 'first/second/third') | |
676 | 676 | t.equals(third, foundThird, 'should find by path') |
677 | 677 | }) |
678 | 678 | }) |
Built with git-ssb-web