git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 5a33099559b3af6b0c5c0e444a8d537471853b77

Files: 5a33099559b3af6b0c5c0e444a8d537471853b77 / scheduler.js

2515 bytesRaw
1const EventEmitter = require('events')
2const binarySearchInsert = require('binary-search-insert')
3// const bs = require('binary-search')
4
5// decides which message to go first
6function comparator (messageA, messageB) {
7 // order by number of ticks if messages have different number of ticks
8 if (messageA._fromTicks !== messageB._fromTicks) {
9 return messageA._fromTicks > messageB._fromTicks
10 } else {
11 return Buffer.compare(messageA._fromId, messageB._fromId)
12 }
13}
14
15module.exports = class Scheduler extends EventEmitter {
16 /**
17 * The Scheduler manages the actor instances and tracks how many "ticks" they
18 * have ran.
19 */
20 constructor (hypervisor) {
21 super()
22 this.hypervisor = hypervisor
23 this._messages = []
24 this._times = []
25 this.actors = new Map()
26 this._state = 'idle'
27 }
28
29 queue (messages) {
30 messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
31 if (this._state === 'idle') {
32 this._state = 'running'
33 this._messageLoop()
34 }
35 }
36
37 async _messageLoop () {
38 let waits = []
39 while (this._messages.length) {
40 const message = this._messages.shift()
41 waits.push(this._processMessage(message))
42 const oldestMessage = this._messages[0]
43 if (!oldestMessage || oldestMessage._fromTicks !== message._fromTicks) {
44 await Promise.all(waits)
45 waits = []
46 }
47 }
48 this._state = 'idle'
49 const promises = []
50 this.actors.forEach(actor => promises.push(actor.shutdown()))
51 await Promise.all(promises)
52 this.actors.clear()
53 this.emit('idle')
54 }
55
56 // enable for concurrency
57 update (oldTicks, ticks) {
58 // const index = bs(this._times, oldTicks, (a, b) => a - b)
59 // this._times.splice(index, 1)
60 // binarySearchInsert(this._times, (a, b) => { return a - b }, ticks)
61 // let oldestMessage = this._messages[0]
62 // const oldestTime = this._times[0]
63 // while (oldestMessage && oldestMessage._fromTicks < oldestTime) {
64 // const message = this._messages.shift()
65 // this._processMessage(message)
66 // oldestMessage = this._messages[0]
67 // }
68 }
69
70 async _processMessage (message) {
71 const to = message.funcRef.destId.toString('hex')
72 let actor = this.actors.get(to)
73 if (!actor) {
74 actor = await this.hypervisor.loadActor(message.funcRef.destId)
75 this.actors.set(to, actor)
76 }
77 const promise = new Promise((resolve, reject) => {
78 message.on('done', resolve)
79 })
80 actor.queue(message)
81 return promise
82 }
83}
84

Built with git-ssb-web