Files: a205418c5a618890ae0bb4c95d74c0dfadd17cff / scheduler.js
1606 bytesRaw
1 | const EventEmitter = require('events') |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | |
/**
 * Decides which of two messages goes first.
 *
 * Messages are ordered by `_fromTicks`, lowest first. When both messages carry
 * the same number of ticks the tie is broken by comparing the bytes of
 * `_fromId.id` with `Buffer.compare`.
 *
 * The result follows the usual comparator convention (negative, zero or
 * positive) in both branches, so "A sorts before B" is distinguishable from
 * "A and B are equal".
 *
 * @param {Object} messageA - message exposing `_fromTicks` and `_fromId.id`
 * @param {Object} messageB - message exposing `_fromTicks` and `_fromId.id`
 * @returns {number} negative if `messageA` goes first, positive if `messageB`
 *   goes first, `0` if they are equivalent
 */
function comparator (messageA, messageB) {
  // order by number of ticks if messages have different number of ticks
  if (messageA._fromTicks !== messageB._fromTicks) {
    // only relational operators are used here (no subtraction) so the result
    // is a plain number regardless of the numeric type the ticks are kept in
    return messageA._fromTicks < messageB._fromTicks ? -1 : 1
  }
  // same number of ticks: fall back to the byte order of the sender ids
  return Buffer.compare(messageA._fromId.id, messageB._fromId.id)
}
13 | |
14 | module.exports = class Scheduler extends EventEmitter { |
15 | /** |
16 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
17 | * have ran. |
18 | */ |
19 | constructor (hypervisor) { |
20 | super() |
21 | this.hypervisor = hypervisor |
22 | this._messages = [] |
23 | this._times = [] |
24 | this.actors = new Map() |
25 | this.drivers = new Map() |
26 | this._running = false |
27 | } |
28 | |
29 | queue (messages) { |
30 | messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg)) |
31 | if (!this._running) { |
32 | this._running = true |
33 | this._messageLoop() |
34 | } |
35 | } |
36 | |
37 | async _messageLoop () { |
38 | while (this._messages.length) { |
39 | const message = this._messages.shift() |
40 | await this._processMessage(message) |
41 | } |
42 | this._running = false |
43 | this.actors.forEach(actor => actor.shutdown()) |
44 | this.actors.clear() |
45 | this.emit('idle') |
46 | } |
47 | |
48 | async _processMessage (message) { |
49 | const to = message.funcRef.actorId.toString() |
50 | let actor = this.actors.get(to) || this.drivers.get(to) |
51 | if (!actor) { |
52 | actor = await this.hypervisor.loadActor(message.funcRef.actorId) |
53 | this.actors.set(to, actor) |
54 | } |
55 | return actor.runMessage(message) |
56 | } |
57 | } |
58 |
Built with git-ssb-web