git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 0a8350a93125e32b15276e387bc08d9e51610c0c

Files: 0a8350a93125e32b15276e387bc08d9e51610c0c / portManager.js

6850 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
/**
 * Decides which of two ports should have its head message handled first.
 * Must be invoked with `this` set to an object exposing `ports`, a map from
 * port name to a port holding a `messages` array (the class below uses it as
 * a `reduce` callback via `messageArbiter.bind(this)`).
 * @param {String} nameA - name of the first port
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose head message wins
 */
function messageArbiter (nameA, nameB) {
  const headA = this.ports[nameA].messages[0]
  const headB = this.ports[nameB].messages[0]

  // a port with an empty queue always yields to the other one
  if (!headA) return nameB
  if (!headB) return nameA

  // identical tick counts fall back to insertion order: the first name wins
  if (headA._fromTicks === headB._fromTicks) return nameA

  // otherwise the message with the lower tick count goes first
  return headA._fromTicks < headB._fromTicks ? nameA : nameB
}
22
23module.exports = class PortManager {
24 /**
25 * The port manager manages the the ports. This inculdes creation, deletion
26 * fetching and waiting on ports
27 * @param {Object} opts
28 * @param {Object} opts.state
29 * @param {Object} opts.hypervisor
30 * @param {Object} opts.exoInterface
31 */
32 constructor (opts) {
33 Object.assign(this, opts)
34 this.ports = this.state.ports
35 // tracks unbounded ports that we have
36 this._unboundPorts = new Set()
37 this._saturationPromise = new Promise((resolve, reject) => {
38 this._saturationResolve = resolve
39 })
40 this._oldestMessagePromise = new Promise((resolve, reject) => {
41 this._oldestMessageResolve = resolve
42 })
43 }
44
45 /**
46 * binds a port to a name
47 * @param {Object} port - the port to bind
48 * @param {String} name - the name of the port
49 */
50 async bind (name, port) {
51 if (this.isBound(port)) {
52 throw new Error('cannot bind a port that is already bound')
53 } else if (this.ports[name]) {
54 throw new Error('cannot bind port to a name that is alread bound')
55 } else {
56 this._unboundPorts.delete(port)
57
58 // save the port instance
59 this.ports[name] = port
60
61 // console.log(name, port)
62 // update the dest port
63 const destPort = await this.hypervisor.getDestPort(port)
64 port.messages.forEach(message => {
65 message._fromPort = port
66 message.fromName = name
67 })
68
69 if (destPort) {
70 destPort.destName = name
71 destPort.destId = this.id
72 delete destPort.destPort
73 }
74 }
75 }
76
77 /**
78 * unbinds a port given its name
79 * @param {string} name
80 * @returns {Promise}
81 */
82 async unbind (name) {
83 const port = this.ports[name]
84 delete this.ports[name]
85 this._unboundPorts.add(port)
86 this.hypervisor.addNodeToCheck(this.id)
87
88 // update the destination port
89 const destPort = await this.hypervisor.getDestPort(port)
90 delete destPort.destName
91 delete destPort.destId
92 destPort.destPort = port
93
94 return port
95 }
96
97 /**
98 * delete an port given the name it is bound to
99 * @param {string} name
100 */
101 async delete (name) {
102 const port = this.ports[name]
103 await this.kernel.send(port, new DeleteMessage())
104 this._delete(name)
105 }
106
107 _delete (name) {
108 this.hypervisor.addNodeToCheck(this.id)
109 delete this.ports[name]
110 }
111
112 /**
113 * clears any unbounded ports referances
114 */
115 clearUnboundedPorts () {
116 const waits = []
117 this._unboundPorts.forEach(port => {
118 waits.push(this.kernel.send(port, new DeleteMessage()))
119 })
120 this._unboundPorts.clear()
121 return Promise.all(waits)
122 }
123
124 /**
125 * check if a port object is still valid
126 * @param {Object} port
127 * @return {Boolean}
128 */
129 isBound (port) {
130 return !this._unboundPorts.has(port)
131 }
132
133 /**
134 * queues a message on a port
135 * @param {Message} message
136 */
137 queue (name, message) {
138 const port = this.ports[name]
139
140 message._fromPort = port
141 message.fromName = name
142
143 const numOfMsg = port.messages.push(message)
144
145 if (numOfMsg === 1) {
146 if (this._isSaturated()) {
147 this._saturationResolve()
148 this._saturationPromise = new Promise((resolve, reject) => {
149 this._saturationResolve = resolve
150 })
151 } else if (message._fromTicks < this._messageTickThreshold) {
152 this._oldestMessageResolve(message)
153 this._oldestMessagePromise = new Promise((resolve, reject) => {
154 this._oldestMessageResolve = resolve
155 })
156 this._messageTickThreshold = Infinity
157 }
158 }
159 }
160
161 /**
162 * gets a port given it's name
163 * @param {String} name
164 * @return {Object}
165 */
166 get (name) {
167 return this.ports[name]
168 }
169
170 /**
171 * creates a channel returns the created ports in an Array
172 * @returns {array}
173 */
174 createChannel () {
175 const [port1, port2] = this.hypervisor.createChannel()
176 this._unboundPorts.add(port1)
177 this._unboundPorts.add(port2)
178 return [port1, port2]
179 }
180
181 // find and returns the next message
182 _peekNextMessage () {
183 const names = Object.keys(this.ports)
184 if (names.length) {
185 const portName = names.reduce(messageArbiter.bind(this))
186 const port = this.ports[portName]
187 return port.messages[0]
188 }
189 }
190
191 /**
192 * Waits for the the next message if any
193 * @returns {Promise}
194 */
195 async getNextMessage () {
196 let message = this._peekNextMessage()
197 let saturated = this._isSaturated()
198 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
199
200 while (!saturated && // end if there are messages on all the ports
201 // end if we have a message older then slowest containers
202 !((message && oldestTime >= message._fromTicks) ||
203 // end if there are no messages and this container is the oldest contaner
204 (!message && oldestTime === this.kernel.ticks))) {
205 const ticksToWait = message ? message._fromTicks : this.kernel.ticks
206
207 await Promise.race([
208 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
209 message = this._peekNextMessage()
210 }),
211 this._olderMessage(message).then(m => {
212 message = m
213 }),
214 this._whenSaturated().then(() => {
215 saturated = true
216 message = this._peekNextMessage()
217 })
218 ])
219
220 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
221 }
222
223 return message
224 }
225
226 // tests wether or not all the ports have a message
227 _isSaturated () {
228 const keys = Object.keys(this.ports)
229 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
230 }
231
232 // returns a promise that resolve when the ports are saturated
233 _whenSaturated () {
234 return this._saturationPromise
235 }
236
237 // returns a promise that resolve when a message older then the given message
238 // is recived
239 _olderMessage (message) {
240 this._messageTickThreshold = message ? message._fromTicks : -1
241 return this._oldestMessagePromise
242 }
243
244 removeSentPorts (message) {
245 message.ports.forEach(port => this._unboundPorts.delete(port))
246 }
247
248 addReceivedPorts (message) {
249 message.ports.forEach(port => this._unboundPorts.add(port))
250 }
251
252 checkSendingPorts (message) {
253 for (const port of message.ports) {
254 if (this.isBound(port)) {
255 throw new Error('message must not contain bound ports')
256 }
257 }
258 }
259}
260

Built with git-ssb-web