git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 622b5589b3a43d425004534b7b44c376d3e740c7

Files: 622b5589b3a43d425004534b7b44c376d3e740c7 / scheduler.js

3967 bytesRaw
1const binarySearchInsert = require('binary-search-insert')
2const SortedMap = require('sortedmap')
3const LockMap = require('lockmap')
4
5module.exports = class Scheduler {
6 /**
7 * The Scheduler manages the run cycle of Actors and figures out which
8 * order they should run in
9 */
10 constructor () {
11 this._waits = []
12 this._running = new Set()
13 this._loadingInstances = new LockMap()
14 this.instances = new SortedMap(comparator)
15
16 function comparator (a, b) {
17 return a.ticks - b.ticks
18 }
19 }
20
21 /**
22 * locks the scheduler from clearing waits untill the lock is resolved
23 * @param {string} id
24 * @return {function} the resolve function to call once it to unlock
25 */
26 lock (id) {
27 return this._loadingInstances.lock(id)
28 }
29
30 /**
31 * updates an instance with a new tick count
32 * @param {Object} instance - an actor instance
33 */
34 update (instance) {
35 this._waits = this._waits.filter(wait => wait.id !== instance.id)
36 this._update(instance)
37 this._running.add(instance.id)
38 this._checkWaits()
39 }
40
41 _update (instance) {
42 this.instances.delete(instance.id)
43 this.instances.set(instance.id, instance)
44 }
45
46 /**
47 * returns an Actor instance
48 * @param {String} id
49 * @return {Object}
50 */
51 getInstance (id) {
52 return this.instances.get(id) || this._loadingInstances.get(id)
53 }
54
55 /**
56 * deletes an instance from the scheduler
57 * @param {String} id - the containers id
58 */
59 done (id) {
60 this._running.delete(id)
61 this.instances.delete(id)
62 this._checkWaits()
63 }
64
65 /**
66 * returns a promise that resolves once all containers have reached the given
67 * number of ticks
68 * @param {interger} ticks - the number of ticks to wait
69 * @param {string} id - optional id of the container that is waiting
70 * @return {Promise}
71 */
72 wait (ticks, id) {
73 this._running.delete(id)
74 return new Promise((resolve, reject) => {
75 binarySearchInsert(this._waits, comparator, {
76 ticks: ticks,
77 resolve: resolve,
78 id: id
79 })
80 this._checkWaits()
81 })
82
83 function comparator (a, b) {
84 return a.ticks - b.ticks
85 }
86 }
87
88 /**
89 * returns the oldest container's ticks
90 * @return {integer}
91 */
92 leastNumberOfTicks (exculde) {
93 let ticks = 0
94 for (const instance of this.instances) {
95 ticks = instance[1].ticks
96 if (instance[1].id !== exculde) {
97 return ticks
98 }
99 }
100 return ticks
101 }
102
103 // checks outstanding waits to see if they can be resolved
104 async _checkWaits () {
105 if (this._checkingWaits) {
106 return
107 } else {
108 this._checkingWaits = true
109 // wait to check waits untill all the instances are done loading
110 await [...this._loadingInstances.values()]
111 }
112 // if there are no running containers
113 if (!this.instances.size) {
114 // clear any remanding waits
115 this._waits.forEach(wait => wait.resolve())
116 this._waits = []
117 this._checkingWaits = false
118 } else {
119 // find the old container and see if to can resolve any of the waits
120 while (this._waits[0]) {
121 const wait = this._waits[0]
122 const least = this.leastNumberOfTicks(wait.id)
123 if (wait.ticks <= least) {
124 this._waits.shift()
125 wait.resolve()
126 this._running.add(wait.id)
127 } else {
128 break
129 }
130 }
131
132 if (!this._running.size && this._waits.length) {
133 // if there are no containers running find the oldest wait and update
134 // the oldest containers to it ticks
135 const oldest = this._waits[0].ticks
136 for (let instance of this.instances) {
137 instance = instance[1]
138 if (instance.ticks > oldest) {
139 break
140 } else {
141 instance.ticks = oldest
142 this._update(instance)
143 }
144 }
145 this._checkingWaits = false
146 return this._checkWaits()
147 } else {
148 this._checkingWaits = false
149 }
150 }
151 }
152}
153

Built with git-ssb-web