Files: 9d0e76ab27a7c7ca239e1ef71f84447d37f4acc4 / scheduler.js
3732 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | const LockMap = require('lockmap') |
3 | |
4 | module.exports = class Scheduler { |
5 | /** |
6 | * The Sceduler manages the run cycle of the containes and figures out which |
7 | * order they should run in |
8 | */ |
9 | constructor () { |
10 | this._waits = [] |
11 | this._running = new Set() |
12 | this._loadingInstances = new LockMap() |
13 | this.instances = new Map() |
14 | this.systemServices = new Map() |
15 | } |
16 | |
17 | /** |
18 | * locks the scheduler from clearing waits untill the lock is resolved |
19 | * @param {string} id |
20 | * @return {function} the resolve function to call once it to unlock |
21 | */ |
22 | lock (id) { |
23 | return this._loadingInstances.lock(id) |
24 | } |
25 | |
26 | /** |
27 | * updates an instance with a new tick count |
28 | * @param {Object} instance - a container instance |
29 | */ |
30 | update (instance) { |
31 | this._waits = this._waits.filter(wait => wait.id !== instance.id) |
32 | this._update(instance) |
33 | this._running.add(instance.id) |
34 | this._checkWaits() |
35 | } |
36 | |
37 | _update (instance) { |
38 | // sorts the container instance map by tick count |
39 | this.instances.delete(instance.id) |
40 | const instanceArray = [...this.instances] |
41 | binarySearchInsert(instanceArray, comparator, [instance.id, instance]) |
42 | this.instances = new Map(instanceArray) |
43 | |
44 | function comparator (a, b) { |
45 | return a[1].ticks - b[1].ticks |
46 | } |
47 | } |
48 | |
49 | /** |
50 | * returns a container |
51 | * @param {string} id |
52 | * @return {object} |
53 | */ |
54 | getInstance (id) { |
55 | return this.instances.get(id) || this._loadingInstances.getLock(id) || this.systemServices.get(id) |
56 | } |
57 | |
58 | /** |
59 | * deletes an instance from the scheduler |
60 | * @param {string} id - the containers id |
61 | */ |
62 | done (id) { |
63 | this._running.delete(id) |
64 | this.instances.delete(id) |
65 | this._checkWaits() |
66 | } |
67 | |
68 | /** |
69 | * returns a promise that resolves once all containers have reached the given |
70 | * number of ticks |
71 | * @param {interger} ticks - the number of ticks to wait |
72 | * @param {string} id - optional id of the container that is waiting |
73 | * @return {Promise} |
74 | */ |
75 | wait (ticks = Infinity, id) { |
76 | this._running.delete(id) |
77 | return new Promise((resolve, reject) => { |
78 | binarySearchInsert(this._waits, comparator, { |
79 | ticks: ticks, |
80 | resolve: resolve, |
81 | id: id |
82 | }) |
83 | this._checkWaits() |
84 | }) |
85 | |
86 | function comparator (a, b) { |
87 | return a.ticks - b.ticks |
88 | } |
89 | } |
90 | |
91 | /** |
92 | * returns the oldest container's ticks |
93 | * @return {integer} |
94 | */ |
95 | leastNumberOfTicks () { |
96 | const nextValue = this.instances.values().next().value |
97 | return nextValue ? nextValue.ticks : 0 |
98 | } |
99 | |
100 | // checks outstanding waits to see if they can be resolved |
101 | _checkWaits () { |
102 | // if there are no running containers |
103 | if (!this.instances.size) { |
104 | // clear any remanding waits |
105 | this._waits.forEach(wait => wait.resolve()) |
106 | this._waits = [] |
107 | } else { |
108 | // find the old container and see if to can resolve any of the waits |
109 | const least = this.leastNumberOfTicks() |
110 | for (const index in this._waits) { |
111 | const wait = this._waits[index] |
112 | if (wait.ticks <= least) { |
113 | wait.resolve() |
114 | this._running.add(wait.id) |
115 | } else { |
116 | this._waits.splice(0, index) |
117 | break |
118 | } |
119 | } |
120 | if (!this._running.size) { |
121 | // if there are no containers running find the oldest wait and update |
122 | // the oldest containers to it ticks |
123 | const oldest = this._waits[0].ticks |
124 | for (let instance of this.instances) { |
125 | instance = instance[1] |
126 | if (instance.ticks > oldest) { |
127 | break |
128 | } else { |
129 | instance.ticks = oldest |
130 | this._update(instance) |
131 | } |
132 | } |
133 | return this._checkWaits() |
134 | } |
135 | } |
136 | } |
137 | } |
138 |
Built with git-ssb-web