git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 61eb05a360ac5413a3b250bbbfdf7b0a33560d93

Files: 61eb05a360ac5413a3b250bbbfdf7b0a33560d93 / portManager.js

7161 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
3
4module.exports = class PortManager {
5 /**
6 * The port manager manages the the ports. This inculdes creation, deletion
7 * fetching and waiting on ports
8 * @param {Object} opts
9 * @param {Object} opts.state
10 * @param {Object} opts.hypervisor
11 * @param {Object} opts.exoInterface
12 */
13 constructor (opts) {
14 Object.assign(this, opts)
15 this.ports = this.state.ports
16
17 this._waitingPorts = {}
18 // tracks unbounded ports that we have
19 this._unboundPorts = new Set()
20 this._saturationPromise = new Promise((resolve, reject) => {
21 this._saturationResolve = resolve
22 })
23 this._oldestMessagePromise = new Promise((resolve, reject) => {
24 this._oldestMessageResolve = resolve
25 })
26 }
27
28 /**
29 * binds a port to a name
30 * @param {Object} port - the port to bind
31 * @param {String} name - the name of the port
32 */
33 async bind (name, port) {
34 if (this.isBound(port)) {
35 throw new Error('cannot bind a port that is already bound')
36 } else if (this.ports[name]) {
37 throw new Error('cannot bind port to a name that is alread bound')
38 } else {
39 this._unboundPorts.delete(port)
40
41 // save the port instance
42 this.ports[name] = port
43
44 // update the dest port
45 const destPort = await this.hypervisor.getDestPort(port)
46 port.messages.forEach(message => {
47 message._fromPort = port
48 message.fromName = name
49 })
50
51 if (destPort) {
52 destPort.destName = name
53 destPort.destId = this.id
54 delete destPort.destPort
55 }
56 }
57 }
58
59 /**
60 * unbinds a port given its name
61 * @param {string} name
62 * @returns {Promise}
63 */
64 async unbind (name) {
65 const port = this.ports[name]
66 delete this.ports[name]
67 this._unboundPorts.add(port)
68 this.hypervisor.addNodeToCheck(this.id)
69
70 // update the destination port
71 const destPort = await this.hypervisor.getDestPort(port)
72 delete destPort.destName
73 delete destPort.destId
74 destPort.destPort = port
75
76 return port
77 }
78
79 /**
80 * delete an port given the name it is bound to
81 * @param {string} name
82 */
83 async delete (name) {
84 const port = this.ports[name]
85 await this.kernel.send(port, new DeleteMessage())
86 this._delete(name)
87 }
88
89 _delete (name) {
90 this.hypervisor.addNodeToCheck(this.id)
91 delete this.ports[name]
92 }
93
94 /**
95 * clears any unbounded ports referances
96 */
97 clearUnboundedPorts () {
98 const waits = []
99 this._unboundPorts.forEach(port => {
100 waits.push(this.kernel.send(port, new DeleteMessage()))
101 })
102
103 this._unboundPorts.clear()
104 return Promise.all(waits)
105 }
106
107 /**
108 * check if a port object is still valid
109 * @param {Object} port
110 * @return {Boolean}
111 */
112 isBound (port) {
113 return !this._unboundPorts.has(port)
114 }
115
116 /**
117 * queues a message on a port
118 * @param {Message} message
119 */
120 queue (name, message) {
121 const port = this.ports[name]
122
123 message._fromPort = port
124 message.fromName = name
125
126 const numOfMsg = port.messages.push(message)
127
128 if (numOfMsg === 1) {
129 if (isSaturated(this._waitingPorts)) {
130 this._saturationResolve()
131 this._saturationPromise = new Promise((resolve, reject) => {
132 this._saturationResolve = resolve
133 })
134 } else if (message._fromTicks < this._messageTickThreshold) {
135 this._oldestMessageResolve(message)
136 this._oldestMessagePromise = new Promise((resolve, reject) => {
137 this._oldestMessageResolve = resolve
138 })
139 this._messageTickThreshold = Infinity
140 }
141 }
142 }
143
144 /**
145 * gets a port given it's name
146 * @param {String} name
147 * @return {Object}
148 */
149 get (name) {
150 return this.ports[name]
151 }
152
153 /**
154 * creates a channel returns the created ports in an Array
155 * @returns {array}
156 */
157 createChannel () {
158 const [port1, port2] = this.hypervisor.createChannel()
159 this._unboundPorts.add(port1)
160 this._unboundPorts.add(port2)
161 return [port1, port2]
162 }
163
164 // find and returns the next message that this instance currently knows about
165 _peekNextMessage (ports) {
166 ports = Object.values(ports)
167 if (ports.length) {
168 const port = ports.reduce(messageArbiter)
169 return port.messages[0]
170 }
171 }
172
173 /**
174 * Waits for the the next message if any
175 * @returns {Promise}
176 */
177 async getNextMessage (ports = this.ports, timeout = Infinity) {
178 let message = this._peekNextMessage(ports)
179 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
180 let saturated = false
181
182 this._waitingPorts = ports
183
184 const findOldestMessage = async () => {
185 while (// end if we have a message older then slowest containers
186 !((message && oldestTime >= message._fromTicks) ||
187 // end if there are no messages and this container is the oldest contaner
188 (!message && oldestTime === this.kernel.ticks))) {
189 if (saturated) {
190 break
191 }
192 let ticksToWait = message ? message._fromTicks : this.kernel.ticks
193 // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait
194 await Promise.race([
195 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
196 message = this._peekNextMessage(ports)
197 }),
198 this._olderMessage(message).then(m => {
199 message = m
200 })
201 ])
202 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
203 }
204 }
205
206 await Promise.race([
207 this._whenSaturated(ports).then(() => {
208 message = this._peekNextMessage(ports)
209 saturated = true
210 }),
211 findOldestMessage()
212 ])
213
214 return message
215 }
216
217 // returns a promise that resolve when the ports are saturated
218 _whenSaturated (ports) {
219 if (isSaturated(ports)) {
220 return Promise.resolve()
221 } else {
222 return this._saturationPromise
223 }
224 }
225
226 // returns a promise that resolve when a message older then the given message
227 // is recived
228 _olderMessage (message) {
229 this._messageTickThreshold = message ? message._fromTicks : -1
230 return this._oldestMessagePromise
231 }
232
233 removeSentPorts (message) {
234 message.ports.forEach(port => this._unboundPorts.delete(port))
235 }
236
237 addReceivedPorts (message) {
238 message.ports.forEach(port => this._unboundPorts.add(port))
239 }
240
241 checkSendingPorts (message) {
242 for (const port of message.ports) {
243 if (this.isBound(port)) {
244 throw new Error('message must not contain bound ports')
245 }
246 }
247 }
248}
249
/**
 * tests whether or not all the ports have at least one message
 * @param {Object} ports - the ports to inspect, keyed by name
 * @returns {Boolean} `true` when no port has an empty message queue, which
 * includes the case where there are no ports at all
 */
function isSaturated (ports) {
  for (const port of Object.values(ports)) {
    if (!port.messages.length) {
      return false
    }
  }
  return true
}
255
/**
 * decides which of two ports holds the message that should go first. Used as
 * a reducer, so the returned port is compared against the next one.
 * @param {Object} portA
 * @param {Object} portB
 * @returns {Object} the port whose first message wins. A port without any
 * messages loses to the other one.
 */
function messageArbiter (portA, portB) {
  const headA = portA.messages[0]
  const headB = portB.messages[0]

  if (!headA) {
    return portB
  }
  if (!headB) {
    return portA
  }

  // fewer ticks goes first; on equal ticks portA wins (insertion order)
  const aGoesFirst = headA._fromTicks === headB._fromTicks ||
    headA._fromTicks < headB._fromTicks
  return aGoesFirst ? portA : portB
}
275

Built with git-ssb-web