Files: f2c989b4dce29265b4c17a2cde93c68a0b742edc / scheduler.js
2388 bytesRaw
1 | const binarySearchInsert = require('binary-search-insert') |
2 | |
3 | const comparator = function (a, b) { |
4 | return a.ticks - b.ticks |
5 | } |
6 | |
7 | const instancesComparator = function (a, b) { |
8 | return a[1].ticks - b[1].ticks |
9 | } |
10 | |
11 | module.exports = class Scheduler { |
12 | constructor () { |
13 | this._waits = [] |
14 | this._running = new Set() |
15 | this.instances = new Map() |
16 | this.locks = new Set() |
17 | } |
18 | |
19 | getLock () { |
20 | const id = Symbol('lock') |
21 | this.locks.add(id) |
22 | return id |
23 | } |
24 | |
25 | releaseLock (id) { |
26 | this.locks.delete(id) |
27 | } |
28 | |
29 | update (instance) { |
30 | this._update(instance) |
31 | this._checkWaits() |
32 | } |
33 | |
34 | _update (instance) { |
35 | this._running.add(instance.id) |
36 | this.instances.delete(instance.id) |
37 | const instanceArray = [...this.instances] |
38 | binarySearchInsert(instanceArray, instancesComparator, [instance.id, instance]) |
39 | this.instances = new Map(instanceArray) |
40 | } |
41 | |
42 | getInstance (id) { |
43 | return this.instances.get(id) |
44 | } |
45 | |
46 | done (instance) { |
47 | this._running.delete(instance.id) |
48 | this.instances.delete(instance.id) |
49 | this._checkWaits() |
50 | } |
51 | |
52 | wait (ticks = Infinity, id) { |
53 | this._running.delete(id) |
54 | return new Promise((resolve, reject) => { |
55 | binarySearchInsert(this._waits, comparator, { |
56 | ticks: ticks, |
57 | resolve: resolve |
58 | }) |
59 | this._checkWaits() |
60 | }) |
61 | } |
62 | |
63 | smallest () { |
64 | return this.instances.size ? [...this.instances][0][1].ticks : 0 |
65 | } |
66 | |
67 | _checkWaits () { |
68 | if (!this.locks.size) { |
69 | // if there are no running containers |
70 | if (!this.isRunning()) { |
71 | // clear any remanding waits |
72 | this._waits.forEach(wait => wait.resolve()) |
73 | this._waits = [] |
74 | } else if (!this._running.size) { |
75 | const smallest = this._waits[0].ticks |
76 | for (let instance of this.instances) { |
77 | instance = instance[1] |
78 | if (instance.ticks > smallest) { |
79 | break |
80 | } else { |
81 | instance.ticks = smallest |
82 | this._update(instance) |
83 | } |
84 | } |
85 | return this._checkWaits() |
86 | } else { |
87 | const smallest = this.smallest() |
88 | for (const index in this._waits) { |
89 | const wait = this._waits[index] |
90 | if (wait.ticks <= smallest) { |
91 | wait.resolve() |
92 | } else { |
93 | this._waits.splice(0, index) |
94 | break |
95 | } |
96 | } |
97 | } |
98 | } |
99 | } |
100 | |
101 | isRunning () { |
102 | return this.instances.size |
103 | } |
104 | } |
105 |
Built with git-ssb-web