git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit e8b4f99fd6e0dfe281b7a34b3472f07323a34c0d

100% code coverage

wanderer committed on 11/21/2017, 6:09:35 PM
Parent: ca90e13f859d9f1babea0d5f690290cf73a06138

Files changed

actor.jschanged
inbox.jschanged
index.jschanged
scheduler.jschanged
tests/index.jschanged
actor.jsView
@@ -1,9 +1,9 @@
11 const Message = require('primea-message')
22 const CapsManager = require('./capsManager.js')
33 const Inbox = require('./inbox.js')
44
5-module.exports = class Kernel {
5+module.exports = class Actor {
66 /**
77 * the Kernel manages the varous message passing functions and provides
88 * an interface for the containers to use
99 * @param {Object} opts
inbox.jsView
@@ -41,14 +41,16 @@
4141 throw new Error('already getting next message')
4242 }
4343
4444 this._waitingTags = new Set(tags)
45- this._queue.forEach(message => this._queueMessage(message))
45+ this._queue = this._queue.filter(message => !this._queueTaggedMessage(message))
4646
47+ // todo: add saturation test
4748 const message = await this.getNextMessage(timeout)
49+ delete this._waitingTags
4850 this._waitingTagsQueue.forEach(message => this._queueMessage(message))
51+ this._waitingTagsQueue = []
4952
50- delete this._waitingTags
5153 return message
5254 }
5355
5456 /**
@@ -60,10 +62,10 @@
6062 if (message === undefined && timeout === 0) {
6163 return
6264 }
6365
64- let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
6566 timeout += this.actor.ticks
67+ let oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id)
6668
6769 while (true) {
6870 if (message && message._fromTicks < timeout) {
6971 timeout = message._fromTicks
@@ -80,9 +82,9 @@
8082 this._olderMessage(message).then(m => {
8183 message = m
8284 })
8385 ])
84- oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
86+ oldestTime = this.hypervisor.scheduler.syncLeastNumberOfTicks(this.actor.id)
8587 }
8688 return this._deQueueMessage()
8789 }
8890
@@ -108,13 +110,20 @@
108110 }
109111 }
110112
111113 _queueMessage (message) {
112- if (this._waitingTags && this._waitingTags.has(message.tag)) {
114+ if (!(this._waitingTags && this._queueTaggedMessage(message))) {
115+ binarySearchInsert(this._queue, messageArbiter, message)
116+ }
117+ }
118+
119+ _queueTaggedMessage (message) {
120+ if (this._waitingTags.has(message.tag)) {
113121 this._waitingTags.delete(message.tag)
114122 binarySearchInsert(this._waitingTagsQueue, messageArbiter, message)
123+ return true
115124 } else {
116- binarySearchInsert(this._queue, messageArbiter, message)
125+ return false
117126 }
118127 }
119128 }
120129
index.jsView
@@ -15,9 +15,8 @@
1515 this.nonce = 0
1616 }
1717
1818 async send (cap, message) {
19- cap = await Promise.resolve(cap)
2019 const id = cap.destId
2120 const instance = await this.getInstance(id)
2221 instance.queue(message)
2322 }
scheduler.jsView
@@ -69,9 +69,9 @@
6969 * @param {interger} ticks - the number of ticks to wait
7070 * @param {string} id - optional id of the container that is waiting
7171 * @return {Promise}
7272 */
73- wait (ticks = Infinity, id) {
73+ wait (ticks, id) {
7474 this._running.delete(id)
7575 return new Promise((resolve, reject) => {
7676 binarySearchInsert(this._waits, comparator, {
7777 ticks: ticks,
@@ -89,9 +89,9 @@
8989 /**
9090 * returns the oldest container's ticks
9191 * @return {integer}
9292 */
93- leastNumberOfTicks (exculde) {
93+ syncLeastNumberOfTicks (exculde) {
9494 let ticks = 0
9595 for (const instance of this.instances) {
9696 ticks = instance[1].ticks
9797 if (instance[1].id !== exculde) {
@@ -101,44 +101,52 @@
101101 return ticks
102102 }
103103
104104 // checks outstanding waits to see if they can be resolved
105- _checkWaits () {
105+ async _checkWaits () {
106+ if (this._checkingWaits) {
107+ return
108+ } else {
109+ this._checkingWaits = true
110+ await [...this._loadingInstances.values()]
111+ }
106112 // if there are no running containers
107- if (!this._loadingInstances.size) {
108- if (!this.instances.size) {
109- // clear any remanding waits
110- this._waits.forEach(wait => wait.resolve())
111- this._waits = []
112- } else {
113- // find the old container and see if to can resolve any of the waits
114- while (this._waits[0]) {
115- const wait = this._waits[0]
116- const least = this.leastNumberOfTicks(wait.id)
117- if (wait.ticks <= least) {
118- this._waits.shift()
119- wait.resolve()
120- this._running.add(wait.id)
121- } else {
122- break
123- }
113+ if (!this.instances.size) {
114+ // clear any remanding waits
115+ this._waits.forEach(wait => wait.resolve())
116+ this._waits = []
117+ this._checkingWaits = false
118+ } else {
119+ // find the old container and see if to can resolve any of the waits
120+ while (this._waits[0]) {
121+ const wait = this._waits[0]
122+ const least = this.syncLeastNumberOfTicks(wait.id)
123+ if (wait.ticks <= least) {
124+ this._waits.shift()
125+ wait.resolve()
126+ this._running.add(wait.id)
127+ } else {
128+ break
124129 }
130+ }
125131
126- if (!this._running.size) {
127- // if there are no containers running find the oldest wait and update
128- // the oldest containers to it ticks
129- const oldest = this._waits[0].ticks
130- for (let instance of this.instances) {
131- instance = instance[1]
132- if (instance.ticks > oldest) {
133- break
134- } else {
135- instance.ticks = oldest
136- this._update(instance)
137- }
132+ if (!this._running.size && this._waits.length) {
133+ // if there are no containers running find the oldest wait and update
134+ // the oldest containers to it ticks
135+ const oldest = this._waits[0].ticks
136+ for (let instance of this.instances) {
137+ instance = instance[1]
138+ if (instance.ticks > oldest) {
139+ break
140+ } else {
141+ instance.ticks = oldest
142+ this._update(instance)
138143 }
139- return this._checkWaits()
140144 }
145+ this._checkingWaits = false
146+ return this._checkWaits()
147+ } else {
148+ this._checkingWaits = false
141149 }
142150 }
143151 }
144152 }
tests/index.jsView
@@ -15,9 +15,9 @@
1515 }
1616 }
1717
1818 tape('basic', async t => {
19- t.plan(3)
19+ t.plan(2)
2020 let message
2121 const expectedState = {
2222 '/': Buffer.from('70a9676b7995b108057bd29955e3874401aa5ba7', 'hex')
2323 }
@@ -41,10 +41,8 @@
4141 hypervisor.send(rootCap, message)
4242
4343 const stateRoot = await hypervisor.createStateRoot(Infinity)
4444 t.deepEquals(stateRoot, expectedState, 'expected root!')
45-
46- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
4745 })
4846
4947 tape('caps manager', async t => {
5048 const capsManager = new CapsManager({})
@@ -58,9 +56,9 @@
5856 t.end()
5957 })
6058
6159 tape('two communicating actors', async t => {
62- t.plan(3)
60+ t.plan(2)
6361 let message
6462 const expectedState = {
6563 '/': Buffer.from('fc935489953ed357f06171dd23439d83190b3a1b', 'hex')
6664 }
@@ -97,13 +95,12 @@
9795
9896 const stateRoot = await hypervisor.createStateRoot(Infinity)
9997
10098 t.deepEquals(stateRoot, expectedState, 'expected root!')
101- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
10299 })
103100
104101 tape('three communicating actors', async t => {
105- t.plan(4)
102+ t.plan(3)
106103 let message
107104 const expectedState = {
108105 '/': Buffer.from('24855a8efa9af536f0f9b319c05b10d6b7cae6c8', 'hex')
109106 }
@@ -144,13 +141,12 @@
144141
145142 const stateRoot = await hypervisor.createStateRoot(Infinity)
146143
147144 t.deepEquals(stateRoot, expectedState, 'expected root!')
148- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
149145 })
150146
151147 tape('three communicating actors, with tick counting', async t => {
152- t.plan(4)
148+ t.plan(3)
153149 let message
154150 const expectedState = {
155151 '/': Buffer.from('24855a8efa9af536f0f9b319c05b10d6b7cae6c8', 'hex')
156152 }
@@ -198,13 +194,12 @@
198194
199195 const stateRoot = await hypervisor.createStateRoot(Infinity)
200196
201197 t.deepEquals(stateRoot, expectedState, 'expected root!')
202- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
203198 })
204199
205200 tape('response caps', async t => {
206- t.plan(4)
201+ t.plan(3)
207202 let message
208203 const expectedState = {
209204 '/': Buffer.from('fc935489953ed357f06171dd23439d83190b3a1b', 'hex')
210205 }
@@ -246,13 +241,12 @@
246241
247242 const stateRoot = await hypervisor.createStateRoot(Infinity)
248243
249244 t.deepEquals(stateRoot, expectedState, 'expected root!')
250- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
251245 })
252246
253247 tape('response caps for errors', async t => {
254- t.plan(4)
248+ t.plan(3)
255249 let message
256250 const expectedState = {
257251 '/': Buffer.from('fc935489953ed357f06171dd23439d83190b3a1b', 'hex')
258252 }
@@ -295,13 +289,12 @@
295289
296290 const stateRoot = await hypervisor.createStateRoot(Infinity)
297291
298292 t.deepEquals(stateRoot, expectedState, 'expected root!')
299- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
300293 })
301294
302295 tape('actor creation', async t => {
303- t.plan(3)
296+ t.plan(2)
304297 let message
305298 const expectedState = {
306299 '/': Buffer.from('8e809b10d473ef4592dc5c1683e89bc7001e5e3e', 'hex')
307300 }
@@ -328,11 +321,8 @@
328321 const cap = m.caps[0]
329322 return this.kernel.send(cap, new Message())
330323 }
331324
332- onMessage (m) {
333- }
334-
335325 static get typeId () {
336326 return 8
337327 }
338328 }
@@ -345,13 +335,12 @@
345335
346336 const stateRoot = await hypervisor.createStateRoot(Infinity)
347337
348338 t.deepEquals(stateRoot, expectedState, 'expected root!')
349- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
350339 })
351340
352341 tape('simple message arbiter test', async t => {
353- t.plan(5)
342+ t.plan(4)
354343 const expectedState = {
355344 '/': Buffer.from('fc935489953ed357f06171dd23439d83190b3a1b', 'hex')
356345 }
357346
@@ -385,9 +374,9 @@
385374 if (recMsg === 0) {
386375 t.equal(m.data, 'first', 'should recive fist message')
387376 } else if (recMsg === 1) {
388377 t.equal(m.data, 'second', 'should recive second message')
389- } else if (recMsg === 2) {
378+ } else {
390379 t.equal(m.data, 'third', 'should recive third message')
391380 }
392381 recMsg++
393382 }
@@ -408,13 +397,12 @@
408397
409398 const stateRoot = await hypervisor.createStateRoot(Infinity)
410399
411400 t.deepEquals(stateRoot, expectedState, 'expected root!')
412- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
413401 })
414402
415403 tape('arbiter test for id comparision', async t => {
416- t.plan(5)
404+ t.plan(4)
417405 let message
418406 const expectedState = {
419407 '/': Buffer.from('0866fe6a6adaf28c51ce99ddfddd49c492e9ce48', 'hex')
420408 }
@@ -439,9 +427,9 @@
439427 if (recMsg === 0) {
440428 t.equal(m.data, 'first', 'should recive fist message')
441429 } else if (recMsg === 1) {
442430 t.equal(m.data, 'second', 'should recive second message')
443- } else if (recMsg === 2) {
431+ } else {
444432 t.equal(m.data, 'third', 'should recive third message')
445433 }
446434 recMsg++
447435
@@ -475,13 +463,12 @@
475463
476464 const stateRoot = await hypervisor.createStateRoot(Infinity)
477465
478466 t.deepEquals(stateRoot, expectedState, 'expected root!')
479- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
480467 })
481468
482469 tape('basic tagged caps', async t => {
483- t.plan(5)
470+ t.plan(4)
484471 const expectedState = {
485472 '/': Buffer.from('ef403643f292108fe9edc1700d80a7bf2402e7a0', 'hex')
486473 }
487474
@@ -522,6 +509,312 @@
522509
523510 const stateRoot = await hypervisor.createStateRoot(Infinity)
524511
525512 t.deepEquals(stateRoot, expectedState, 'expected root!')
526- t.equals(hypervisor.scheduler.leastNumberOfTicks(), 0)
527513 })
514+
515+tape('trying to listen for caps more then once', async t => {
516+ t.plan(4)
517+ const expectedState = {
518+ '/': Buffer.from('ef403643f292108fe9edc1700d80a7bf2402e7a0', 'hex')
519+ }
520+
521+ const tree = new RadixTree({
522+ db: db
523+ })
524+
525+ class testVMContainerA extends BaseContainer {
526+ async onMessage (m) {
527+ t.true(m, 'should recive first message')
528+ const rCap = this.kernel.mintCap(1)
529+ const message = new Message({data: 'first'})
530+ message.responseCap = rCap
531+ await this.kernel.send(m.caps[0], message)
532+ const promise = this.kernel.inbox.waitOnTag([1], 44)
533+ try {
534+ await this.kernel.inbox.waitOnTag([1], 44)
535+ } catch (e) {
536+ t.true(e, 'should error if waiting twice')
537+ }
538+ return promise
539+ }
540+ }
541+
542+ class testVMContainerB extends BaseContainer {
543+ onMessage (m) {
544+ t.true(m, 'should recive a message')
545+ }
546+
547+ static get typeId () {
548+ return 8
549+ }
550+ }
551+
552+ const hypervisor = new Hypervisor(tree)
553+ hypervisor.registerContainer(testVMContainerA)
554+ hypervisor.registerContainer(testVMContainerB)
555+
556+ let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message())
557+ let capB = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
558+
559+ await hypervisor.send(capA, new Message({caps: [capB]}))
560+
561+ const stateRoot = await hypervisor.createStateRoot(Infinity)
562+
563+ t.deepEquals(stateRoot, expectedState, 'expected root!')
564+})
565+
566+tape('multple messages to restore on waiting for tags', async t => {
567+ t.plan(6)
568+ const expectedState = {
569+ '/': Buffer.from('c2bbd78ee38ecf417f857451bdb06cdba1345b22', 'hex')
570+ }
571+
572+ const tree = new RadixTree({
573+ db: db
574+ })
575+
576+ class testVMContainerA extends BaseContainer {
577+ async onMessage (m) {
578+ t.true(m, 'should recive first message')
579+ if (m.caps.length) {
580+ const cap1 = this.kernel.mintCap(1)
581+ const cap2 = this.kernel.mintCap(2)
582+ const message1 = new Message({
583+ data: 'first'
584+ })
585+ const message2 = new Message({
586+ data: 'second'
587+ })
588+ message1.caps.push(cap1)
589+ message2.caps.push(cap2)
590+ await Promise.all([
591+ this.kernel.send(m.caps[0], message1),
592+ this.kernel.send(m.caps[1], message2)
593+ ])
594+ const rMessage = await this.kernel.inbox.waitOnTag([1, 2], 44)
595+ t.true(rMessage, 'should recive a response message')
596+ }
597+ }
598+ }
599+
600+ class testVMContainerB extends BaseContainer {
601+ async onMessage (m) {
602+ t.true(m, 'should recive a message')
603+ const cap = m.caps[0]
604+ this.kernel.incrementTicks(1)
605+ await this.kernel.send(cap, new Message({data: m.data}))
606+
607+ return new Promise((resolve, reject) => {
608+ setTimeout(resolve, 200)
609+ })
610+ }
611+
612+ static get typeId () {
613+ return 8
614+ }
615+ }
616+
617+ const hypervisor = new Hypervisor(tree)
618+ hypervisor.registerContainer(testVMContainerA)
619+ hypervisor.registerContainer(testVMContainerB)
620+
621+ let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message())
622+ let capB1 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
623+ let capB2 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
624+
625+ await hypervisor.send(capA, new Message({caps: [capB1, capB2]}))
626+
627+ const stateRoot = await hypervisor.createStateRoot(Infinity)
628+
629+ t.deepEquals(stateRoot, expectedState, 'expected root!')
630+})
631+
632+tape('multple messages to backup on waiting for tags', async t => {
633+ t.plan(6)
634+ const expectedState = {
635+ '/': Buffer.from('c2bbd78ee38ecf417f857451bdb06cdba1345b22', 'hex')
636+ }
637+
638+ const tree = new RadixTree({
639+ db: db
640+ })
641+
642+ class testVMContainerA extends BaseContainer {
643+ async onMessage (m) {
644+ t.true(m, 'should recive first message')
645+ if (m.caps.length) {
646+ const cap1 = this.kernel.mintCap(1)
647+ const cap2 = this.kernel.mintCap(2)
648+ const message1 = new Message({
649+ data: 'first'
650+ })
651+ const message2 = new Message({
652+ data: 'second'
653+ })
654+ message1.caps.push(cap1)
655+ message2.caps.push(cap2)
656+ await this.kernel.send(m.caps[0], message1)
657+ await this.kernel.send(m.caps[1], message2)
658+ const rMessage = await this.kernel.inbox.waitOnTag([1, 2], 44)
659+ t.true(rMessage, 'should recive a response message')
660+ }
661+ }
662+ }
663+
664+ class testVMContainerB extends BaseContainer {
665+ async onMessage (m) {
666+ t.true(m, 'should recive a message')
667+ const cap = m.caps[0]
668+ this.kernel.incrementTicks(1)
669+ await this.kernel.send(cap, new Message({data: m.data}))
670+
671+ return new Promise((resolve, reject) => {
672+ setTimeout(resolve, 200)
673+ })
674+ }
675+
676+ static get typeId () {
677+ return 8
678+ }
679+ }
680+
681+ const hypervisor = new Hypervisor(tree)
682+ hypervisor.registerContainer(testVMContainerA)
683+ hypervisor.registerContainer(testVMContainerB)
684+
685+ let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message())
686+ let capB1 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
687+ let capB2 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
688+
689+ await hypervisor.send(capA, new Message({caps: [capB1, capB2]}))
690+
691+ const stateRoot = await hypervisor.createStateRoot(Infinity)
692+
693+ t.deepEquals(stateRoot, expectedState, 'expected root!')
694+})
695+
696+tape('multple messages, but single tag', async t => {
697+ t.plan(6)
698+ const expectedState = {
699+ '/': Buffer.from('c2bbd78ee38ecf417f857451bdb06cdba1345b22', 'hex')
700+ }
701+
702+ const tree = new RadixTree({
703+ db: db
704+ })
705+
706+ class testVMContainerA extends BaseContainer {
707+ async onMessage (m) {
708+ t.true(m, 'should recive first message')
709+ if (m.caps.length) {
710+ const cap1 = this.kernel.mintCap(1)
711+ const cap2 = this.kernel.mintCap(2)
712+ const message1 = new Message({
713+ data: 'first'
714+ })
715+ const message2 = new Message({
716+ data: 'second'
717+ })
718+ message1.caps.push(cap1)
719+ message2.caps.push(cap2)
720+ await this.kernel.send(m.caps[0], message1)
721+ await this.kernel.send(m.caps[1], message2)
722+ const rMessage = await this.kernel.inbox.waitOnTag([2], 44)
723+ t.true(rMessage, 'should recive a response message')
724+ }
725+ }
726+ }
727+
728+ class testVMContainerB extends BaseContainer {
729+ async onMessage (m) {
730+ t.true(m, 'should recive a message')
731+ const cap = m.caps[0]
732+ this.kernel.incrementTicks(1)
733+ await this.kernel.send(cap, new Message({data: m.data}))
734+
735+ return new Promise((resolve, reject) => {
736+ setTimeout(resolve, 200)
737+ })
738+ }
739+
740+ static get typeId () {
741+ return 8
742+ }
743+ }
744+
745+ const hypervisor = new Hypervisor(tree)
746+ hypervisor.registerContainer(testVMContainerA)
747+ hypervisor.registerContainer(testVMContainerB)
748+
749+ let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message())
750+ let capB1 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
751+ let capB2 = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
752+
753+ await hypervisor.send(capA, new Message({caps: [capB1, capB2]}))
754+
755+ const stateRoot = await hypervisor.createStateRoot(Infinity)
756+
757+ t.deepEquals(stateRoot, expectedState, 'expected root!')
758+})
759+
760+tape('deadlock test', async t => {
761+ t.plan(7)
762+ const expectedState = {
763+ '/': Buffer.from('2e8658dc2f616599b4fa622318b86ad6ed809db0', 'hex')
764+ }
765+
766+ const tree = new RadixTree({
767+ db: db
768+ })
769+
770+ class testVMContainerA extends BaseContainer {
771+ async onMessage (m) {
772+ t.true(m, 'should recive first message 1')
773+ const rMessage = await this.kernel.inbox.waitOnTag([1], 50)
774+ t.equals(rMessage, undefined, 'should recive a response message 1')
775+ }
776+ }
777+
778+ class testVMContainerB extends BaseContainer {
779+ async onMessage (m) {
780+ t.true(m, 'should recive first message 2')
781+ this.kernel.incrementTicks(47)
782+ const rMessage = await this.kernel.inbox.waitOnTag([1], 1)
783+ t.equals(rMessage, undefined, 'should recive a response message 2')
784+ }
785+
786+ static get typeId () {
787+ return 8
788+ }
789+ }
790+
791+ class testVMContainerC extends BaseContainer {
792+ async onMessage (m) {
793+ t.true(m, 'should recive first message 3')
794+ this.kernel.incrementTicks(45)
795+ const rMessage = await this.kernel.inbox.waitOnTag([1], 1)
796+ t.equals(rMessage, undefined, 'should recive a response message 3')
797+ }
798+
799+ static get typeId () {
800+ return 7
801+ }
802+ }
803+
804+ const hypervisor = new Hypervisor(tree)
805+ hypervisor.registerContainer(testVMContainerA)
806+ hypervisor.registerContainer(testVMContainerB)
807+ hypervisor.registerContainer(testVMContainerC)
808+
809+ let capA = await hypervisor.createInstance(testVMContainerA.typeId, new Message())
810+ let capB = await hypervisor.createInstance(testVMContainerB.typeId, new Message())
811+ let capC = await hypervisor.createInstance(testVMContainerC.typeId, new Message())
812+
813+ await Promise.all([
814+ hypervisor.send(capA, new Message()),
815+ hypervisor.send(capB, new Message()),
816+ hypervisor.send(capC, new Message())
817+ ])
818+ const stateRoot = await hypervisor.createStateRoot(Infinity)
819+ t.deepEquals(stateRoot, expectedState, 'expected root!')
820+})

Built with git-ssb-web