git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: e210ee60ee3b72696f7da750046fb41d41785da5

Files: e210ee60ee3b72696f7da750046fb41d41785da5 / scheduler.js

3658 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const SortedMap = require('sortedmap')
3const LockMap = require('lockmap')
4
5function comparator (a, b) {
6 return a.ticks - b.ticks
7}
8
9module.exports = class Scheduler {
10 /**
11 * The Scheduler manages the actor instances and tracks how many "ticks" they
12 * have ran.
13 */
14 constructor () {
15 this._waits = []
16 this._running = new Set()
17 this._loadingInstances = new LockMap()
18 this._checkingWaits = false
19 this.instances = new SortedMap(comparator)
20 }
21
22 /**
23 * locks the scheduler from clearing waits untill the lock is resolved
24 * @param {string} id
25 * @return {function} the resolve function to call once it to unlock
26 */
27 lock (id) {
28 this.instances.set(id, {
29 ticks: 0
30 })
31 this._running.add(id)
32 return this._loadingInstances.lock(id)
33 }
34
35 /**
36 * updates an instance with a new tick count
37 * @param {Object} instance - an actor instance
38 */
39 update (instance) {
40 this._waits = this._waits.filter(wait => wait.id !== instance.id)
41 this._update(instance)
42 this._running.add(instance.id)
43 this._checkWaits()
44 }
45
46 _update (instance) {
47 this.instances.delete(instance.id)
48 this.instances.set(instance.id, instance)
49 }
50
51 /**
52 * returns an Actor instance
53 * @param {String} id
54 * @return {Object}
55 */
56 getInstance (id) {
57 return this._loadingInstances.get(id) || this.instances.get(id)
58 }
59
60 /**
61 * deletes an instance from the scheduler
62 * @param {String} id - the containers id
63 */
64 done (id) {
65 this._running.delete(id)
66 this.instances.delete(id)
67 this._checkWaits()
68 }
69
70 /**
71 * returns a promise that resolves once all containers have reached the given
72 * number of ticks
73 * @param {interger} ticks - the number of ticks to wait
74 * @param {string} id - optional id of the container that is waiting
75 * @return {Promise}
76 */
77 wait (ticks, id) {
78 this._running.delete(id)
79
80 return new Promise((resolve, reject) => {
81 binarySearchInsert(this._waits, comparator, {
82 ticks: ticks,
83 resolve: resolve,
84 id: id
85 })
86 this._checkWaits()
87 })
88 }
89
90 /**
91 * returns the oldest container's ticks
92 * @return {integer}
93 */
94 leastNumberOfTicks (exclude) {
95 let ticks = Infinity
96 for (const instance of this.instances) {
97 ticks = instance[1].ticks
98 if (instance[1].id !== exclude) {
99 return ticks
100 }
101 }
102
103 return ticks
104 }
105
106 // checks outstanding waits to see if they can be resolved
107 _checkWaits () {
108 // if there are no instances, clear any remaining waits
109 if (!this.instances.size) {
110 this._waits.forEach(wait => wait.resolve())
111 this._waits = []
112 this._checkingWaits = false
113
114 return
115 }
116
117 // find the old container, see if any of the waits can be resolved
118 while (this._waits[0]) {
119 const wait = this._waits[0]
120 const least = this.leastNumberOfTicks(wait.id)
121 if (wait.ticks <= least) {
122 this._waits.shift()
123 wait.resolve()
124 this._running.add(wait.id)
125 } else {
126 break
127 }
128 }
129
130 // if there are no containers running find the oldest wait
131 // and update the oldest containers to its ticks
132 if (!this._running.size && this._waits.length) {
133 const oldest = this._waits[0].ticks
134 for (let instance of this.instances) {
135 instance = instance[1]
136 if (instance.ticks > oldest) {
137 break
138 }
139 instance.ticks = oldest
140 this._update(instance)
141 }
142 this._checkingWaits = false
143
144 return this._checkWaits()
145 }
146
147 this._checkingWaits = false
148 }
149}
150

Built with git-ssb-web