Files: 5013b5ed5b183c9bd021b9b9b20f7df558b0a664 / scheduler.js
1630 bytesRaw
1 | const EventEmitter = require('events') |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | |
4 | // decides which message to go first |
5 | function comparator (messageA, messageB) { |
6 | // order by number of ticks if messages have different number of ticks |
7 | if (messageA._fromTicks !== messageB._fromTicks) { |
8 | return messageA._fromTicks > messageB._fromTicks |
9 | } else { |
10 | return Buffer.compare(messageA._fromId.id, messageB._fromId.id) |
11 | } |
12 | } |
13 | |
14 | module.exports = class Scheduler extends EventEmitter { |
15 | /** |
16 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
17 | * have ran. |
18 | */ |
19 | constructor (hypervisor) { |
20 | super() |
21 | this.hypervisor = hypervisor |
22 | this._messages = [] |
23 | this._times = [] |
24 | this.actors = new Map() |
25 | this._running = false |
26 | } |
27 | |
28 | queue (messages) { |
29 | messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg)) |
30 | if (!this._running) { |
31 | this._running = true |
32 | this._messageLoop() |
33 | } |
34 | } |
35 | |
36 | async _messageLoop () { |
37 | while (this._messages.length) { |
38 | const message = this._messages.shift() |
39 | await this._processMessage(message) |
40 | } |
41 | this._running = false |
42 | const promises = [] |
43 | this.actors.forEach(actor => promises.push(actor.shutdown())) |
44 | await Promise.all(promises) |
45 | this.actors.clear() |
46 | this.emit('idle') |
47 | } |
48 | |
49 | async _processMessage (message) { |
50 | const to = message.funcRef.destId.id.toString('hex') |
51 | let actor = this.actors.get(to) |
52 | if (!actor) { |
53 | actor = await this.hypervisor.loadActor(message.funcRef.destId) |
54 | this.actors.set(to, actor) |
55 | } |
56 | return actor.runMessage(message) |
57 | } |
58 | } |
59 |
Built with git-ssb-web