Files: 92d7c78382c4c1263d34dc2eced70feb25a04d85 / portManager.js
7274 bytesRaw
1 | const DeleteMessage = require('./deleteMessage') |
2 | |
3 | module.exports = class PortManager { |
4 | /** |
5 | * The port manager manages the the ports. This inculdes creation, deletion |
6 | * fetching and waiting on ports |
7 | * @param {Object} opts |
8 | * @param {Object} opts.state |
9 | * @param {Object} opts.hypervisor |
10 | * @param {Object} opts.exoInterface |
11 | */ |
12 | constructor (opts) { |
13 | Object.assign(this, opts) |
14 | this.ports = this.state.ports |
15 | |
16 | this._waitingPorts = {} |
17 | // tracks unbounded ports that we have |
18 | this._unboundPorts = new Set() |
19 | this._saturationPromise = new Promise((resolve, reject) => { |
20 | this._saturationResolve = resolve |
21 | }) |
22 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
23 | this._oldestMessageResolve = resolve |
24 | }) |
25 | } |
26 | |
27 | /** |
28 | * binds a port to a name |
29 | * @param {Object} port - the port to bind |
30 | * @param {String} name - the name of the port |
31 | */ |
32 | async bind (name, port) { |
33 | if (this.isBound(port)) { |
34 | throw new Error('cannot bind a port that is already bound') |
35 | } else if (this.ports[name]) { |
36 | throw new Error('cannot bind port to a name that is alread bound') |
37 | } else { |
38 | this._unboundPorts.delete(port) |
39 | |
40 | // save the port instance |
41 | this.ports[name] = port |
42 | |
43 | // update the dest port |
44 | const destPort = await this.hypervisor.getDestPort(port) |
45 | port.messages.forEach(message => { |
46 | message._fromPort = port |
47 | message.fromName = name |
48 | }) |
49 | |
50 | if (destPort) { |
51 | destPort.destName = name |
52 | destPort.destId = this.id |
53 | delete destPort.destPort |
54 | } |
55 | } |
56 | } |
57 | |
58 | /** |
59 | * unbinds a port given its name |
60 | * @param {string} name |
61 | * @returns {Promise} |
62 | */ |
63 | async unbind (name) { |
64 | const port = this.ports[name] |
65 | delete this.ports[name] |
66 | this._unboundPorts.add(port) |
67 | this.hypervisor.addNodeToCheck(this.id) |
68 | |
69 | // update the destination port |
70 | const destPort = await this.hypervisor.getDestPort(port) |
71 | delete destPort.destName |
72 | delete destPort.destId |
73 | destPort.destPort = port |
74 | |
75 | return port |
76 | } |
77 | |
78 | /** |
79 | * delete an port given the name it is bound to |
80 | * @param {string} name |
81 | */ |
82 | async delete (name) { |
83 | const port = this.ports[name] |
84 | await this.kernel.send(port, new DeleteMessage()) |
85 | this._delete(name) |
86 | } |
87 | |
88 | _delete (name) { |
89 | this.hypervisor.addNodeToCheck(this.id) |
90 | delete this.ports[name] |
91 | } |
92 | |
93 | /** |
94 | * clears any unbounded ports referances |
95 | */ |
96 | clearUnboundedPorts () { |
97 | const waits = [] |
98 | this._unboundPorts.forEach(port => { |
99 | waits.push(this.kernel.send(port, new DeleteMessage())) |
100 | }) |
101 | |
102 | this._unboundPorts.clear() |
103 | return Promise.all(waits) |
104 | } |
105 | |
106 | /** |
107 | * check if a port object is still valid |
108 | * @param {Object} port |
109 | * @return {Boolean} |
110 | */ |
111 | isBound (port) { |
112 | return !this._unboundPorts.has(port) |
113 | } |
114 | |
115 | /** |
116 | * queues a message on a port |
117 | * @param {Message} message |
118 | */ |
119 | queue (name, message) { |
120 | const port = this.ports[name] |
121 | |
122 | message._fromPort = port |
123 | message.fromName = name |
124 | |
125 | const numOfMsg = port.messages.push(message) |
126 | |
127 | if (numOfMsg === 1) { |
128 | if (isSaturated(this._waitingPorts)) { |
129 | this._saturationResolve() |
130 | this._saturationPromise = new Promise((resolve, reject) => { |
131 | this._saturationResolve = resolve |
132 | }) |
133 | } else if (message._fromTicks < this._messageTickThreshold) { |
134 | this._oldestMessageResolve(message) |
135 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
136 | this._oldestMessageResolve = resolve |
137 | }) |
138 | this._messageTickThreshold = Infinity |
139 | } |
140 | } |
141 | } |
142 | |
143 | /** |
144 | * gets a port given it's name |
145 | * @param {String} name |
146 | * @return {Object} |
147 | */ |
148 | get (name) { |
149 | return this.ports[name] |
150 | } |
151 | |
152 | /** |
153 | * creates a channel returns the created ports in an Array |
154 | * @returns {array} |
155 | */ |
156 | createChannel () { |
157 | const [port1, port2] = this.hypervisor.createChannel() |
158 | this._unboundPorts.add(port1) |
159 | this._unboundPorts.add(port2) |
160 | return [port1, port2] |
161 | } |
162 | |
163 | /** |
164 | * Waits for the the next message if any |
165 | * @returns {Promise} |
166 | */ |
167 | async getNextMessage (ports = this.ports, timeout = Infinity) { |
168 | let message = peekNextMessage(ports) |
169 | let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
170 | let saturated = false |
171 | |
172 | if (Object.keys(this._waitingPorts).length) { |
173 | throw new Error('already getting next message') |
174 | } |
175 | |
176 | this._waitingPorts = ports |
177 | |
178 | const findOldestMessage = async () => { |
179 | while (// end if we have a message older then slowest containers |
180 | !((message && oldestTime >= message._fromTicks) || |
181 | // end if there are no messages and this container is the oldest contaner |
182 | (!message && oldestTime === this.kernel.ticks))) { |
183 | if (saturated) { |
184 | break |
185 | } |
186 | let ticksToWait = message ? message._fromTicks : this.kernel.ticks |
187 | // ticksToWait = ticksToWait > timeout ? timeout : ticksToWait |
188 | await Promise.race([ |
189 | this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { |
190 | message = peekNextMessage(ports) |
191 | }), |
192 | this._olderMessage(message).then(m => { |
193 | message = m |
194 | }) |
195 | ]) |
196 | oldestTime = this.hypervisor.scheduler.leastNumberOfTicks() |
197 | } |
198 | } |
199 | |
200 | await Promise.race([ |
201 | this._whenSaturated(ports).then(() => { |
202 | message = peekNextMessage(ports) |
203 | saturated = true |
204 | }), |
205 | findOldestMessage() |
206 | ]) |
207 | |
208 | this._waitingPorts = {} |
209 | |
210 | return message |
211 | } |
212 | |
213 | // returns a promise that resolve when the ports are saturated |
214 | _whenSaturated (ports) { |
215 | if (isSaturated(ports)) { |
216 | return Promise.resolve() |
217 | } else { |
218 | return this._saturationPromise |
219 | } |
220 | } |
221 | |
222 | // returns a promise that resolve when a message older then the given message |
223 | // is recived |
224 | _olderMessage (message) { |
225 | this._messageTickThreshold = message ? message._fromTicks : -1 |
226 | return this._oldestMessagePromise |
227 | } |
228 | |
229 | removeSentPorts (message) { |
230 | message.ports.forEach(port => this._unboundPorts.delete(port)) |
231 | } |
232 | |
233 | addReceivedPorts (message) { |
234 | message.ports.forEach(port => this._unboundPorts.add(port)) |
235 | } |
236 | |
237 | checkSendingPorts (message) { |
238 | for (const port of message.ports) { |
239 | if (this.isBound(port)) { |
240 | throw new Error('message must not contain bound ports') |
241 | } |
242 | } |
243 | } |
244 | } |
245 | |
/**
 * tests whether or not all the ports have at least one message.
 * An empty collection of ports counts as saturated.
 * @param {Object} ports - an object whose values are ports
 * @return {Boolean}
 */
function isSaturated (ports) {
  for (const port of Object.values(ports)) {
    if (!port.messages.length) {
      return false
    }
  }
  return true
}
251 | |
/**
 * finds and returns the next message that this instance currently knows
 * about, without removing it from its port
 * @param {Object} ports - an object whose values are ports
 * @return {Object|undefined} the first message of the port picked by
 * `messageArbiter`, or `undefined` when there are no ports or the picked
 * port has no messages
 */
function peekNextMessage (ports) {
  const candidates = Object.values(ports)
  if (candidates.length === 0) {
    return undefined
  }
  const winner = candidates.reduce(messageArbiter)
  return winner.messages[0]
}
260 | |
/**
 * decides which of two ports holds the message that should go first
 * @param {Object} portA
 * @param {Object} portB
 * @return {Object} the port whose first message wins; `portB` when `portA`
 * has no message, `portA` when only `portB` has none
 */
function messageArbiter (portA, portB) {
  const headA = portA.messages[0]
  const headB = portB.messages[0]

  // a port without a message always loses to the other port
  if (!headA) {
    return portB
  }
  if (!headB) {
    return portA
  }

  // the message with fewer ticks goes first; on a tie the port that was
  // passed first wins, which preserves insertion order
  const isTie = headA._fromTicks === headB._fromTicks
  if (isTie || headA._fromTicks < headB._fromTicks) {
    return portA
  }
  return portB
}
280 |
Built with git-ssb-web