git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit 5a33099559b3af6b0c5c0e444a8d537471853b77

rewrite scheduler

Signed-off-by: wanderer <mjbecze@gmail.com>
wanderer committed on 1/23/2018, 7:23:32 PM
Parent: ff617aaaa70d98abc58ed432aac680e76748c148

Files changed

actor.jschanged
benchmark/index.jschanged
index.jschanged
package-lock.jsonchanged
package.jsonchanged
scheduler.jschanged
tests/index.jschanged
inbox.jsdeleted
actor.jsView
@@ -1,7 +1,6 @@
11 const Pipe = require('buffer-pipe')
22 const leb128 = require('leb128').unsigned
3-const Inbox = require('./inbox.js')
43
54 module.exports = class Actor {
65 /**
76 * the Actor manages the varous message passing functions and provides
@@ -14,12 +13,9 @@
1413 */
1514 constructor (opts) {
1615 Object.assign(this, opts)
1716
18- this.inbox = new Inbox({
19- actor: this,
20- hypervisor: opts.hypervisor
21- })
17+ this.inbox = []
2218
2319 this.ticks = 0
2420 this.running = false
2521 }
@@ -29,31 +25,28 @@
2925 * @param {string} portName
3026 * @param {object} message
3127 */
3228 queue (message) {
33- this.inbox.queue(message)
29+ this.inbox.push(message)
3430
3531 if (!this.running) {
3632 this.running = true
37- this._startMessageLoop()
33+ return this._startMessageLoop()
3834 }
3935 }
4036
4137 // waits for the next message
4238 async _startMessageLoop () {
4339 // this ensure we only every have one loop running at a time
44- while (!this.inbox.isEmpty) {
45- const message = await this.inbox.nextMessage()
40+ while (this.inbox.length) {
41+ const message = this.inbox.shift()
42+ if (message._fromTicks > this.ticks) {
43+ this.hypervisor.scheduler.update(this.ticks, message._fromTicks)
44+ this.ticks = message._fromTicks
45+ }
4646 await this.runMessage(message)
4747 }
4848 this.running = false
49- // wait for state ops to finish
50- await this.state.done()
51- setTimeout(() => {
52- if (!this.running) {
53- this.shutdown()
54- }
55- }, 0)
5649 }
5750
5851 serializeMetaData () {
5952 return Actor.serializeMetaData(this.type, this.nonce)
@@ -85,11 +78,11 @@
8578
8679 /**
8780 * Runs the shutdown routine for the actor
8881 */
89- shutdown () {
82+ async shutdown () {
83+ await this.state.done()
9084 this.state.root['/'][3] = this.serializeMetaData()
91- this.hypervisor.scheduler.done(this.id)
9285 }
9386
9487 /**
9588 * Runs the startup routine for the actor
@@ -110,17 +103,19 @@
110103 await this.instance.exports[message.funcRef.name](...message.funcArguments)
111104 } catch (e) {
112105 message.emit('execution:error', e)
113106 }
107+ message.emit('done')
114108 }
115109
116110 /**
117111 * updates the number of ticks that the actor has run
118112 * @param {Number} count - the number of ticks to add
119113 */
120114 incrementTicks (count) {
115+ const oldValue = this.ticks
121116 this.ticks += count
122- this.hypervisor.scheduler.update(this)
117+ this.hypervisor.scheduler.update(oldValue, this.ticks)
123118 }
124119
125120 /**
126121 * creates an actor
@@ -150,7 +145,7 @@
150145 send (message) {
151146 message._fromTicks = this.ticks
152147 message._fromId = this.id
153148
154- return this.hypervisor.send(message)
149+ this.hypervisor.scheduler.queue([message])
155150 }
156151 }
benchmark/index.jsView
@@ -89,12 +89,13 @@
8989
9090 let start = new Date()
9191 await Promise.all(msgs.map((msg) => hypervisor.send(msg)))
9292 console.log('done sending')
93- await hypervisor.scheduler.wait(Infinity)
94- const end = new Date() - start
95- console.info('Execution time: %dms', end)
96- console.log('messages processed', numOfActors * depth + numOfActors)
97- console.log('messages processed', numOfMsg)
93+ hypervisor.scheduler.on('idle', () => {
94+ const end = new Date() - start
95+ console.info('Execution time: %dms', end)
96+ console.log('messages processed', numOfActors * depth + numOfActors)
97+ console.log('messages processed', numOfMsg)
98+ })
9899 }
99100
100101 main(1000, 10)
index.jsView
@@ -8,9 +8,9 @@
88 * @param {Tree} tree - a [radix tree](https://github.com/dfinity/js-dfinity-radix-tree) to store the state
99 */
1010 constructor (tree, nonce = 0) {
1111 this.tree = tree
12- this.scheduler = new Scheduler()
12+ this.scheduler = new Scheduler(this)
1313 this._containerTypes = {}
1414 this.nonce = nonce
1515 }
1616
@@ -19,15 +19,16 @@
1919 * @param {Object} cap - the capabilitly used to send the message
2020 * @param {Object} message - the [message](https://github.com/primea/js-primea-message) to send
2121 * @returns {Promise} a promise that resolves once the receiving container is loaded
2222 */
23- async send (message) {
24- const id = message.funcRef.destId
25- const actor = await this.getActor(id)
26- actor.queue(message)
23+ send (messages) {
24+ if (!Array.isArray(messages)) {
25+ messages = [messages]
26+ }
27+ this.scheduler.queue(messages)
2728 }
2829
29- async _loadActor (id) {
30+ async loadActor (id) {
3031 const state = await this.tree.getSubTree(id)
3132 const {type, nonce} = Actor.deserializeMetaData(state.root['/'][3])
3233 const container = this._containerTypes[type]
3334
@@ -41,28 +42,12 @@
4142 type
4243 })
4344
4445 await actor.startup()
45- this.scheduler.update(actor)
4646 return actor
4747 }
4848
4949 /**
50- * gets an existsing actor
51- * @param {string} id - the actor's ID
52- * @returns {Promise}
53- */
54- async getActor (id) {
55- let actor = this.scheduler.getActor(id)
56- if (!actor) {
57- const resolve = this.scheduler.lock(id)
58- actor = await this._loadActor(id)
59- resolve(actor)
60- }
61- return actor
62- }
63-
64- /**
6550 * creates an instance of an Actor
6651 * @param {Integer} type - the type id for the container
6752 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
6853 * @param {Object} id - the id for the actor
@@ -92,11 +77,14 @@
9277 * ticks
9378 * @param {Number} ticks the number of ticks at which to create the state root
9479 * @returns {Promise}
9580 */
96- async createStateRoot (ticks = Infinity) {
97- await this.scheduler.wait(ticks)
98- return this.tree.flush()
81+ createStateRoot () {
82+ return new Promise((resolve, reject) => {
83+ this.scheduler.on('idle', () => {
84+ this.tree.flush().then(resolve)
85+ })
86+ })
9987 }
10088
10189 /**
10290 * regirsters a container with the hypervisor
package-lock.jsonView
The diff is too large to show. Use a local git client to view these changes.
Old file size: 341658 bytes
New file size: 341063 bytes
package.jsonView
@@ -35,10 +35,8 @@
3535 "buffer-pipe": "0.0.2",
3636 "events": "^1.1.1",
3737 "leb128": "0.0.4",
3838 "safe-buffer": "^5.1.1",
39- "sortedmap": "0.0.1",
40- "typedarray-addition": "0.0.1",
4139 "wasm-json-toolkit": "^0.2.0"
4240 },
4341 "devDependencies": {
4442 "coveralls": "^3.0.0",
scheduler.jsView
@@ -1,149 +1,83 @@
1+const EventEmitter = require('events')
12 const binarySearchInsert = require('binary-search-insert')
2-const SortedMap = require('sortedmap')
3+// const bs = require('binary-search')
34
4-function comparator (a, b) {
5- return a.ticks - b.ticks
5+// decides which message to go first
6+function comparator (messageA, messageB) {
7+ // order by number of ticks if messages have different number of ticks
8+ if (messageA._fromTicks !== messageB._fromTicks) {
9+ return messageA._fromTicks > messageB._fromTicks
10+ } else {
11+ return Buffer.compare(messageA._fromId, messageB._fromId)
12+ }
613 }
714
8-module.exports = class Scheduler {
15+module.exports = class Scheduler extends EventEmitter {
916 /**
1017 * The Scheduler manages the actor instances and tracks how many "ticks" they
1118 * have ran.
1219 */
13- constructor () {
14- this._waits = []
15- this._running = new Set()
16- this.actors = new SortedMap(comparator)
20+ constructor (hypervisor) {
21+ super()
22+ this.hypervisor = hypervisor
23+ this._messages = []
24+ this._times = []
25+ this.actors = new Map()
26+ this._state = 'idle'
1727 }
1828
19- /**
20- * locks the scheduler from clearing waits untill the lock is resolved
21- * @param {string} id
22- * @return {function} the resolve function to call once it to unlock
23- */
24- lock (id) {
25- id = id.toString('hex')
26- let r
27- const p = new Promise((resolve, reject) => {
28- r = resolve
29- })
30- p.ticks = 0
31- this.actors.set(id, p)
32- this._running.add(id)
33- return r
34- }
35-
36- /**
37- * updates an actor with a new tick count
38- * @param {Object} actor - an actor instance
39- */
40- update (actor) {
41- this._update(actor)
42- this._running.add(actor.id.toString('hex'))
43- this._checkWaits()
44- }
45-
46- _update (actor) {
47- this.actors.delete(actor.id.toString('hex'))
48- this.actors.set(actor.id.toString('hex'), actor)
49- }
50-
51- /**
52- * returns an Actor instance
53- * @param {String} id
54- * @return {Object}
55- */
56- getActor (id) {
57- id = id.toString('hex')
58- return this.actors.get(id)
59- }
60-
61- /**
62- * deletes an actor from the scheduler
63- * @param {String} id - the containers id
64- */
65- done (id) {
66- id = id.toString('hex')
67- this._running.delete(id)
68- this.actors.delete(id)
69- this._checkWaits()
70- }
71-
72- /**
73- * returns a promise that resolves once all containers have reached the given
74- * number of ticks
75- * @param {interger} ticks - the number of ticks to wait
76- * @param {string} id - optional id of the container that is waiting
77- * @return {Promise}
78- */
79- wait (ticks, id) {
80- if (id) {
81- id = id.toString('hex')
82- this._running.delete(id)
29+ queue (messages) {
30+ messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
31+ if (this._state === 'idle') {
32+ this._state = 'running'
33+ this._messageLoop()
8334 }
84-
85- return new Promise((resolve, reject) => {
86- binarySearchInsert(this._waits, comparator, {
87- ticks: ticks,
88- resolve: resolve,
89- id: id
90- })
91- this._checkWaits()
92- })
9335 }
9436
95- /**
96- * returns the oldest container's ticks
97- * @return {integer}
98- */
99- leastNumberOfTicks (exclude) {
100- let ticks = Infinity
101- for (const actor of this.actors) {
102- ticks = actor[1].ticks
103- if (actor[1].id !== exclude) {
104- return ticks
37+ async _messageLoop () {
38+ let waits = []
39+ while (this._messages.length) {
40+ const message = this._messages.shift()
41+ waits.push(this._processMessage(message))
42+ const oldestMessage = this._messages[0]
43+ if (!oldestMessage || oldestMessage._fromTicks !== message._fromTicks) {
44+ await Promise.all(waits)
45+ waits = []
10546 }
10647 }
48+ this._state = 'idle'
49+ const promises = []
50+ this.actors.forEach(actor => promises.push(actor.shutdown()))
51+ await Promise.all(promises)
52+ this.actors.clear()
53+ this.emit('idle')
54+ }
10755
108- return ticks
56+ // enable for concurrency
57+ update (oldTicks, ticks) {
58+ // const index = bs(this._times, oldTicks, (a, b) => a - b)
59+ // this._times.splice(index, 1)
60+ // binarySearchInsert(this._times, (a, b) => { return a - b }, ticks)
61+ // let oldestMessage = this._messages[0]
62+ // const oldestTime = this._times[0]
63+ // while (oldestMessage && oldestMessage._fromTicks < oldestTime) {
64+ // const message = this._messages.shift()
65+ // this._processMessage(message)
66+ // oldestMessage = this._messages[0]
67+ // }
10968 }
11069
111- // checks outstanding waits to see if they can be resolved
112- _checkWaits () {
113- // if there are no instances, clear any remaining waits
114- if (!this.actors.size) {
115- // console.log('here', this._waits)
116- this._waits.forEach(wait => wait.resolve())
117- this._waits = []
118- return
70+ async _processMessage (message) {
71+ const to = message.funcRef.destId.toString('hex')
72+ let actor = this.actors.get(to)
73+ if (!actor) {
74+ actor = await this.hypervisor.loadActor(message.funcRef.destId)
75+ this.actors.set(to, actor)
11976 }
120-
121- // find the oldest container, see if any of the waits can be resolved
122- while (this._waits[0]) {
123- const wait = this._waits[0]
124- const least = this.leastNumberOfTicks(wait.id)
125- if (wait.ticks <= least) {
126- this._waits.shift()
127- wait.resolve()
128- this._running.add(wait.id)
129- } else {
130- break
131- }
132- }
133-
134- // if there are no containers running find the oldest wait
135- // and update the oldest containers to its ticks
136- if (!this._running.size && this._waits.length) {
137- const oldest = this._waits[0].ticks
138- for (let actor of this.actors) {
139- actor = actor[1]
140- if (actor.ticks > oldest) {
141- break
142- }
143- actor.ticks = oldest
144- this._update(actor)
145- }
146- return this._checkWaits()
147- }
77+ const promise = new Promise((resolve, reject) => {
78+ message.on('done', resolve)
79+ })
80+ actor.queue(message)
81+ return promise
14882 }
14983 }
tests/index.jsView
@@ -89,9 +89,9 @@
8989 class testVMContainerB extends BaseContainer {
9090 static functions () {
9191 return {
9292 onMessage: (args) => {
93- t.true(args === 2, 'should recive a message')
93+ t.equals(args, 2, 'should recive a message')
9494 }
9595 }
9696 }
9797
@@ -105,14 +105,15 @@
105105 hypervisor.registerContainer(testVMContainerB)
106106
107107 const {exports: exportsB} = await hypervisor.createActor(testVMContainerB.typeId)
108108 const {exports: exportsA} = await hypervisor.createActor(testVMContainerA.typeId)
109+
109110 const message = new Message({
110111 funcRef: exportsA[0],
111- funcArguments: [exportsB[0]]
112+ funcArguments: exportsB
112113 })
113114
114- await hypervisor.send(message)
115+ hypervisor.send(message)
115116
116117 const stateRoot = await hypervisor.createStateRoot()
117118 t.deepEquals(stateRoot, expectedState, 'expected root!')
118119 })
@@ -230,9 +231,8 @@
230231 const message0 = new Message({
231232 funcRef: actorA0.exports[0],
232233 funcArguments: [actorB.exports[0]]
233234 })
234-
235235 const message1 = new Message({
236236 funcRef: actorA1.exports[0],
237237 funcArguments: actorB.exports
238238 })
@@ -504,8 +504,77 @@
504504 const stateRoot = await hypervisor.createStateRoot()
505505 t.deepEquals(stateRoot, expectedState, 'expected root!')
506506 })
507507
508+tape('async work', async t => {
509+ t.plan(3)
510+ const expectedState = {
511+ '/': Buffer.from('a4c7ceacd8c867ae1d0b472d8bffa3cb10048331', 'hex')
512+ }
513+
514+ const tree = new RadixTree({
515+ db: db
516+ })
517+
518+ class testVMContainerA extends BaseContainer {
519+ static functions (actor) {
520+ return {
521+ onMessage: (funcRef) => {
522+ const message = new Message({
523+ funcRef: funcRef,
524+ funcArguments: [2]
525+ })
526+ actor.send(message)
527+
528+ const message2 = new Message({
529+ funcRef: funcRef,
530+ funcArguments: [2]
531+ })
532+ actor.send(message2)
533+ actor.incrementTicks(1)
534+ return new Promise((resolve, reject) => {
535+ setTimeout(() => {
536+ resolve()
537+ }, 10)
538+ })
539+ }
540+ }
541+ }
542+ }
543+
544+ class testVMContainerB extends BaseContainer {
545+ static functions (actor) {
546+ return {
547+ onMessage: (args) => {
548+ actor.incrementTicks(1)
549+ t.equals(args, 2, 'should recive a message')
550+ }
551+ }
552+ }
553+
554+ static get typeId () {
555+ return 8
556+ }
557+ }
558+
559+ const hypervisor = new Hypervisor(tree)
560+ hypervisor.registerContainer(testVMContainerA)
561+ hypervisor.registerContainer(testVMContainerB)
562+
563+ const {exports: exportsB} = await hypervisor.createActor(testVMContainerB.typeId)
564+ const {exports: exportsA} = await hypervisor.createActor(testVMContainerA.typeId)
565+
566+ const message = new Message({
567+ funcRef: exportsA[0],
568+ funcArguments: exportsB
569+ })
570+
571+ hypervisor.send(message)
572+
573+ const stateRoot = await hypervisor.createStateRoot()
574+ t.deepEquals(stateRoot, expectedState, 'expected root!')
575+})
576+
508577 tape('random', async t => {
509578 const numOfActors = 10
510579 const depth = 10
511580 const messageOrder = {}
@@ -528,9 +597,9 @@
528597 messageOrder[actor.id.toString('hex')] = message._fromTicks
529598 numOfMsg++
530599 actor.incrementTicks(10)
531600 if (ref) {
532- return actor.send(new Message({
601+ actor.send(new Message({
533602 funcRef: ref,
534603 funcArguments: refs
535604 }))
536605 }
@@ -564,10 +633,10 @@
564633 })
565634 msgs.push(message)
566635 }
567636
568- msgs.forEach(msg => hypervisor.send(msg))
569- // console.log('here', numOfMsg)
570- await hypervisor.scheduler.wait(Infinity)
571- t.equals(numOfMsg, 110)
572- t.end()
637+ hypervisor.send(msgs)
638+ await hypervisor.scheduler.on('idle', () => {
639+ t.equals(numOfMsg, 110)
640+ t.end()
641+ })
573642 })
inbox.jsView
@@ -1,110 +1,0 @@
1-const Buffer = require('safe-buffer').Buffer
2-const binarySearchInsert = require('binary-search-insert')
3-
4-// decides which message to go first
5-function messageArbiter (messageA, messageB) {
6- // order by number of ticks if messages have different number of ticks
7- if (messageA._fromTicks !== messageB._fromTicks) {
8- return messageA._fromTicks > messageB._fromTicks
9- } else {
10- // sender id
11- // console.log('here')
12- return Buffer.compare(messageA._fromId, messageB._fromId)
13- }
14-}
15-
16-module.exports = class Inbox {
17- /**
18- * The inbox manages and sorts incoming messages and provides functions
19- * to wait on messages
20- * @param {Object} opts
21- * @param {Object} opts.state
22- * @param {Object} opts.hypervisor
23- */
24- constructor (opts) {
25- this.actor = opts.actor
26- this.hypervisor = opts.hypervisor
27- this._queue = []
28- this._oldestMessagePromise = new Promise((resolve, reject) => {
29- this._oldestMessageResolve = resolve
30- })
31- }
32-
33- get isEmpty () {
34- return !this._queue.length
35- }
36-
37- /**
38- * queues a message
39- * @param {Message} message
40- */
41- queue (message) {
42- this._queueMessage(message)
43-
44- const oldestMessage = this._getOldestMessage()
45- if (oldestMessage === message) {
46- this._oldestMessageResolve(message)
47- this._oldestMessagePromise = new Promise((resolve, reject) => {
48- this._oldestMessageResolve = resolve
49- })
50- }
51- }
52-
53- /**
54- * Waits for the the next message if any
55- * @param {Integer} timeout
56- * @returns {Promise}
57- */
58- async nextMessage () {
59- let message = this._getOldestMessage()
60- let timeout = message._fromTicks
61- let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
62-
63- while (true) {
64- // if all actors are "older" then the time out then stop waiting for messages
65- // since we konw that we can not receive one
66- if (oldestTime >= timeout) {
67- break
68- }
69-
70- await Promise.race([
71- this.hypervisor.scheduler.wait(timeout, this.actor.id).then(() => {
72- message = this._getOldestMessage()
73- }),
74- this._olderMessage(message).then(m => {
75- message = m
76- // if there is a message that is "older" then the timeout, the lower
77- // the timeout to the oldest message
78- timeout = message._fromTicks
79- })
80- ])
81- oldestTime = this.hypervisor.scheduler.leastNumberOfTicks(this.actor.id)
82- }
83- message = this._deQueueMessage()
84- // if the message we recived had more ticks then we currently have then
85- // update our ticks to it, since we jumped forward in time
86- if (message && message._fromTicks > this.actor.ticks) {
87- this.actor.ticks = message._fromTicks
88- this.hypervisor.scheduler.update(this.actor)
89- }
90- return message
91- }
92-
93- // returns a promise that resolve when a message older then the given message
94- // is recived
95- _olderMessage (message) {
96- return this._oldestMessagePromise
97- }
98-
99- _getOldestMessage () {
100- return this._queue[0]
101- }
102-
103- _deQueueMessage () {
104- return this._queue.shift()
105- }
106-
107- _queueMessage (message) {
108- binarySearchInsert(this._queue, messageArbiter, message)
109- }
110-}

Built with git-ssb-web