git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 8cd3e3af7c391f083f5f83593a260f831c7083af

Files: 8cd3e3af7c391f083f5f83593a260f831c7083af / scheduler.js

1630 bytesRaw
1const EventEmitter = require('events')
2const binarySearchInsert = require('binary-search-insert')
3
/**
 * Ordering function for the scheduler's message queue: decides which message
 * goes first.
 *
 * Messages are ordered by ascending `_fromTicks`; when the ticks are equal the
 * tie is broken by a byte-wise comparison of the senders' ids.
 *
 * The result is always a number. Returning the boolean `a > b` here would
 * report "a sorts before b" as `false`, which a numeric comparator consumer
 * reads as 0 ("equal"), so lower-tick messages could be inserted at the wrong
 * position.
 *
 * @param {Object} messageA - message exposing `_fromTicks` and `_fromId.id`
 * @param {Object} messageB - message exposing `_fromTicks` and `_fromId.id`
 * @returns {number} negative if messageA sorts first, positive if messageB
 *   sorts first, 0 if both ticks and sender ids are equal
 */
function comparator (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }
  // same number of ticks: fall back to the sender id (Buffer.compare yields -1, 0 or 1)
  return Buffer.compare(messageA._fromId.id, messageB._fromId.id)
}
13
14module.exports = class Scheduler extends EventEmitter {
15 /**
16 * The Scheduler manages the actor instances and tracks how many "ticks" they
17 * have ran.
18 */
19 constructor (hypervisor) {
20 super()
21 this.hypervisor = hypervisor
22 this._messages = []
23 this._times = []
24 this.actors = new Map()
25 this._running = false
26 }
27
28 queue (messages) {
29 messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
30 if (!this._running) {
31 this._running = true
32 this._messageLoop()
33 }
34 }
35
36 async _messageLoop () {
37 while (this._messages.length) {
38 const message = this._messages.shift()
39 await this._processMessage(message)
40 }
41 this._running = false
42 const promises = []
43 this.actors.forEach(actor => promises.push(actor.shutdown()))
44 await Promise.all(promises)
45 this.actors.clear()
46 this.emit('idle')
47 }
48
49 async _processMessage (message) {
50 const to = message.funcRef.destId.id.toString('hex')
51 let actor = this.actors.get(to)
52 if (!actor) {
53 actor = await this.hypervisor.loadActor(message.funcRef.destId)
54 this.actors.set(to, actor)
55 }
56 return actor.runMessage(message)
57 }
58}
59

Built with git-ssb-web