git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: a2acc6de9878b25ba7ae028fcf33ca086d042fae

Files: a2acc6de9878b25ba7ae028fcf33ca086d042fae / portManager.js

7274 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
3module.exports = class PortManager {
4 /**
5 * The port manager manages the the ports. This inculdes creation, deletion
6 * fetching and waiting on ports
7 * @param {Object} opts
8 * @param {Object} opts.state
9 * @param {Object} opts.hypervisor
10 * @param {Object} opts.exoInterface
11 */
12 constructor (opts) {
13 Object.assign(this, opts)
14 this.ports = this.state.ports
15
16 this._waitingPorts = {}
17 // tracks unbounded ports that we have
18 this._unboundPorts = new Set()
19 this._saturationPromise = new Promise((resolve, reject) => {
20 this._saturationResolve = resolve
21 })
22 this._oldestMessagePromise = new Promise((resolve, reject) => {
23 this._oldestMessageResolve = resolve
24 })
25 }
26
27 /**
28 * binds a port to a name
29 * @param {Object} port - the port to bind
30 * @param {String} name - the name of the port
31 */
32 async bind (name, port) {
33 if (this.isBound(port)) {
34 throw new Error('cannot bind a port that is already bound')
35 } else if (this.ports[name]) {
36 throw new Error('cannot bind port to a name that is alread bound')
37 } else {
38 this._unboundPorts.delete(port)
39
40 // save the port instance
41 this.ports[name] = port
42
43 // update the dest port
44 const destPort = await this.hypervisor.getDestPort(port)
45 port.messages.forEach(message => {
46 message._fromPort = port
47 message.fromName = name
48 })
49
50 if (destPort) {
51 destPort.destName = name
52 destPort.destId = this.id
53 delete destPort.destPort
54 }
55 }
56 }
57
58 /**
59 * unbinds a port given its name
60 * @param {string} name
61 * @returns {Promise}
62 */
63 async unbind (name) {
64 const port = this.ports[name]
65 delete this.ports[name]
66 this._unboundPorts.add(port)
67 this.hypervisor.addNodeToCheck(this.id)
68
69 // update the destination port
70 const destPort = await this.hypervisor.getDestPort(port)
71 delete destPort.destName
72 delete destPort.destId
73 destPort.destPort = port
74
75 return port
76 }
77
78 /**
79 * delete an port given the name it is bound to
80 * @param {string} name
81 */
82 async delete (name) {
83 const port = this.ports[name]
84 await this.kernel.send(port, new DeleteMessage())
85 this._delete(name)
86 }
87
88 _delete (name) {
89 this.hypervisor.addNodeToCheck(this.id)
90 delete this.ports[name]
91 }
92
93 /**
94 * clears any unbounded ports referances
95 */
96 clearUnboundedPorts () {
97 const waits = []
98 this._unboundPorts.forEach(port => {
99 waits.push(this.kernel.send(port, new DeleteMessage()))
100 })
101
102 this._unboundPorts.clear()
103 return Promise.all(waits)
104 }
105
106 /**
107 * check if a port object is still valid
108 * @param {Object} port
109 * @return {Boolean}
110 */
111 isBound (port) {
112 return !this._unboundPorts.has(port)
113 }
114
115 /**
116 * queues a message on a port
117 * @param {Message} message
118 */
119 queue (name, message) {
120 const port = this.ports[name]
121
122 message._fromPort = port
123 message.fromName = name
124
125 const numOfMsg = port.messages.push(message)
126
127 if (numOfMsg === 1) {
128 if (isSaturated(this._waitingPorts)) {
129 this._saturationResolve()
130 this._saturationPromise = new Promise((resolve, reject) => {
131 this._saturationResolve = resolve
132 })
133 } else if (message._fromTicks < this._messageTickThreshold) {
134 this._oldestMessageResolve(message)
135 this._oldestMessagePromise = new Promise((resolve, reject) => {
136 this._oldestMessageResolve = resolve
137 })
138 this._messageTickThreshold = Infinity
139 }
140 }
141 }
142
143 /**
144 * gets a port given it's name
145 * @param {String} name
146 * @return {Object}
147 */
148 get (name) {
149 return this.ports[name]
150 }
151
152 /**
153 * creates a channel returns the created ports in an Array
154 * @returns {array}
155 */
156 createChannel () {
157 const [port1, port2] = this.hypervisor.createChannel()
158 this._unboundPorts.add(port1)
159 this._unboundPorts.add(port2)
160 return [port1, port2]
161 }
162
163 /**
164 * Waits for the the next message if any
165 * @returns {Promise}
166 */
167 async getNextMessage (ports = this.ports, timeout = Infinity) {
168 let message = peekNextMessage(ports)
169 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
170 let saturated = false
171
172 if (Object.keys(this._waitingPorts).length) {
173 throw new Error('already getting next message')
174 }
175
176 this._waitingPorts = ports
177
178 const findOldestMessage = async () => {
179 while (// end if we have a message older then slowest containers
180 !((message && oldestTime >= message._fromTicks) ||
181 // end if there are no messages and this container is the oldest contaner
182 (!message && oldestTime === this.kernel.ticks))) {
183 if (saturated) {
184 break
185 }
186 let ticksToWait = message ? message._fromTicks : this.kernel.ticks
187 // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait
188 await Promise.race([
189 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
190 message = peekNextMessage(ports)
191 }),
192 this._olderMessage(message).then(m => {
193 message = m
194 })
195 ])
196 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
197 }
198 }
199
200 await Promise.race([
201 this._whenSaturated(ports).then(() => {
202 message = peekNextMessage(ports)
203 saturated = true
204 }),
205 findOldestMessage()
206 ])
207
208 this._waitingPorts = {}
209
210 return message
211 }
212
213 // returns a promise that resolve when the ports are saturated
214 _whenSaturated (ports) {
215 if (isSaturated(ports)) {
216 return Promise.resolve()
217 } else {
218 return this._saturationPromise
219 }
220 }
221
222 // returns a promise that resolve when a message older then the given message
223 // is recived
224 _olderMessage (message) {
225 this._messageTickThreshold = message ? message._fromTicks : -1
226 return this._oldestMessagePromise
227 }
228
229 removeSentPorts (message) {
230 message.ports.forEach(port => this._unboundPorts.delete(port))
231 }
232
233 addReceivedPorts (message) {
234 message.ports.forEach(port => this._unboundPorts.add(port))
235 }
236
237 checkSendingPorts (message) {
238 for (const port of message.ports) {
239 if (this.isBound(port)) {
240 throw new Error('message must not contain bound ports')
241 }
242 }
243 }
244}
245
/**
 * tests whether or not all the ports have a message; a collection without
 * any ports counts as saturated
 * @param {Object} ports - map of names to ports, each with a `messages` array
 * @returns {Boolean}
 */
function isSaturated (ports) {
  // `every` is vacuously true for an empty list, which covers the no-ports case
  return Object.values(ports).every(port => port.messages.length > 0)
}
251
/**
 * finds and returns the next message that this instance currently knows
 * about, without removing it from its port
 * @param {Object} ports - map of names to ports, each with a `messages` array
 * @returns {Object|undefined} the head message of the winning port, or
 * `undefined` when there are no ports or no port has a message
 */
function peekNextMessage (ports) {
  const candidates = Object.values(ports)
  if (candidates.length === 0) {
    return undefined
  }
  // let the arbiter pick the port whose head message should go first
  const winner = candidates.reduce(messageArbiter)
  return winner.messages[0]
}
260
/**
 * decides which of two ports holds the message that should go first
 * @param {Object} portA - port with a `messages` array
 * @param {Object} portB - port with a `messages` array
 * @returns {Object} `portB` when `portA` has no message, `portA` when only
 * `portB` is empty, otherwise the port whose head message has the lower
 * `_fromTicks`; ties go to `portA` (insertion order)
 */
function messageArbiter (portA, portB) {
  const a = portA.messages[0]
  const b = portB.messages[0]

  // a port without a pending message always loses
  if (!a) {
    return portB
  }
  if (!b) {
    return portA
  }

  // order by number of ticks if the messages have a different number of ticks
  if (a._fromTicks !== b._fromTicks) {
    return a._fromTicks < b._fromTicks ? portA : portB
  }

  // same number of ticks: keep insertion order
  return portA
}
280

Built with git-ssb-web