Files: 5a33099559b3af6b0c5c0e444a8d537471853b77 / scheduler.js
2515 bytesRaw
1 | const EventEmitter = require('events') |
2 | const binarySearchInsert = require('binary-search-insert') |
3 | // const bs = require('binary-search') |
4 | |
5 | // decides which message to go first |
6 | function comparator (messageA, messageB) { |
7 | // order by number of ticks if messages have different number of ticks |
8 | if (messageA._fromTicks !== messageB._fromTicks) { |
9 | return messageA._fromTicks > messageB._fromTicks |
10 | } else { |
11 | return Buffer.compare(messageA._fromId, messageB._fromId) |
12 | } |
13 | } |
14 | |
15 | module.exports = class Scheduler extends EventEmitter { |
16 | /** |
17 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
18 | * have ran. |
19 | */ |
20 | constructor (hypervisor) { |
21 | super() |
22 | this.hypervisor = hypervisor |
23 | this._messages = [] |
24 | this._times = [] |
25 | this.actors = new Map() |
26 | this._state = 'idle' |
27 | } |
28 | |
29 | queue (messages) { |
30 | messages.forEach(msg => binarySearchInsert(this._messages, comparator, msg)) |
31 | if (this._state === 'idle') { |
32 | this._state = 'running' |
33 | this._messageLoop() |
34 | } |
35 | } |
36 | |
37 | async _messageLoop () { |
38 | let waits = [] |
39 | while (this._messages.length) { |
40 | const message = this._messages.shift() |
41 | waits.push(this._processMessage(message)) |
42 | const oldestMessage = this._messages[0] |
43 | if (!oldestMessage || oldestMessage._fromTicks !== message._fromTicks) { |
44 | await Promise.all(waits) |
45 | waits = [] |
46 | } |
47 | } |
48 | this._state = 'idle' |
49 | const promises = [] |
50 | this.actors.forEach(actor => promises.push(actor.shutdown())) |
51 | await Promise.all(promises) |
52 | this.actors.clear() |
53 | this.emit('idle') |
54 | } |
55 | |
56 | // enable for concurrency |
57 | update (oldTicks, ticks) { |
58 | // const index = bs(this._times, oldTicks, (a, b) => a - b) |
59 | // this._times.splice(index, 1) |
60 | // binarySearchInsert(this._times, (a, b) => { return a - b }, ticks) |
61 | // let oldestMessage = this._messages[0] |
62 | // const oldestTime = this._times[0] |
63 | // while (oldestMessage && oldestMessage._fromTicks < oldestTime) { |
64 | // const message = this._messages.shift() |
65 | // this._processMessage(message) |
66 | // oldestMessage = this._messages[0] |
67 | // } |
68 | } |
69 | |
70 | async _processMessage (message) { |
71 | const to = message.funcRef.destId.toString('hex') |
72 | let actor = this.actors.get(to) |
73 | if (!actor) { |
74 | actor = await this.hypervisor.loadActor(message.funcRef.destId) |
75 | this.actors.set(to, actor) |
76 | } |
77 | const promise = new Promise((resolve, reject) => { |
78 | message.on('done', resolve) |
79 | }) |
80 | actor.queue(message) |
81 | return promise |
82 | } |
83 | } |
84 |
Built with git-ssb-web