Files: 9e075d6fa8d8d5137146130915af95c57bc1b231 / portManager.js
6830 bytesRaw
1 | const DeleteMessage = require('./deleteMessage') |
2 | |
/**
 * Decides which of two ports should have its head message handled first.
 * Must be called with `this` set to an object exposing a `ports` map
 * (name -> port), e.g. via `messageArbiter.bind(portManager)`.
 * @param {String} nameA - name of the first port
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose message goes first
 */
function messageArbiter (nameA, nameB) {
  const headA = this.ports[nameA].messages[0]
  const headB = this.ports[nameB].messages[0]

  // a port with nothing queued always yields to the other port
  if (!headA) {
    return nameB
  }
  if (!headB) {
    return nameA
  }

  const ticksA = headA._fromTicks
  const ticksB = headB._fromTicks

  // the lower tick count goes first; on equal ticks keep insertion order
  return (ticksA === ticksB || ticksA < ticksB) ? nameA : nameB
}
22 | |
23 | module.exports = class PortManager { |
24 | /** |
25 | * The port manager manages the the ports. This inculdes creation, deletion |
26 | * fetching and waiting on ports |
27 | * @param {Object} opts |
28 | * @param {Object} opts.state |
29 | * @param {Object} opts.hypervisor |
30 | * @param {Object} opts.exoInterface |
31 | */ |
32 | constructor (opts) { |
33 | Object.assign(this, opts) |
34 | this.ports = this.state.ports |
35 | // tracks unbounded ports that we have |
36 | this._unboundPorts = new Set() |
37 | this._saturationPromise = new Promise((resolve, reject) => { |
38 | this._saturationResolve = resolve |
39 | }) |
40 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
41 | this._oldestMessageResolve = resolve |
42 | }) |
43 | } |
44 | |
45 | /** |
46 | * binds a port to a name |
47 | * @param {Object} port - the port to bind |
48 | * @param {String} name - the name of the port |
49 | */ |
50 | async bind (name, port) { |
51 | if (this.isBound(port)) { |
52 | throw new Error('cannot bind a port that is already bound') |
53 | } else if (this.ports[name]) { |
54 | throw new Error('cannot bind port to a name that is alread bound') |
55 | } else { |
56 | this._unboundPorts.delete(port) |
57 | |
58 | port.messages.forEach(message => { |
59 | message._fromPort = port |
60 | message.fromName = name |
61 | }) |
62 | |
63 | // save the port instance |
64 | this.ports[name] = port |
65 | |
66 | // update the dest port |
67 | const destPort = await this.hypervisor.getDestPort(port) |
68 | destPort.destName = name |
69 | destPort.destId = this.id |
70 | delete destPort.destPort |
71 | } |
72 | } |
73 | |
74 | /** |
75 | * unbinds a port given its name |
76 | * @param {string} name |
77 | * @returns {Promise} |
78 | */ |
79 | async unbind (name) { |
80 | const port = this.ports[name] |
81 | delete this.ports[name] |
82 | this._unboundPorts.add(port) |
83 | this.hypervisor.addNodeToCheck(this.id) |
84 | |
85 | // update the destination port |
86 | const destPort = await this.hypervisor.getDestPort(port) |
87 | delete destPort.destName |
88 | delete destPort.destId |
89 | destPort.destPort = port |
90 | return port |
91 | } |
92 | |
93 | /** |
94 | * delete an port given the name it is bound to |
95 | * @param {string} name |
96 | */ |
97 | delete (name) { |
98 | const port = this.ports[name] |
99 | this.kernel.send(port, new DeleteMessage()) |
100 | this._delete(name) |
101 | } |
102 | |
103 | _delete (name) { |
104 | this.hypervisor.addNodeToCheck(this.id) |
105 | delete this.ports[name] |
106 | } |
107 | |
108 | /** |
109 | * clears any unbounded ports referances |
110 | */ |
111 | clearUnboundedPorts () { |
112 | this._unboundPorts.forEach(port => { |
113 | this.kernel.send(port, new DeleteMessage()) |
114 | }) |
115 | this._unboundPorts.clear() |
116 | if (!Object.keys(this.ports).length) { |
117 | this.hypervisor.addNodeToCheck(this.id) |
118 | } |
119 | } |
120 | |
121 | /** |
122 | * check if a port object is still valid |
123 | * @param {Object} port |
124 | * @return {Boolean} |
125 | */ |
126 | isBound (port) { |
127 | return !this._unboundPorts.has(port) |
128 | } |
129 | |
130 | /** |
131 | * queues a message on a port |
132 | * @param {Message} message |
133 | */ |
134 | queue (name, message) { |
135 | const port = this.ports[name] |
136 | |
137 | message._fromPort = port |
138 | message.fromName = name |
139 | |
140 | if (port.messages.push(message) === 1) { |
141 | if (this._isSaturated()) { |
142 | this._saturationResolve() |
143 | this._saturationPromise = new Promise((resolve, reject) => { |
144 | this._saturationResolve = resolve |
145 | }) |
146 | } |
147 | |
148 | if (message._fromTicks < this._messageTickThreshold) { |
149 | this._oldestMessageResolve(message) |
150 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
151 | this._oldestMessageResolve = resolve |
152 | }) |
153 | this._messageTickThreshold = Infinity |
154 | } |
155 | } |
156 | } |
157 | |
158 | /** |
159 | * gets a port given it's name |
160 | * @param {String} name |
161 | * @return {Object} |
162 | */ |
163 | get (name) { |
164 | return this.ports[name] |
165 | } |
166 | |
167 | /** |
168 | * creates a channel returns the created ports in an Array |
169 | * @returns {array} |
170 | */ |
171 | createChannel () { |
172 | const port1 = { |
173 | messages: [] |
174 | } |
175 | |
176 | const port2 = { |
177 | messages: [], |
178 | destPort: port1 |
179 | } |
180 | |
181 | port1.destPort = port2 |
182 | this._unboundPorts.add(port1) |
183 | this._unboundPorts.add(port2) |
184 | return [port1, port2] |
185 | } |
186 | |
187 | // find and returns the next message |
188 | _peekNextMessage () { |
189 | const names = Object.keys(this.ports) |
190 | if (names.length) { |
191 | const portName = names.reduce(messageArbiter.bind(this)) |
192 | const port = this.ports[portName] |
193 | return port.messages[0] |
194 | } |
195 | } |
196 | |
197 | /** |
198 | * Waits for the the next message if any |
199 | * @returns {Promise} |
200 | */ |
201 | async getNextMessage () { |
202 | let message = this._peekNextMessage() |
203 | let saturated = this._isSaturated() |
204 | let oldestTime = this.hypervisor.scheduler.oldest() |
205 | |
206 | while (!saturated && // end if there are messages on all the ports |
207 | // end if we have a message older then slowest containers |
208 | !((message && oldestTime >= message._fromTicks) || |
209 | // end if there are no messages and this container is the oldest contaner |
210 | (!message && oldestTime === this.kernel.ticks))) { |
211 | const ticksToWait = message ? message._fromTicks : this.kernel.ticks |
212 | |
213 | await Promise.race([ |
214 | this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => { |
215 | message = this._peekNextMessage() |
216 | }), |
217 | this._olderMessage(message).then(m => { |
218 | message = m |
219 | }), |
220 | this._whenSaturated().then(() => { |
221 | saturated = true |
222 | message = this._peekNextMessage() |
223 | }) |
224 | ]) |
225 | |
226 | oldestTime = this.hypervisor.scheduler.oldest() |
227 | } |
228 | |
229 | return message |
230 | } |
231 | |
232 | // tests wether or not all the ports have a message |
233 | _isSaturated () { |
234 | const keys = Object.keys(this.ports) |
235 | return keys.length ? keys.every(name => this.ports[name].messages.length) : 0 |
236 | } |
237 | |
238 | // returns a promise that resolve when the ports are saturated |
239 | _whenSaturated () { |
240 | return this._saturationPromise |
241 | } |
242 | |
243 | // returns a promise that resolve when a message older then the given message |
244 | // is recived |
245 | _olderMessage (message) { |
246 | this._messageTickThreshold = message ? message._fromTicks : -1 |
247 | return this._oldestMessagePromise |
248 | } |
249 | |
250 | removeSentPorts (message) { |
251 | message.ports.forEach(port => this._unboundPorts.delete(port)) |
252 | } |
253 | |
254 | addReceivedPorts (message) { |
255 | message.ports.forEach(port => this._unboundPorts.add(port)) |
256 | } |
257 | |
258 | checkSendingPorts (message) { |
259 | for (const port of message.ports) { |
260 | if (this.isBound(port)) { |
261 | throw new Error('message must not contain bound ports') |
262 | } |
263 | } |
264 | } |
265 | } |
266 |
Built with git-ssb-web