Files: 2b9a2b29e069f6c25c9947d0c64fde6c71dcf631 / scheduler.js
3778 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | const SortedMap = require('sortedmap') |
3 | const LockMap = require('lockmap') |
4 | |
/**
 * Ordering function used for both the instance map and the list of waits.
 * @param {Object} a - any object with a numeric `ticks` property
 * @param {Object} b - any object with a numeric `ticks` property
 * @return {number} negative when `a` has fewer ticks than `b`, positive when
 * it has more, zero when they are equal
 */
function comparator (a, b) {
  const ticksOfA = a.ticks
  const ticksOfB = b.ticks
  return ticksOfA - ticksOfB
}
8 | |
9 | module.exports = class Scheduler { |
10 | /** |
11 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
12 | * have ran. |
13 | */ |
14 | constructor () { |
15 | this._waits = [] |
16 | this._running = new Set() |
17 | this._loadingInstances = new LockMap() |
18 | this._checkingWaits = false |
19 | this.instances = new SortedMap(comparator) |
20 | } |
21 | |
22 | /** |
23 | * locks the scheduler from clearing waits untill the lock is resolved |
24 | * @param {string} id |
25 | * @return {function} the resolve function to call once it to unlock |
26 | */ |
27 | lock (id) { |
28 | return this._loadingInstances.lock(id) |
29 | } |
30 | |
31 | /** |
32 | * updates an instance with a new tick count |
33 | * @param {Object} instance - an actor instance |
34 | */ |
35 | update (instance) { |
36 | this._waits = this._waits.filter(wait => wait.id !== instance.id) |
37 | this._update(instance) |
38 | this._running.add(instance.id) |
39 | this._checkWaits() |
40 | } |
41 | |
42 | _update (instance) { |
43 | this.instances.delete(instance.id) |
44 | this.instances.set(instance.id, instance) |
45 | } |
46 | |
47 | /** |
48 | * returns an Actor instance |
49 | * @param {String} id |
50 | * @return {Object} |
51 | */ |
52 | getInstance (id) { |
53 | return this.instances.get(id) || this._loadingInstances.get(id) |
54 | } |
55 | |
56 | /** |
57 | * deletes an instance from the scheduler |
58 | * @param {String} id - the containers id |
59 | */ |
60 | done (id) { |
61 | this._running.delete(id) |
62 | this.instances.delete(id) |
63 | this._checkWaits() |
64 | } |
65 | |
66 | /** |
67 | * returns a promise that resolves once all containers have reached the given |
68 | * number of ticks |
69 | * @param {interger} ticks - the number of ticks to wait |
70 | * @param {string} id - optional id of the container that is waiting |
71 | * @return {Promise} |
72 | */ |
73 | wait (ticks, id) { |
74 | this._running.delete(id) |
75 | |
76 | return new Promise((resolve, reject) => { |
77 | binarySearchInsert(this._waits, comparator, { |
78 | ticks: ticks, |
79 | resolve: resolve, |
80 | id: id |
81 | }) |
82 | this._checkWaits() |
83 | }) |
84 | } |
85 | |
86 | /** |
87 | * returns the oldest container's ticks |
88 | * @return {integer} |
89 | */ |
90 | leastNumberOfTicks (exclude) { |
91 | let ticks = 0 |
92 | for (const instance of this.instances) { |
93 | ticks = instance[1].ticks |
94 | if (instance[1].id !== exclude) { |
95 | return ticks |
96 | } |
97 | } |
98 | |
99 | return ticks |
100 | } |
101 | |
102 | // checks outstanding waits to see if they can be resolved |
103 | async _checkWaits () { |
104 | if (this._checkingWaits) { |
105 | return |
106 | } |
107 | this._checkingWaits = true |
108 | |
109 | // wait to check waits until all the instances are done loading |
110 | await [...this._loadingInstances.values()] |
111 | |
112 | // if there are no instances, clear any remaining waits |
113 | if (!this.instances.size) { |
114 | this._waits.forEach(wait => wait.resolve()) |
115 | this._waits = [] |
116 | this._checkingWaits = false |
117 | |
118 | return |
119 | } |
120 | |
121 | // find the old container, see if any of the waits can be resolved |
122 | while (this._waits[0]) { |
123 | const wait = this._waits[0] |
124 | const least = this.leastNumberOfTicks(wait.id) |
125 | if (wait.ticks <= least) { |
126 | this._waits.shift() |
127 | wait.resolve() |
128 | this._running.add(wait.id) |
129 | } else { |
130 | break |
131 | } |
132 | } |
133 | |
134 | // if there are no containers running find the oldest wait |
135 | // and update the oldest containers to its ticks |
136 | if (!this._running.size && this._waits.length) { |
137 | const oldest = this._waits[0].ticks |
138 | for (let instance of this.instances) { |
139 | instance = instance[1] |
140 | if (instance.ticks > oldest) { |
141 | break |
142 | } |
143 | instance.ticks = oldest |
144 | this._update(instance) |
145 | } |
146 | this._checkingWaits = false |
147 | |
148 | return this._checkWaits() |
149 | } |
150 | |
151 | this._checkingWaits = false |
152 | } |
153 | } |
154 |
Built with git-ssb-web