git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 2b9a2b29e069f6c25c9947d0c64fde6c71dcf631

Files: 2b9a2b29e069f6c25c9947d0c64fde6c71dcf631 / scheduler.js

3778 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const SortedMap = require('sortedmap')
3const LockMap = require('lockmap')
4
5function comparator (a, b) {
6 return a.ticks - b.ticks
7}
8
9module.exports = class Scheduler {
10 /**
11 * The Scheduler manages the actor instances and tracks how many "ticks" they
12 * have ran.
13 */
14 constructor () {
15 this._waits = []
16 this._running = new Set()
17 this._loadingInstances = new LockMap()
18 this._checkingWaits = false
19 this.instances = new SortedMap(comparator)
20 }
21
22 /**
23 * locks the scheduler from clearing waits untill the lock is resolved
24 * @param {string} id
25 * @return {function} the resolve function to call once it to unlock
26 */
27 lock (id) {
28 return this._loadingInstances.lock(id)
29 }
30
31 /**
32 * updates an instance with a new tick count
33 * @param {Object} instance - an actor instance
34 */
35 update (instance) {
36 this._waits = this._waits.filter(wait => wait.id !== instance.id)
37 this._update(instance)
38 this._running.add(instance.id)
39 this._checkWaits()
40 }
41
42 _update (instance) {
43 this.instances.delete(instance.id)
44 this.instances.set(instance.id, instance)
45 }
46
47 /**
48 * returns an Actor instance
49 * @param {String} id
50 * @return {Object}
51 */
52 getInstance (id) {
53 return this.instances.get(id) || this._loadingInstances.get(id)
54 }
55
56 /**
57 * deletes an instance from the scheduler
58 * @param {String} id - the containers id
59 */
60 done (id) {
61 this._running.delete(id)
62 this.instances.delete(id)
63 this._checkWaits()
64 }
65
66 /**
67 * returns a promise that resolves once all containers have reached the given
68 * number of ticks
69 * @param {interger} ticks - the number of ticks to wait
70 * @param {string} id - optional id of the container that is waiting
71 * @return {Promise}
72 */
73 wait (ticks, id) {
74 this._running.delete(id)
75
76 return new Promise((resolve, reject) => {
77 binarySearchInsert(this._waits, comparator, {
78 ticks: ticks,
79 resolve: resolve,
80 id: id
81 })
82 this._checkWaits()
83 })
84 }
85
86 /**
87 * returns the oldest container's ticks
88 * @return {integer}
89 */
90 leastNumberOfTicks (exclude) {
91 let ticks = 0
92 for (const instance of this.instances) {
93 ticks = instance[1].ticks
94 if (instance[1].id !== exclude) {
95 return ticks
96 }
97 }
98
99 return ticks
100 }
101
102 // checks outstanding waits to see if they can be resolved
103 async _checkWaits () {
104 if (this._checkingWaits) {
105 return
106 }
107 this._checkingWaits = true
108
109 // wait to check waits until all the instances are done loading
110 await [...this._loadingInstances.values()]
111
112 // if there are no instances, clear any remaining waits
113 if (!this.instances.size) {
114 this._waits.forEach(wait => wait.resolve())
115 this._waits = []
116 this._checkingWaits = false
117
118 return
119 }
120
121 // find the old container, see if any of the waits can be resolved
122 while (this._waits[0]) {
123 const wait = this._waits[0]
124 const least = this.leastNumberOfTicks(wait.id)
125 if (wait.ticks <= least) {
126 this._waits.shift()
127 wait.resolve()
128 this._running.add(wait.id)
129 } else {
130 break
131 }
132 }
133
134 // if there are no containers running find the oldest wait
135 // and update the oldest containers to its ticks
136 if (!this._running.size && this._waits.length) {
137 const oldest = this._waits[0].ticks
138 for (let instance of this.instances) {
139 instance = instance[1]
140 if (instance.ticks > oldest) {
141 break
142 }
143 instance.ticks = oldest
144 this._update(instance)
145 }
146 this._checkingWaits = false
147
148 return this._checkWaits()
149 }
150
151 this._checkingWaits = false
152 }
153}
154

Built with git-ssb-web