git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: c779344f6653a07ed5e2695bca74c9cbd0c66a19

Files: c779344f6653a07ed5e2695bca74c9cbd0c66a19 / scheduler.js

1786 bytesRaw
1const EventEmitter = require('events')
2const binarySearchInsert = require('binary-search-insert')
3
/**
 * Ordering function for the scheduler's sorted message queue; decides which
 * message goes first.
 *
 * Messages are ordered by `_fromTicks` (lower first). When both messages carry
 * the same tick count the tie is broken by `Buffer.compare` on `_fromId`.
 *
 * @param {Object} messageA - message exposing `_fromTicks` and `_fromId`
 * @param {Object} messageB - message exposing `_fromTicks` and `_fromId`
 * @returns {number} negative if messageA sorts first, positive if messageB
 *   sorts first, 0 if they are equivalent
 */
function comparator (messageA, messageB) {
  if (messageA._fromTicks !== messageB._fromTicks) {
    // A comparator must return a signed number. Returning the bare boolean
    // `a > b` coerces `false` to 0, which a binary search reads as "equal"
    // and so loses the "A before B" case.
    return messageA._fromTicks > messageB._fromTicks ? 1 : -1
  }
  return Buffer.compare(messageA._fromId, messageB._fromId)
}
13
14module.exports = class Scheduler extends EventEmitter {
15 /**
16 * The Scheduler manages the actor instances and tracks how many "ticks" they
17 * have ran.
18 */
19 constructor (hypervisor) {
20 super()
21 this.hypervisor = hypervisor
22 this._messages = []
23 this._times = []
24 this.actors = new Map()
25 this._running = false
26 }
27
28 queue (messages) {
29 messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg))
30 if (!this._running) {
31 this._running = true
32 this._messageLoop()
33 }
34 }
35
36 async _messageLoop () {
37 while (this._messages.length) {
38 const message = this._messages.shift()
39 await this._processMessage(message)
40 }
41 this._running = false
42 const promises = []
43 this.actors.forEach(actor => promises.push(actor.shutdown()))
44 await Promise.all(promises)
45 this.actors.clear()
46 this.emit('idle')
47 }
48
49 // enable for concurrency
50 update (oldTicks, ticks) {
51 }
52
53 async _processMessage (message) {
54 const to = message.funcRef.destId.toString('hex')
55 let actor = this.actors.get(to)
56 if (!actor) {
57 actor = await this.hypervisor.loadActor(message.funcRef.destId)
58 this.actors.set(to, actor)
59 }
60 const promise = new Promise((resolve, reject) => {
61 message.on('done', resolve)
62 })
63 actor.queue(message)
64 return promise
65 }
66}
67

Built with git-ssb-web