Files: 622b5589b3a43d425004534b7b44c376d3e740c7 / scheduler.js
3967 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | const SortedMap = require('sortedmap') |
3 | const LockMap = require('lockmap') |
4 | |
5 | module.exports = class Scheduler { |
6 | /** |
7 | * The Scheduler manages the run cycle of Actors and figures out which |
8 | * order they should run in |
9 | */ |
10 | constructor () { |
11 | this._waits = [] |
12 | this._running = new Set() |
13 | this._loadingInstances = new LockMap() |
14 | this.instances = new SortedMap(comparator) |
15 | |
16 | function comparator (a, b) { |
17 | return a.ticks - b.ticks |
18 | } |
19 | } |
20 | |
21 | /** |
22 | * locks the scheduler from clearing waits untill the lock is resolved |
23 | * @param {string} id |
24 | * @return {function} the resolve function to call once it to unlock |
25 | */ |
26 | lock (id) { |
27 | return this._loadingInstances.lock(id) |
28 | } |
29 | |
30 | /** |
31 | * updates an instance with a new tick count |
32 | * @param {Object} instance - an actor instance |
33 | */ |
34 | update (instance) { |
35 | this._waits = this._waits.filter(wait => wait.id !== instance.id) |
36 | this._update(instance) |
37 | this._running.add(instance.id) |
38 | this._checkWaits() |
39 | } |
40 | |
41 | _update (instance) { |
42 | this.instances.delete(instance.id) |
43 | this.instances.set(instance.id, instance) |
44 | } |
45 | |
46 | /** |
47 | * returns an Actor instance |
48 | * @param {String} id |
49 | * @return {Object} |
50 | */ |
51 | getInstance (id) { |
52 | return this.instances.get(id) || this._loadingInstances.get(id) |
53 | } |
54 | |
55 | /** |
56 | * deletes an instance from the scheduler |
57 | * @param {String} id - the containers id |
58 | */ |
59 | done (id) { |
60 | this._running.delete(id) |
61 | this.instances.delete(id) |
62 | this._checkWaits() |
63 | } |
64 | |
65 | /** |
66 | * returns a promise that resolves once all containers have reached the given |
67 | * number of ticks |
68 | * @param {interger} ticks - the number of ticks to wait |
69 | * @param {string} id - optional id of the container that is waiting |
70 | * @return {Promise} |
71 | */ |
72 | wait (ticks, id) { |
73 | this._running.delete(id) |
74 | return new Promise((resolve, reject) => { |
75 | binarySearchInsert(this._waits, comparator, { |
76 | ticks: ticks, |
77 | resolve: resolve, |
78 | id: id |
79 | }) |
80 | this._checkWaits() |
81 | }) |
82 | |
83 | function comparator (a, b) { |
84 | return a.ticks - b.ticks |
85 | } |
86 | } |
87 | |
88 | /** |
89 | * returns the oldest container's ticks |
90 | * @return {integer} |
91 | */ |
92 | leastNumberOfTicks (exculde) { |
93 | let ticks = 0 |
94 | for (const instance of this.instances) { |
95 | ticks = instance[1].ticks |
96 | if (instance[1].id !== exculde) { |
97 | return ticks |
98 | } |
99 | } |
100 | return ticks |
101 | } |
102 | |
103 | // checks outstanding waits to see if they can be resolved |
104 | async _checkWaits () { |
105 | if (this._checkingWaits) { |
106 | return |
107 | } else { |
108 | this._checkingWaits = true |
109 | // wait to check waits untill all the instances are done loading |
110 | await [...this._loadingInstances.values()] |
111 | } |
112 | // if there are no running containers |
113 | if (!this.instances.size) { |
114 | // clear any remanding waits |
115 | this._waits.forEach(wait => wait.resolve()) |
116 | this._waits = [] |
117 | this._checkingWaits = false |
118 | } else { |
119 | // find the old container and see if to can resolve any of the waits |
120 | while (this._waits[0]) { |
121 | const wait = this._waits[0] |
122 | const least = this.leastNumberOfTicks(wait.id) |
123 | if (wait.ticks <= least) { |
124 | this._waits.shift() |
125 | wait.resolve() |
126 | this._running.add(wait.id) |
127 | } else { |
128 | break |
129 | } |
130 | } |
131 | |
132 | if (!this._running.size && this._waits.length) { |
133 | // if there are no containers running find the oldest wait and update |
134 | // the oldest containers to it ticks |
135 | const oldest = this._waits[0].ticks |
136 | for (let instance of this.instances) { |
137 | instance = instance[1] |
138 | if (instance.ticks > oldest) { |
139 | break |
140 | } else { |
141 | instance.ticks = oldest |
142 | this._update(instance) |
143 | } |
144 | } |
145 | this._checkingWaits = false |
146 | return this._checkWaits() |
147 | } else { |
148 | this._checkingWaits = false |
149 | } |
150 | } |
151 | } |
152 | } |
153 |
Built with git-ssb-web