git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: b9423af2cd45a86722df9ab973b4db265a8fcaa3

Files: b9423af2cd45a86722df9ab973b4db265a8fcaa3 / scheduler.js

1683 bytesRaw
1const EventEmitter = require('events')
2const binarySearchInsert = require('binary-search-insert')
3
/**
 * Decides which of two messages goes first in the scheduler queue.
 *
 * Messages are ordered by `_fromTicks` (lower first). When both carry the
 * same tick count the tie is broken by a byte-wise comparison of the senders'
 * ids, so the resulting order is deterministic.
 *
 * Always returns a number following the usual comparator contract, which makes
 * the function usable with `binarySearchInsert` as well as `Array#sort`.
 *
 * @param {Object} messageA - must expose `_fromTicks` and `_fromId.id`
 * @param {Object} messageB - must expose `_fromTicks` and `_fromId.id`
 * @returns {number} negative if `messageA` goes first, positive if `messageB`
 * goes first, `0` if they rank equally
 */
function comparator (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    // a bare `a > b` would yield `false` (coerced to 0, i.e. "equal") when
    // messageA has fewer ticks; return an explicit sign instead
    return messageA._fromTicks > messageB._fromTicks ? 1 : -1
  }
  // same tick count: fall back to comparing the sender ids (passed to
  // Buffer.compare, so they have to be Buffers / Uint8Arrays)
  return Buffer.compare(messageA._fromId.id, messageB._fromId.id)
}
13
14module.exports = class Scheduler extends EventEmitter {
15 /**
16 * The Scheduler manages the actor instances and tracks how many "ticks" they
17 * have ran.
18 */
19 constructor (hypervisor) {
20 super()
21 this.hypervisor = hypervisor
22 this._messages = []
23 this._times = []
24 this.actors = new Map()
25 this.drivers = new Map()
26 this._running = false
27 }
28
29 queue (messages) {
30 messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
31 if (!this._running) {
32 this._running = true
33 this._messageLoop()
34 }
35 }
36
37 async _messageLoop () {
38 while (this._messages.length) {
39 const message = this._messages.shift()
40 await this._processMessage(message)
41 }
42 this._running = false
43 const promises = []
44 this.actors.forEach(actor => promises.push(actor.shutdown()))
45 await Promise.all(promises)
46 this.actors.clear()
47 this.emit('idle')
48 }
49
50 async _processMessage (message) {
51 const to = message.funcRef.destId.id.toString('hex')
52 let actor = this.actors.get(to) || this.drivers.get(to)
53 if (!actor) {
54 actor = await this.hypervisor.loadActor(message.funcRef.destId)
55 this.actors.set(to, actor)
56 }
57 return actor.runMessage(message)
58 }
59}
60

Built with git-ssb-web