git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: b7561a6ca259ee9c581614ff1f8878ed5eeee970

Files: b7561a6ca259ee9c581614ff1f8878ed5eeee970 / portManager.js

6830 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
3// decides which message to go first
4function messageArbiter (nameA, nameB) {
5 const a = this.ports[nameA].messages[0]
6 const b = this.ports[nameB].messages[0]
7
8 if (!a) {
9 return nameB
10 } else if (!b) {
11 return nameA
12 }
13
14 // order by number of ticks if messages have different number of ticks
15 if (a._fromTicks !== b._fromTicks) {
16 return a._fromTicks < b._fromTicks ? nameA : nameB
17 } else {
18 // insertion order
19 return nameA
20 }
21}
22
23module.exports = class PortManager {
24 /**
25 * The port manager manages the the ports. This inculdes creation, deletion
26 * fetching and waiting on ports
27 * @param {Object} opts
28 * @param {Object} opts.state
29 * @param {Object} opts.hypervisor
30 * @param {Object} opts.exoInterface
31 */
32 constructor (opts) {
33 Object.assign(this, opts)
34 this.ports = this.state.ports
35 // tracks unbounded ports that we have
36 this._unboundPorts = new Set()
37 this._saturationPromise = new Promise((resolve, reject) => {
38 this._saturationResolve = resolve
39 })
40 this._oldestMessagePromise = new Promise((resolve, reject) => {
41 this._oldestMessageResolve = resolve
42 })
43 }
44
45 /**
46 * binds a port to a name
47 * @param {Object} port - the port to bind
48 * @param {String} name - the name of the port
49 */
50 async bind (name, port) {
51 if (this.isBound(port)) {
52 throw new Error('cannot bind a port that is already bound')
53 } else if (this.ports[name]) {
54 throw new Error('cannot bind port to a name that is alread bound')
55 } else {
56 this._unboundPorts.delete(port)
57
58 port.messages.forEach(message => {
59 message._fromPort = port
60 message.fromName = name
61 })
62
63 // save the port instance
64 this.ports[name] = port
65
66 // update the dest port
67 const destPort = await this.hypervisor.getDestPort(port)
68 destPort.destName = name
69 destPort.destId = this.id
70 delete destPort.destPort
71 }
72 }
73
74 /**
75 * unbinds a port given its name
76 * @param {string} name
77 * @returns {Promise}
78 */
79 async unbind (name) {
80 const port = this.ports[name]
81 delete this.ports[name]
82 this._unboundPorts.add(port)
83 this.hypervisor.addNodeToCheck(this.id)
84
85 // update the destination port
86 const destPort = await this.hypervisor.getDestPort(port)
87 delete destPort.destName
88 delete destPort.destId
89 destPort.destPort = port
90 return port
91 }
92
93 /**
94 * delete an port given the name it is bound to
95 * @param {string} name
96 */
97 delete (name) {
98 const port = this.ports[name]
99 this.kernel.send(port, new DeleteMessage())
100 this._delete(name)
101 }
102
103 _delete (name) {
104 this.hypervisor.addNodeToCheck(this.id)
105 delete this.ports[name]
106 }
107
108 /**
109 * clears any unbounded ports referances
110 */
111 clearUnboundedPorts () {
112 this._unboundPorts.forEach(port => {
113 this.kernel.send(port, new DeleteMessage())
114 })
115 this._unboundPorts.clear()
116 if (!Object.keys(this.ports).length) {
117 this.hypervisor.addNodeToCheck(this.id)
118 }
119 }
120
121 /**
122 * check if a port object is still valid
123 * @param {Object} port
124 * @return {Boolean}
125 */
126 isBound (port) {
127 return !this._unboundPorts.has(port)
128 }
129
130 /**
131 * queues a message on a port
132 * @param {Message} message
133 */
134 queue (name, message) {
135 const port = this.ports[name]
136
137 message._fromPort = port
138 message.fromName = name
139
140 if (port.messages.push(message) === 1) {
141 if (this._isSaturated()) {
142 this._saturationResolve()
143 this._saturationPromise = new Promise((resolve, reject) => {
144 this._saturationResolve = resolve
145 })
146 }
147
148 if (message._fromTicks < this._messageTickThreshold) {
149 this._oldestMessageResolve(message)
150 this._oldestMessagePromise = new Promise((resolve, reject) => {
151 this._oldestMessageResolve = resolve
152 })
153 this._messageTickThreshold = Infinity
154 }
155 }
156 }
157
158 /**
159 * gets a port given it's name
160 * @param {String} name
161 * @return {Object}
162 */
163 get (name) {
164 return this.ports[name]
165 }
166
167 /**
168 * creates a channel returns the created ports in an Array
169 * @returns {array}
170 */
171 createChannel () {
172 const port1 = {
173 messages: []
174 }
175
176 const port2 = {
177 messages: [],
178 destPort: port1
179 }
180
181 port1.destPort = port2
182 this._unboundPorts.add(port1)
183 this._unboundPorts.add(port2)
184 return [port1, port2]
185 }
186
187 // find and returns the next message
188 _peekNextMessage () {
189 const names = Object.keys(this.ports)
190 if (names.length) {
191 const portName = names.reduce(messageArbiter.bind(this))
192 const port = this.ports[portName]
193 return port.messages[0]
194 }
195 }
196
197 /**
198 * Waits for the the next message if any
199 * @returns {Promise}
200 */
201 async getNextMessage () {
202 let message = this._peekNextMessage()
203 let saturated = this._isSaturated()
204 let oldestTime = this.hypervisor.scheduler.oldest()
205
206 while (!saturated && // end if there are messages on all the ports
207 // end if we have a message older then slowest containers
208 !((message && oldestTime >= message._fromTicks) ||
209 // end if there are no messages and this container is the oldest contaner
210 (!message && oldestTime === this.kernel.ticks))) {
211 const ticksToWait = message ? message._fromTicks : this.kernel.ticks
212
213 await Promise.race([
214 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
215 message = this._peekNextMessage()
216 }),
217 this._olderMessage(message).then(m => {
218 message = m
219 }),
220 this._whenSaturated().then(() => {
221 saturated = true
222 message = this._peekNextMessage()
223 })
224 ])
225
226 oldestTime = this.hypervisor.scheduler.oldest()
227 }
228
229 return message
230 }
231
232 // tests wether or not all the ports have a message
233 _isSaturated () {
234 const keys = Object.keys(this.ports)
235 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
236 }
237
238 // returns a promise that resolve when the ports are saturated
239 _whenSaturated () {
240 return this._saturationPromise
241 }
242
243 // returns a promise that resolve when a message older then the given message
244 // is recived
245 _olderMessage (message) {
246 this._messageTickThreshold = message ? message._fromTicks : -1
247 return this._oldestMessagePromise
248 }
249
250 removeSentPorts (message) {
251 message.ports.forEach(port => this._unboundPorts.delete(port))
252 }
253
254 addReceivedPorts (message) {
255 message.ports.forEach(port => this._unboundPorts.add(port))
256 }
257
258 checkSendingPorts (message) {
259 for (const port of message.ports) {
260 if (this.isBound(port)) {
261 throw new Error('message must not contain bound ports')
262 }
263 }
264 }
265}
266

Built with git-ssb-web