Files: de71a0149a10c40bfb94cc8e121223e6b887f685 / scheduler.js
3595 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | const SortedMap = require('sortedmap') |
3 | |
4 | function comparator (a, b) { |
5 | return a.ticks - b.ticks |
6 | } |
7 | |
8 | module.exports = class Scheduler { |
9 | /** |
10 | * The Scheduler manages the actor instances and tracks how many "ticks" they |
11 | * have ran. |
12 | */ |
13 | constructor () { |
14 | this._waits = [] |
15 | this._running = new Set() |
16 | this._checkingWaits = false |
17 | this.instances = new SortedMap(comparator) |
18 | } |
19 | |
20 | /** |
21 | * locks the scheduler from clearing waits untill the lock is resolved |
22 | * @param {string} id |
23 | * @return {function} the resolve function to call once it to unlock |
24 | */ |
25 | lock (id) { |
26 | let r |
27 | const p = new Promise((resolve, reject) => { |
28 | r = resolve |
29 | }) |
30 | p.ticks = 0 |
31 | this.instances.set(id, p) |
32 | this._running.add(id) |
33 | return r |
34 | } |
35 | |
36 | /** |
37 | * updates an instance with a new tick count |
38 | * @param {Object} instance - an actor instance |
39 | */ |
40 | update (instance) { |
41 | this._waits = this._waits.filter(wait => wait.id !== instance.id) |
42 | this._update(instance) |
43 | this._running.add(instance.id) |
44 | this._checkWaits() |
45 | } |
46 | |
47 | _update (instance) { |
48 | this.instances.delete(instance.id) |
49 | this.instances.set(instance.id, instance) |
50 | } |
51 | |
52 | /** |
53 | * returns an Actor instance |
54 | * @param {String} id |
55 | * @return {Object} |
56 | */ |
57 | getInstance (id) { |
58 | return this.instances.get(id) |
59 | } |
60 | |
61 | /** |
62 | * deletes an instance from the scheduler |
63 | * @param {String} id - the containers id |
64 | */ |
65 | done (id) { |
66 | this._running.delete(id) |
67 | this.instances.delete(id) |
68 | this._checkWaits() |
69 | } |
70 | |
71 | /** |
72 | * returns a promise that resolves once all containers have reached the given |
73 | * number of ticks |
74 | * @param {interger} ticks - the number of ticks to wait |
75 | * @param {string} id - optional id of the container that is waiting |
76 | * @return {Promise} |
77 | */ |
78 | wait (ticks, id) { |
79 | this._running.delete(id) |
80 | |
81 | return new Promise((resolve, reject) => { |
82 | binarySearchInsert(this._waits, comparator, { |
83 | ticks: ticks, |
84 | resolve: resolve, |
85 | id: id |
86 | }) |
87 | this._checkWaits() |
88 | }) |
89 | } |
90 | |
91 | /** |
92 | * returns the oldest container's ticks |
93 | * @return {integer} |
94 | */ |
95 | leastNumberOfTicks (exclude) { |
96 | let ticks = Infinity |
97 | for (const instance of this.instances) { |
98 | ticks = instance[1].ticks |
99 | if (instance[1].id !== exclude) { |
100 | return ticks |
101 | } |
102 | } |
103 | |
104 | return ticks |
105 | } |
106 | |
107 | // checks outstanding waits to see if they can be resolved |
108 | _checkWaits () { |
109 | // if there are no instances, clear any remaining waits |
110 | if (!this.instances.size) { |
111 | this._waits.forEach(wait => wait.resolve()) |
112 | this._waits = [] |
113 | this._checkingWaits = false |
114 | |
115 | return |
116 | } |
117 | |
118 | // find the old container, see if any of the waits can be resolved |
119 | while (this._waits[0]) { |
120 | const wait = this._waits[0] |
121 | const least = this.leastNumberOfTicks(wait.id) |
122 | if (wait.ticks <= least) { |
123 | this._waits.shift() |
124 | wait.resolve() |
125 | this._running.add(wait.id) |
126 | } else { |
127 | break |
128 | } |
129 | } |
130 | |
131 | // if there are no containers running find the oldest wait |
132 | // and update the oldest containers to its ticks |
133 | if (!this._running.size && this._waits.length) { |
134 | const oldest = this._waits[0].ticks |
135 | for (let instance of this.instances) { |
136 | instance = instance[1] |
137 | if (instance.ticks > oldest) { |
138 | break |
139 | } |
140 | instance.ticks = oldest |
141 | this._update(instance) |
142 | } |
143 | this._checkingWaits = false |
144 | |
145 | return this._checkWaits() |
146 | } |
147 | |
148 | this._checkingWaits = false |
149 | } |
150 | } |
151 |
Built with git-ssb-web