Files: 28f64a130416af8761e0959a2c2f2de9512f0c93 / scheduler.js
3737 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | |
3 | module.exports = class Scheduler { |
4 | /** |
5 | * The Sceduler manages the run cycle of the containes and figures out which |
6 | * order they should run in |
7 | */ |
8 | constructor () { |
9 | this._waits = [] |
10 | this._running = new Set() |
11 | this._loadingInstances = new Map() |
12 | this.instances = new Map() |
13 | } |
14 | |
15 | /** |
16 | * locks the scheduler from clearing waits untill the lock is resolved |
17 | * @param {string} id |
18 | * @return {function} the resolve function to call once it to unlock |
19 | */ |
20 | getLock (id) { |
21 | let r |
22 | const promise = new Promise((resolve, reject) => { |
23 | r = resolve |
24 | }) |
25 | promise.then(() => { |
26 | this._loadingInstances.delete(id) |
27 | }) |
28 | this._loadingInstances.set(id, promise) |
29 | return r |
30 | } |
31 | |
32 | /** |
33 | * updates an instance with a new tick count |
34 | * @param {Object} instance - a container instance |
35 | */ |
36 | update (instance) { |
37 | this._update(instance) |
38 | this._checkWaits() |
39 | } |
40 | |
41 | _update (instance) { |
42 | this._running.add(instance.id) |
43 | // sorts the container instance map by tick count |
44 | this.instances.delete(instance.id) |
45 | const instanceArray = [...this.instances] |
46 | binarySearchInsert(instanceArray, comparator, [instance.id, instance]) |
47 | this.instances = new Map(instanceArray) |
48 | |
49 | function comparator (a, b) { |
50 | return a[1].ticks - b[1].ticks |
51 | } |
52 | } |
53 | |
54 | /** |
55 | * returns a container |
56 | * @param {string} id |
57 | * @return {object} |
58 | */ |
59 | getInstance (id) { |
60 | return this.instances.get(id) || this._loadingInstances.get(id) |
61 | } |
62 | |
63 | /** |
64 | * deletes an instance from the scheduler |
65 | * @param {string} id - the containers id |
66 | */ |
67 | done (id) { |
68 | this._running.delete(id) |
69 | this.instances.delete(id) |
70 | this._checkWaits() |
71 | } |
72 | |
73 | /** |
74 | * returns a promise that resolves once all containers have reached the given |
75 | * number of ticks |
76 | * @param {interger} ticks - the number of ticks to wait |
77 | * @param {string} id - optional id of the container that is waiting |
78 | * @return {Promise} |
79 | */ |
80 | wait (ticks = Infinity, id) { |
81 | this._running.delete(id) |
82 | return new Promise((resolve, reject) => { |
83 | binarySearchInsert(this._waits, comparator, { |
84 | ticks: ticks, |
85 | resolve: resolve |
86 | }) |
87 | this._checkWaits() |
88 | }) |
89 | |
90 | function comparator (a, b) { |
91 | return a.ticks - b.ticks |
92 | } |
93 | } |
94 | |
95 | /** |
96 | * returns the oldest container's ticks |
97 | * @return {integer} |
98 | */ |
99 | oldest () { |
100 | const nextValue = this.instances.values().next().value |
101 | return nextValue ? nextValue.ticks : 0 |
102 | } |
103 | |
104 | // checks outstanding waits to see if they can be resolved |
105 | _checkWaits () { |
106 | if (!this._loadingInstances.size) { |
107 | // if there are no running containers |
108 | if (!this.instances.size) { |
109 | // clear any remanding waits |
110 | this._waits.forEach(wait => wait.resolve()) |
111 | this._waits = [] |
112 | } else if (!this._running.size) { |
113 | // if there are no containers running find the oldest wait and update |
114 | // the oldest containers to it ticks |
115 | const oldest = this._waits[0].ticks |
116 | for (let instance of this.instances) { |
117 | instance = instance[1] |
118 | if (instance.ticks > oldest) { |
119 | break |
120 | } else { |
121 | instance.ticks = oldest |
122 | this._update(instance) |
123 | } |
124 | } |
125 | return this._checkWaits() |
126 | } else { |
127 | // find the old container and see if to can resolve any of the waits |
128 | const oldest = this.oldest() |
129 | for (const index in this._waits) { |
130 | const wait = this._waits[index] |
131 | if (wait.ticks <= oldest) { |
132 | wait.resolve() |
133 | } else { |
134 | this._waits.splice(0, index) |
135 | break |
136 | } |
137 | } |
138 | } |
139 | } |
140 | } |
141 | } |
142 |
Built with git-ssb-web