git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: de71a0149a10c40bfb94cc8e121223e6b887f685

Files: de71a0149a10c40bfb94cc8e121223e6b887f685 / scheduler.js

3595 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const SortedMap = require('sortedmap')
3
4function comparator (a, b) {
5 return a.ticks - b.ticks
6}
7
8module.exports = class Scheduler {
9 /**
10 * The Scheduler manages the actor instances and tracks how many "ticks" they
11 * have ran.
12 */
13 constructor () {
14 this._waits = []
15 this._running = new Set()
16 this._checkingWaits = false
17 this.instances = new SortedMap(comparator)
18 }
19
20 /**
21 * locks the scheduler from clearing waits untill the lock is resolved
22 * @param {string} id
23 * @return {function} the resolve function to call once it to unlock
24 */
25 lock (id) {
26 let r
27 const p = new Promise((resolve, reject) => {
28 r = resolve
29 })
30 p.ticks = 0
31 this.instances.set(id, p)
32 this._running.add(id)
33 return r
34 }
35
36 /**
37 * updates an instance with a new tick count
38 * @param {Object} instance - an actor instance
39 */
40 update (instance) {
41 this._waits = this._waits.filter(wait => wait.id !== instance.id)
42 this._update(instance)
43 this._running.add(instance.id)
44 this._checkWaits()
45 }
46
47 _update (instance) {
48 this.instances.delete(instance.id)
49 this.instances.set(instance.id, instance)
50 }
51
52 /**
53 * returns an Actor instance
54 * @param {String} id
55 * @return {Object}
56 */
57 getInstance (id) {
58 return this.instances.get(id)
59 }
60
61 /**
62 * deletes an instance from the scheduler
63 * @param {String} id - the containers id
64 */
65 done (id) {
66 this._running.delete(id)
67 this.instances.delete(id)
68 this._checkWaits()
69 }
70
71 /**
72 * returns a promise that resolves once all containers have reached the given
73 * number of ticks
74 * @param {interger} ticks - the number of ticks to wait
75 * @param {string} id - optional id of the container that is waiting
76 * @return {Promise}
77 */
78 wait (ticks, id) {
79 this._running.delete(id)
80
81 return new Promise((resolve, reject) => {
82 binarySearchInsert(this._waits, comparator, {
83 ticks: ticks,
84 resolve: resolve,
85 id: id
86 })
87 this._checkWaits()
88 })
89 }
90
91 /**
92 * returns the oldest container's ticks
93 * @return {integer}
94 */
95 leastNumberOfTicks (exclude) {
96 let ticks = Infinity
97 for (const instance of this.instances) {
98 ticks = instance[1].ticks
99 if (instance[1].id !== exclude) {
100 return ticks
101 }
102 }
103
104 return ticks
105 }
106
107 // checks outstanding waits to see if they can be resolved
108 _checkWaits () {
109 // if there are no instances, clear any remaining waits
110 if (!this.instances.size) {
111 this._waits.forEach(wait => wait.resolve())
112 this._waits = []
113 this._checkingWaits = false
114
115 return
116 }
117
118 // find the old container, see if any of the waits can be resolved
119 while (this._waits[0]) {
120 const wait = this._waits[0]
121 const least = this.leastNumberOfTicks(wait.id)
122 if (wait.ticks <= least) {
123 this._waits.shift()
124 wait.resolve()
125 this._running.add(wait.id)
126 } else {
127 break
128 }
129 }
130
131 // if there are no containers running find the oldest wait
132 // and update the oldest containers to its ticks
133 if (!this._running.size && this._waits.length) {
134 const oldest = this._waits[0].ticks
135 for (let instance of this.instances) {
136 instance = instance[1]
137 if (instance.ticks > oldest) {
138 break
139 }
140 instance.ticks = oldest
141 this._update(instance)
142 }
143 this._checkingWaits = false
144
145 return this._checkWaits()
146 }
147
148 this._checkingWaits = false
149 }
150}
151

Built with git-ssb-web