git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit 710c6eda4be5c2e63702e608a3b03af9c647ee2e

all tests passing agina

wanderer committed on 6/27/2017, 6:32:31 PM
Parent: 864bc27090fe788a4bf5a86c00246ca6e4391dd0

Files changed

exoInterface.jschanged
index.jschanged
portManager.jschanged
scheduler.jschanged
tests/index.jschanged
exoInterface.jsView
@@ -47,29 +47,51 @@
4747 // waits for the next message
4848 async _runNextMessage () {
4949 // check if the ports are saturated, if so we don't have to wait on the
5050 // scheduler
51- if (!this.ports.isSaturated()) {
52- await this.hypervisor.scheduler.wait(this.ticks, this.id)
53- }
51+ try {
52+ let message = this.ports.peekNextMessage()
53+ let saturated = this.ports.isSaturated()
54+ let oldestTime = this.hypervisor.scheduler.smallest()
5455
55- let message = this.ports.peekNextMessage()
56- if (message) {
57- if (this.ticks < message._fromTicks) {
56+ // this.hypervisor.scheduler.print()
57+ while (!saturated &&
58+ !((message && oldestTime >= message._fromTicks) ||
59+ (!message && (oldestTime === this.ticks || !this.hypervisor.scheduler._running.size)))) {
60+ const ticksToWait = message ? message._fromTicks : this.ticks
61+
62+ await Promise.race([
63+ this.hypervisor.scheduler.wait(ticksToWait, this.id).then(m => {
64+ // this.hypervisor.scheduler.print()
65+ message = this.ports.peekNextMessage()
66+ }),
67+ this.ports.olderMessage(message).then(m => {
68+ message = m
69+ }),
70+ this.ports.whenSaturated().then(() => {
71+ saturated = true
72+ })
73+ ])
74+
75+ oldestTime = this.hypervisor.scheduler.smallest()
76+ saturated = this.ports.isSaturated()
77+ }
78+
79+ if (!message) {
80+ // if no more messages then shut down
81+ this.hypervisor.scheduler.done(this)
82+ return
83+ }
84+
85+ message.fromPort.messages.shift()
86+ if (message._fromTicks > this.ticks) {
5887 this.ticks = message._fromTicks
59- // check for tie messages
60- this.hypervisor.scheduler.update(this)
61- if (!this.ports.isSaturated()) {
62- await this.hypervisor.scheduler.wait(this.ticks, this.id)
63- message = this.ports.peekNextMessage()
64- }
6588 }
66- message.fromPort.messages.shift()
89+ this.hypervisor.scheduler.update(this)
6790 // run the next message
6891 this.run(message)
69- } else {
70- // if no more messages then shut down
71- this.hypervisor.scheduler.done(this)
92+ } catch (e) {
93+ console.log(e)
7294 }
7395 }
7496
7597 /**
index.jsView
@@ -61,10 +61,10 @@
6161 id: id
6262 })
6363
6464 // save the newly created instance
65+ this.scheduler.releaseLock(lock)
6566 this.scheduler.update(exoInterface)
66- this.scheduler.releaseLock(lock)
6767 return exoInterface
6868 }
6969
7070 /**
portManager.jsView
@@ -36,8 +36,14 @@
3636 Object.assign(this, opts)
3737 this.ports = this.state.ports
3838 this._unboundPorts = new Set()
3939 this._waitingPorts = {}
40+ this._saturationPromise = new Promise((resolve, reject) => {
41+ this._saturationResolve = resolve
42+ })
43+ this._oldestMessagePromise = new Promise((resolve, reject) => {
44+ this._oldestMessageResolve = resolve
45+ })
4046 }
4147
4248 /**
4349 * binds a port to a name
@@ -129,10 +135,25 @@
129135 * @param {Message} message
130136 */
131137 queue (name, message) {
132138 if (name) {
133- this.ports[name].messages.push(message)
139+ const port = this.ports[name]
140+ if (port.messages.push(message) === 1 && message._fromTicks < this._messageTickThreshold) {
141+ message._fromPort = port
142+ message.fromName = name
143+ this._oldestMessageResolve(message)
144+ this._oldestMessagePromise = new Promise((resolve, reject) => {
145+ this._oldestMessageResolve = resolve
146+ })
147+ this._messageTickThreshold = Infinity
148+ }
134149 }
150+ if (this.isSaturated()) {
151+ this._saturationResolve()
152+ this._saturationPromise = new Promise((resolve, reject) => {
153+ this._saturationResolve = resolve
154+ })
155+ }
135156 }
136157
137158 /**
138159 * gets a port given it's name
@@ -199,8 +220,9 @@
199220 if (names.length) {
200221 const portName = names.reduce(messageArbiter.bind(this))
201222 const port = this.ports[portName]
202223 const message = port.messages[0]
224+
203225 if (message) {
204226 message._fromPort = port
205227 message.fromName = portName
206228 return message
@@ -212,7 +234,17 @@
212234 * tests wether or not all the ports have a message
213235 * @returns {boolean}
214236 */
215237 isSaturated () {
216- return Object.keys(this.ports).every(name => this.ports[name].messages.length)
238+ const keys = Object.keys(this.ports)
239+ return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
217240 }
241+
242+ whenSaturated () {
243+ return this._saturationPromise
244+ }
245+
246+ olderMessage (message) {
247+ this._messageTickThreshold = message ? message._fromTicks : 0
248+ return this._oldestMessagePromise
249+ }
218250 }
scheduler.jsView
@@ -3,11 +3,16 @@
33 const comparator = function (a, b) {
44 return a.ticks - b.ticks
55 }
66
7+const instancesComparator = function (a, b) {
8+ return a[1].ticks - b[1].ticks
9+}
10+
711 module.exports = class Scheduler {
812 constructor () {
913 this._waits = []
14+ this._running = new Set()
1015 this.instances = new Map()
1116 this.locks = new Set()
1217 }
1318
@@ -21,58 +26,81 @@
2126 this.locks.delete(id)
2227 }
2328
2429 update (instance) {
30+ this._update(instance)
31+ this._checkWaits()
32+ }
33+
34+ _update (instance) {
35+ this._running.add(instance.id)
2536 this.instances.delete(instance.id)
2637 const instanceArray = [...this.instances]
27- binarySearchInsert(instanceArray, comparator, [instance.id, {
28- ticks: instance.ticks,
29- instance: instance
30- }])
38+ // console.log(instanceArray)
39+ binarySearchInsert(instanceArray, instancesComparator, [instance.id, instance])
3140 this.instances = new Map(instanceArray)
32- this._checkWaits()
3341 }
3442
3543 getInstance (id) {
36- const item = this.instances.get(id)
37- if (item) {
38- return item.instance
39- }
44+ return this.instances.get(id)
4045 }
4146
4247 done (instance) {
48+ this._running.delete(instance.id)
4349 this.instances.delete(instance.id)
4450 this._checkWaits()
4551 }
4652
47- wait (ticks = Infinity) {
53+ wait (ticks = Infinity, id) {
54+ this._running.delete(id)
4855 if (!this.locks.size && ticks <= this.smallest()) {
49- return
56+ return Promise.resolve()
5057 } else {
5158 return new Promise((resolve, reject) => {
5259 binarySearchInsert(this._waits, comparator, {
5360 ticks: ticks,
5461 resolve: resolve
5562 })
63+ this._checkWaits()
5664 })
5765 }
5866 }
5967
6068 smallest () {
61- return [...this.instances][0][1].ticks
69+ return this.instances.size ? [...this.instances][0][1].ticks : 0
6270 }
6371
6472 _checkWaits () {
6573 if (!this.locks.size) {
74+ // if there are no running containers
6675 if (!this.isRunning()) {
6776 // clear any remanding waits
6877 this._waits.forEach(wait => wait.resolve())
6978 this._waits = []
79+ } else if (!this._running.size) {
80+ const smallest = this._waits[0].ticks
81+ const toUpdate = []
82+ for (let instance of this.instances) {
83+ instance = instance[1]
84+ const ticks = instance.ticks
85+ if (ticks > smallest) {
86+ break
87+ } else {
88+ toUpdate.push(instance)
89+ }
90+ }
91+ toUpdate.forEach(instance => {
92+ instance.ticks = smallest
93+ this._update(instance)
94+ })
95+ this._checkWaits()
7096 } else {
7197 const smallest = this.smallest()
7298 for (const index in this._waits) {
7399 const wait = this._waits[index]
74100 if (wait.ticks <= smallest) {
101+ // this.print()
102+ // console.log('resolve', wait.ticks)
75103 wait.resolve()
76104 } else {
77105 this._waits.splice(0, index)
78106 break
tests/index.jsView
@@ -176,19 +176,21 @@
176176 t.plan(2)
177177 let runs = 0
178178
179179 class Root extends BaseContainer {
180- run (m) {
180+ async run (m) {
181181 if (!runs) {
182182 runs++
183183 const one = this.exInterface.ports.create('first')
184184 const two = this.exInterface.ports.create('second')
185185
186186 this.exInterface.ports.bind('one', one)
187187 this.exInterface.ports.bind('two', two)
188188
189- this.exInterface.send(one, this.exInterface.createMessage())
190- this.exInterface.send(two, this.exInterface.createMessage())
189+ await Promise.all([
190+ this.exInterface.send(one, this.exInterface.createMessage()),
191+ this.exInterface.send(two, this.exInterface.createMessage())
192+ ])
191193 } else if (runs === 1) {
192194 runs++
193195 t.equals(m.data, 'second', 'should recived the second message')
194196 } else if (runs === 2) {
@@ -199,16 +201,20 @@
199201
200202 class First extends BaseContainer {
201203 run (m) {
202204 this.exInterface.incrementTicks(2)
203- this.exInterface.send(m.fromPort, this.exInterface.createMessage({data: 'first'}))
205+ return this.exInterface.send(m.fromPort, this.exInterface.createMessage({
206+ data: 'first'
207+ }))
204208 }
205209 }
206210
207211 class Second extends BaseContainer {
208212 run (m) {
209213 this.exInterface.incrementTicks(1)
210- this.exInterface.send(m.fromPort, this.exInterface.createMessage({data: 'second'}))
214+ return this.exInterface.send(m.fromPort, this.exInterface.createMessage({
215+ data: 'second'
216+ }))
211217 }
212218 }
213219
214220 const hypervisor = new Hypervisor(node.dag)

Built with git-ssb-web