Files: b9423af2cd45a86722df9ab973b4db265a8fcaa3 / scheduler.js
1683 bytesRaw
1 | const EventEmitter = require('events') |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | |
4 | // decides which message to go first |
5 | function comparator (messageA, messageB) { |
6 | // order by number of ticks if messages have different number of ticks |
7 | if (messageA._fromTicks !== messageB._fromTicks) { |
8 | return messageA._fromTicks > messageB._fromTicks |
9 | } else { |
10 | return Buffer.compare(messageA._fromId.id, messageB._fromId.id) |
11 | } |
12 | } |
13 | |
14 | module.exports = class Scheduler extends EventEmitter { |
15 | /** |
16 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
17 | * have ran. |
18 | */ |
19 | constructor (hypervisor) { |
20 | super() |
21 | this.hypervisor = hypervisor |
22 | this._messages = [] |
23 | this._times = [] |
24 | this.actors = new Map() |
25 | this.drivers = new Map() |
26 | this._running = false |
27 | } |
28 | |
29 | queue (messages) { |
30 | messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg)) |
31 | if (!this._running) { |
32 | this._running = true |
33 | this._messageLoop() |
34 | } |
35 | } |
36 | |
37 | async _messageLoop () { |
38 | while (this._messages.length) { |
39 | const message = this._messages.shift() |
40 | await this._processMessage(message) |
41 | } |
42 | this._running = false |
43 | const promises = [] |
44 | this.actors.forEach(actor => promises.push(actor.shutdown())) |
45 | await Promise.all(promises) |
46 | this.actors.clear() |
47 | this.emit('idle') |
48 | } |
49 | |
50 | async _processMessage (message) { |
51 | const to = message.funcRef.destId.id.toString('hex') |
52 | let actor = this.actors.get(to) || this.drivers.get(to) |
53 | if (!actor) { |
54 | actor = await this.hypervisor.loadActor(message.funcRef.destId) |
55 | this.actors.set(to, actor) |
56 | } |
57 | return actor.runMessage(message) |
58 | } |
59 | } |
60 |
Built with git-ssb-web