git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: dde59deea46cd600f319fa2ec8e88988386c3466

Files: dde59deea46cd600f319fa2ec8e88988386c3466 / scheduler.js

1677 bytesRaw
1const EventEmitter = require('events')
2const binarySearchInsert = require('binary-search-insert')
3
4// decides which message to go first
5function comparator (messageA, messageB) {
6 // order by number of ticks if messages have different number of ticks
7 if (messageA._fromTicks !== messageB._fromTicks) {
8 return messageA._fromTicks > messageB._fromTicks
9 } else {
10 return Buffer.compare(messageA._fromId.id, messageB._fromId.id)
11 }
12}
13
14module.exports = class Scheduler extends EventEmitter {
15 /**
16 * The Scheduler manages the actor instances and tracks how many "ticks" they
17 * have ran.
18 */
19 constructor (hypervisor) {
20 super()
21 this.hypervisor = hypervisor
22 this._messages = []
23 this._times = []
24 this.actors = new Map()
25 this.drivers = new Map()
26 this._running = false
27 }
28
29 queue (messages) {
30 messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
31 if (!this._running) {
32 this._running = true
33 this._messageLoop()
34 }
35 }
36
37 async _messageLoop () {
38 while (this._messages.length) {
39 const message = this._messages.shift()
40 await this._processMessage(message)
41 }
42 this._running = false
43 const promises = []
44 this.actors.forEach(actor => promises.push(actor.shutdown()))
45 await Promise.all(promises)
46 this.actors.clear()
47 this.emit('idle')
48 }
49
50 async _processMessage (message) {
51 const to = message.funcRef.actorID.toString()
52 let actor = this.actors.get(to) || this.drivers.get(to)
53 if (!actor) {
54 actor = await this.hypervisor.loadActor(message.funcRef.actorID)
55 this.actors.set(to, actor)
56 }
57 return actor.runMessage(message)
58 }
59}
60

Built with git-ssb-web