git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 9e1f4df59ed811983747166e1f3b752a92e9b42a

Files: 9e1f4df59ed811983747166e1f3b752a92e9b42a / portManager.js

7275 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
3
4module.exports = class PortManager {
5 /**
6 * The port manager manages the the ports. This inculdes creation, deletion
7 * fetching and waiting on ports
8 * @param {Object} opts
9 * @param {Object} opts.state
10 * @param {Object} opts.hypervisor
11 * @param {Object} opts.exoInterface
12 */
13 constructor (opts) {
14 Object.assign(this, opts)
15 this.ports = this.state.ports
16
17 this._waitingPorts = {}
18 // tracks unbounded ports that we have
19 this._unboundPorts = new Set()
20 this._saturationPromise = new Promise((resolve, reject) => {
21 this._saturationResolve = resolve
22 })
23 this._oldestMessagePromise = new Promise((resolve, reject) => {
24 this._oldestMessageResolve = resolve
25 })
26 }
27
28 /**
29 * binds a port to a name
30 * @param {Object} port - the port to bind
31 * @param {String} name - the name of the port
32 */
33 async bind (name, port) {
34 if (this.isBound(port)) {
35 throw new Error('cannot bind a port that is already bound')
36 } else if (this.ports[name]) {
37 throw new Error('cannot bind port to a name that is alread bound')
38 } else {
39 this._unboundPorts.delete(port)
40
41 // save the port instance
42 this.ports[name] = port
43
44 // update the dest port
45 const destPort = await this.hypervisor.getDestPort(port)
46 port.messages.forEach(message => {
47 message._fromPort = port
48 message.fromName = name
49 })
50
51 if (destPort) {
52 destPort.destName = name
53 destPort.destId = this.id
54 delete destPort.destPort
55 }
56 }
57 }
58
59 /**
60 * unbinds a port given its name
61 * @param {string} name
62 * @returns {Promise}
63 */
64 async unbind (name) {
65 const port = this.ports[name]
66 delete this.ports[name]
67 this._unboundPorts.add(port)
68 this.hypervisor.addNodeToCheck(this.id)
69
70 // update the destination port
71 const destPort = await this.hypervisor.getDestPort(port)
72 delete destPort.destName
73 delete destPort.destId
74 destPort.destPort = port
75
76 return port
77 }
78
79 /**
80 * delete an port given the name it is bound to
81 * @param {string} name
82 */
83 async delete (name) {
84 const port = this.ports[name]
85 await this.kernel.send(port, new DeleteMessage())
86 this._delete(name)
87 }
88
89 _delete (name) {
90 this.hypervisor.addNodeToCheck(this.id)
91 delete this.ports[name]
92 }
93
94 /**
95 * clears any unbounded ports referances
96 */
97 clearUnboundedPorts () {
98 const waits = []
99 this._unboundPorts.forEach(port => {
100 waits.push(this.kernel.send(port, new DeleteMessage()))
101 })
102
103 this._unboundPorts.clear()
104 return Promise.all(waits)
105 }
106
107 /**
108 * check if a port object is still valid
109 * @param {Object} port
110 * @return {Boolean}
111 */
112 isBound (port) {
113 return !this._unboundPorts.has(port)
114 }
115
116 /**
117 * queues a message on a port
118 * @param {Message} message
119 */
120 queue (name, message) {
121 const port = this.ports[name]
122
123 message._fromPort = port
124 message.fromName = name
125
126 const numOfMsg = port.messages.push(message)
127
128 if (numOfMsg === 1) {
129 if (isSaturated(this._waitingPorts)) {
130 this._saturationResolve()
131 this._saturationPromise = new Promise((resolve, reject) => {
132 this._saturationResolve = resolve
133 })
134 } else if (message._fromTicks < this._messageTickThreshold) {
135 this._oldestMessageResolve(message)
136 this._oldestMessagePromise = new Promise((resolve, reject) => {
137 this._oldestMessageResolve = resolve
138 })
139 this._messageTickThreshold = Infinity
140 }
141 }
142 }
143
144 /**
145 * gets a port given it's name
146 * @param {String} name
147 * @return {Object}
148 */
149 get (name) {
150 return this.ports[name]
151 }
152
153 /**
154 * creates a channel returns the created ports in an Array
155 * @returns {array}
156 */
157 createChannel () {
158 const [port1, port2] = this.hypervisor.createChannel()
159 this._unboundPorts.add(port1)
160 this._unboundPorts.add(port2)
161 return [port1, port2]
162 }
163
164 /**
165 * Waits for the the next message if any
166 * @returns {Promise}
167 */
168 async getNextMessage (ports = this.ports, timeout = Infinity) {
169 let message = peekNextMessage(ports)
170 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
171 let saturated = false
172
173 if (Object.keys(this._waitingPorts).length) {
174 throw new Error('already getting next message')
175 }
176
177 this._waitingPorts = ports
178
179 const findOldestMessage = async () => {
180 while (// end if we have a message older then slowest containers
181 !((message && oldestTime >= message._fromTicks) ||
182 // end if there are no messages and this container is the oldest contaner
183 (!message && oldestTime === this.kernel.ticks))) {
184 if (saturated) {
185 break
186 }
187 let ticksToWait = message ? message._fromTicks : this.kernel.ticks
188 // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait
189 await Promise.race([
190 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
191 message = peekNextMessage(ports)
192 }),
193 this._olderMessage(message).then(m => {
194 message = m
195 })
196 ])
197 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
198 }
199 }
200
201 await Promise.race([
202 this._whenSaturated(ports).then(() => {
203 message = peekNextMessage(ports)
204 saturated = true
205 }),
206 findOldestMessage()
207 ])
208
209 this._waitingPorts = {}
210
211 return message
212 }
213
214 // returns a promise that resolve when the ports are saturated
215 _whenSaturated (ports) {
216 if (isSaturated(ports)) {
217 return Promise.resolve()
218 } else {
219 return this._saturationPromise
220 }
221 }
222
223 // returns a promise that resolve when a message older then the given message
224 // is recived
225 _olderMessage (message) {
226 this._messageTickThreshold = message ? message._fromTicks : -1
227 return this._oldestMessagePromise
228 }
229
230 removeSentPorts (message) {
231 message.ports.forEach(port => this._unboundPorts.delete(port))
232 }
233
234 addReceivedPorts (message) {
235 message.ports.forEach(port => this._unboundPorts.add(port))
236 }
237
238 checkSendingPorts (message) {
239 for (const port of message.ports) {
240 if (this.isBound(port)) {
241 throw new Error('message must not contain bound ports')
242 }
243 }
244 }
245}
246
247// tests wether or not all the ports have a message
248function isSaturated (ports) {
249 const values = Object.values(ports)
250 return values.length ? values.every(port => port.messages.length) : true
251}
252
253// find and returns the next message that this instance currently knows about
254function peekNextMessage (ports) {
255 ports = Object.values(ports)
256 if (ports.length) {
257 const port = ports.reduce(messageArbiter)
258 return port.messages[0]
259 }
260}
261
262// decides which message to go first
263function messageArbiter (portA, portB) {
264 const a = portA.messages[0]
265 const b = portB.messages[0]
266
267 if (!a) {
268 return portB
269 } else if (!b) {
270 return portA
271 }
272
273 // order by number of ticks if messages have different number of ticks
274 if (a._fromTicks !== b._fromTicks) {
275 return a._fromTicks < b._fromTicks ? portA : portB
276 } else {
277 // insertion order
278 return portA
279 }
280}
281

Built with git-ssb-web