git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 544aad0311fb3d40875bbd64bbdb01d7253fe810

Files: 544aad0311fb3d40875bbd64bbdb01d7253fe810 / portManager.js

6833 bytesRaw
// message type sent to a port's other end when the port is deleted or dropped
const DeleteMessage = require('./deleteMessage')
2
/**
 * Decides which of two ports holds the message that should go first.
 *
 * Must be called with `this` set to an object exposing a `ports` map
 * (name -> port with a `messages` array), e.g.
 * `names.reduce(messageArbiter.bind(portManager))`.
 *
 * @param {String} nameA - name of the first port
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose head message goes first
 */
function messageArbiter (nameA, nameB) {
  const a = this.ports[nameA].messages[0]
  const b = this.ports[nameB].messages[0]

  // a port with an empty queue always loses to the other port
  if (!a) {
    return nameB
  } else if (!b) {
    return nameA
  }

  // order by number of ticks if messages have different number of ticks
  if (a._fromTicks !== b._fromTicks) {
    return a._fromTicks < b._fromTicks ? nameA : nameB
  } else {
    // same tick count: the first argument wins (insertion order when used as
    // a reducer over the port names)
    return nameA
  }
}
22
23module.exports = class PortManager {
24 /**
25 * The port manager manages the the ports. This inculdes creation, deletion
26 * fetching and waiting on ports
27 * @param {Object} opts
28 * @param {Object} opts.state
29 * @param {Object} opts.hypervisor
30 * @param {Object} opts.exoInterface
31 */
32 constructor (opts) {
33 Object.assign(this, opts)
34 this.ports = this.state.ports
35 // tracks unbounded ports that we have
36 this._unboundPorts = new Set()
37 this._saturationPromise = new Promise((resolve, reject) => {
38 this._saturationResolve = resolve
39 })
40 this._oldestMessagePromise = new Promise((resolve, reject) => {
41 this._oldestMessageResolve = resolve
42 })
43 }
44
45 /**
46 * binds a port to a name
47 * @param {Object} port - the port to bind
48 * @param {String} name - the name of the port
49 */
50 async bind (name, port) {
51 if (this.isBound(port)) {
52 throw new Error('cannot bind a port that is already bound')
53 } else if (this.ports[name]) {
54 throw new Error('cannot bind port to a name that is alread bound')
55 } else {
56 this._unboundPorts.delete(port)
57
58 // save the port instance
59 this.ports[name] = port
60
61 // update the dest port
62 const destPort = await this.hypervisor.getDestPort(port)
63 port.messages.forEach(message => {
64 message._fromPort = port
65 message.fromName = name
66 })
67 destPort.destName = name
68 destPort.destId = this.id
69 delete destPort.destPort
70 }
71 }
72
73 /**
74 * unbinds a port given its name
75 * @param {string} name
76 * @returns {Promise}
77 */
78 async unbind (name) {
79 const port = this.ports[name]
80 delete this.ports[name]
81 this._unboundPorts.add(port)
82 this.hypervisor.addNodeToCheck(this.id)
83
84 // update the destination port
85 const destPort = await this.hypervisor.getDestPort(port)
86 delete destPort.destName
87 delete destPort.destId
88 destPort.destPort = port
89 return port
90 }
91
92 /**
93 * delete an port given the name it is bound to
94 * @param {string} name
95 */
96 async delete (name) {
97 const port = this.ports[name]
98 await this.kernel.send(port, new DeleteMessage())
99 this._delete(name)
100 }
101
102 _delete (name) {
103 this.hypervisor.addNodeToCheck(this.id)
104 delete this.ports[name]
105 }
106
107 /**
108 * clears any unbounded ports referances
109 */
110 clearUnboundedPorts () {
111 const waits = []
112 this._unboundPorts.forEach(port => {
113 waits.push(this.kernel.send(port, new DeleteMessage()))
114 })
115 this._unboundPorts.clear()
116 return Promise.all(waits)
117 }
118
119 /**
120 * check if a port object is still valid
121 * @param {Object} port
122 * @return {Boolean}
123 */
124 isBound (port) {
125 return !this._unboundPorts.has(port)
126 }
127
128 /**
129 * queues a message on a port
130 * @param {Message} message
131 */
132 queue (name, message) {
133 const port = this.ports[name]
134
135 message._fromPort = port
136 message.fromName = name
137
138 if (port.messages.push(message) === 1) {
139 if (this._isSaturated()) {
140 this._saturationResolve()
141 this._saturationPromise = new Promise((resolve, reject) => {
142 this._saturationResolve = resolve
143 })
144 }
145
146 if (message._fromTicks < this._messageTickThreshold) {
147 this._oldestMessageResolve(message)
148 this._oldestMessagePromise = new Promise((resolve, reject) => {
149 this._oldestMessageResolve = resolve
150 })
151 this._messageTickThreshold = Infinity
152 }
153 }
154 }
155
156 /**
157 * gets a port given it's name
158 * @param {String} name
159 * @return {Object}
160 */
161 get (name) {
162 return this.ports[name]
163 }
164
165 /**
166 * creates a channel returns the created ports in an Array
167 * @returns {array}
168 */
169 createChannel () {
170 const port1 = {
171 messages: []
172 }
173
174 const port2 = {
175 messages: [],
176 destPort: port1
177 }
178
179 port1.destPort = port2
180 this._unboundPorts.add(port1)
181 this._unboundPorts.add(port2)
182 return [port1, port2]
183 }
184
185 // find and returns the next message
186 _peekNextMessage () {
187 const names = Object.keys(this.ports)
188 if (names.length) {
189 const portName = names.reduce(messageArbiter.bind(this))
190 const port = this.ports[portName]
191 return port.messages[0]
192 }
193 }
194
195 /**
196 * Waits for the the next message if any
197 * @returns {Promise}
198 */
199 async getNextMessage () {
200 let message = this._peekNextMessage()
201 let saturated = this._isSaturated()
202 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
203
204 while (!saturated && // end if there are messages on all the ports
205 // end if we have a message older then slowest containers
206 !((message && oldestTime >= message._fromTicks) ||
207 // end if there are no messages and this container is the oldest contaner
208 (!message && oldestTime === this.kernel.ticks))) {
209 const ticksToWait = message ? message._fromTicks : this.kernel.ticks
210
211 await Promise.race([
212 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
213 message = this._peekNextMessage()
214 }),
215 this._olderMessage(message).then(m => {
216 message = m
217 }),
218 this._whenSaturated().then(() => {
219 saturated = true
220 message = this._peekNextMessage()
221 })
222 ])
223
224 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
225 }
226
227 return message
228 }
229
230 // tests wether or not all the ports have a message
231 _isSaturated () {
232 const keys = Object.keys(this.ports)
233 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
234 }
235
236 // returns a promise that resolve when the ports are saturated
237 _whenSaturated () {
238 return this._saturationPromise
239 }
240
241 // returns a promise that resolve when a message older then the given message
242 // is recived
243 _olderMessage (message) {
244 this._messageTickThreshold = message ? message._fromTicks : -1
245 return this._oldestMessagePromise
246 }
247
248 removeSentPorts (message) {
249 message.ports.forEach(port => this._unboundPorts.delete(port))
250 }
251
252 addReceivedPorts (message) {
253 message.ports.forEach(port => this._unboundPorts.add(port))
254 }
255
256 checkSendingPorts (message) {
257 for (const port of message.ports) {
258 if (this.isBound(port)) {
259 throw new Error('message must not contain bound ports')
260 }
261 }
262 }
263}
264

Built with git-ssb-web