git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 66df0eab8d15d49d023146aa317bc8ed5a517cd1

Files: 66df0eab8d15d49d023146aa317bc8ed5a517cd1 / portManager.js

6824 bytesRaw
1const DeleteMessage = require('./deleteMessage')
2
/**
 * Decides which of two ports holds the message that should be handled first.
 *
 * Must be called with `this` set to an object exposing a `ports` map (name ->
 * port with a `messages` array); it is written to be bound and used as a
 * `reduce` callback over port names.
 *
 * @param {String} nameA - name of the first port; wins when the ticks are equal
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose head message goes first
 */
function messageArbiter (nameA, nameB) {
  const a = this.ports[nameA].messages[0]
  const b = this.ports[nameB].messages[0]

  // a port with an empty queue always yields to the other port
  if (!a) {
    return nameB
  }
  if (!b) {
    return nameA
  }

  // equal ticks: keep the name that came first (insertion order)
  if (a._fromTicks === b._fromTicks) {
    return nameA
  }

  // otherwise the message with the smaller number of ticks goes first
  return a._fromTicks < b._fromTicks ? nameA : nameB
}
22
23module.exports = class PortManager {
24 /**
25 * The port manager manages the the ports. This inculdes creation, deletion
26 * fetching and waiting on ports
27 * @param {Object} opts
28 * @param {Object} opts.state
29 * @param {Object} opts.hypervisor
30 * @param {Object} opts.exoInterface
31 */
32 constructor (opts) {
33 Object.assign(this, opts)
34 this.ports = this.state.ports
35 // tracks unbounded ports that we have
36 this._unboundPorts = new Set()
37 this._saturationPromise = new Promise((resolve, reject) => {
38 this._saturationResolve = resolve
39 })
40 this._oldestMessagePromise = new Promise((resolve, reject) => {
41 this._oldestMessageResolve = resolve
42 })
43 }
44
45 /**
46 * binds a port to a name
47 * @param {Object} port - the port to bind
48 * @param {String} name - the name of the port
49 */
50 async bind (name, port) {
51 if (this.isBound(port)) {
52 throw new Error('cannot bind a port that is already bound')
53 } else if (this.ports[name]) {
54 throw new Error('cannot bind port to a name that is alread bound')
55 } else {
56 this._unboundPorts.delete(port)
57
58 // save the port instance
59 this.ports[name] = port
60
61 // update the dest port
62 const destPort = await this.hypervisor.getDestPort(port)
63 port.messages.forEach(message => {
64 message._fromPort = port
65 message.fromName = name
66 })
67
68 if (destPort) {
69 destPort.destName = name
70 destPort.destId = this.id
71 delete destPort.destPort
72 }
73 }
74 }
75
76 /**
77 * unbinds a port given its name
78 * @param {string} name
79 * @returns {Promise}
80 */
81 async unbind (name) {
82 const port = this.ports[name]
83 delete this.ports[name]
84 this._unboundPorts.add(port)
85 this.hypervisor.addNodeToCheck(this.id)
86
87 // update the destination port
88 const destPort = await this.hypervisor.getDestPort(port)
89 delete destPort.destName
90 delete destPort.destId
91 destPort.destPort = port
92 return port
93 }
94
95 /**
96 * delete an port given the name it is bound to
97 * @param {string} name
98 */
99 async delete (name) {
100 const port = this.ports[name]
101 await this.kernel.send(port, new DeleteMessage())
102 this._delete(name)
103 }
104
105 _delete (name) {
106 this.hypervisor.addNodeToCheck(this.id)
107 delete this.ports[name]
108 }
109
110 /**
111 * clears any unbounded ports referances
112 */
113 clearUnboundedPorts () {
114 const waits = []
115 this._unboundPorts.forEach(port => {
116 waits.push(this.kernel.send(port, new DeleteMessage()))
117 })
118 this._unboundPorts.clear()
119 return Promise.all(waits)
120 }
121
122 /**
123 * check if a port object is still valid
124 * @param {Object} port
125 * @return {Boolean}
126 */
127 isBound (port) {
128 return !this._unboundPorts.has(port)
129 }
130
131 /**
132 * queues a message on a port
133 * @param {Message} message
134 */
135 queue (name, message) {
136 const port = this.ports[name]
137
138 message._fromPort = port
139 message.fromName = name
140
141 if (port.messages.push(message) === 1) {
142 if (this._isSaturated()) {
143 this._saturationResolve()
144 this._saturationPromise = new Promise((resolve, reject) => {
145 this._saturationResolve = resolve
146 })
147 }
148
149 if (message._fromTicks < this._messageTickThreshold) {
150 this._oldestMessageResolve(message)
151 this._oldestMessagePromise = new Promise((resolve, reject) => {
152 this._oldestMessageResolve = resolve
153 })
154 this._messageTickThreshold = Infinity
155 }
156 }
157 }
158
159 /**
160 * gets a port given it's name
161 * @param {String} name
162 * @return {Object}
163 */
164 get (name) {
165 return this.ports[name]
166 }
167
168 /**
169 * creates a channel returns the created ports in an Array
170 * @returns {array}
171 */
172 createChannel () {
173 const [port1, port2] = this.hypervisor.createChannel()
174 this._unboundPorts.add(port1)
175 this._unboundPorts.add(port2)
176 return [port1, port2]
177 }
178
179 // find and returns the next message
180 _peekNextMessage () {
181 const names = Object.keys(this.ports)
182 if (names.length) {
183 const portName = names.reduce(messageArbiter.bind(this))
184 const port = this.ports[portName]
185 return port.messages[0]
186 }
187 }
188
189 waitOnPort (port, timeout) {
190
191 }
192
193 /**
194 * Waits for the the next message if any
195 * @returns {Promise}
196 */
197 async getNextMessage () {
198 let message = this._peekNextMessage()
199 let saturated = this._isSaturated()
200 let oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
201
202 while (!saturated && // end if there are messages on all the ports
203 // end if we have a message older then slowest containers
204 !((message && oldestTime >= message._fromTicks) ||
205 // end if there are no messages and this container is the oldest contaner
206 (!message && oldestTime === this.kernel.ticks))) {
207 const ticksToWait = message ? message._fromTicks : this.kernel.ticks
208
209 await Promise.race([
210 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
211 message = this._peekNextMessage()
212 }),
213 this._olderMessage(message).then(m => {
214 message = m
215 }),
216 this._whenSaturated().then(() => {
217 saturated = true
218 message = this._peekNextMessage()
219 })
220 ])
221
222 oldestTime = this.hypervisor.scheduler.leastNumberOfTicks()
223 }
224
225 return message
226 }
227
228 // tests wether or not all the ports have a message
229 _isSaturated () {
230 const keys = Object.keys(this.ports)
231 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
232 }
233
234 // returns a promise that resolve when the ports are saturated
235 _whenSaturated () {
236 return this._saturationPromise
237 }
238
239 // returns a promise that resolve when a message older then the given message
240 // is recived
241 _olderMessage (message) {
242 this._messageTickThreshold = message ? message._fromTicks : -1
243 return this._oldestMessagePromise
244 }
245
246 removeSentPorts (message) {
247 message.ports.forEach(port => this._unboundPorts.delete(port))
248 }
249
250 addReceivedPorts (message) {
251 message.ports.forEach(port => this._unboundPorts.add(port))
252 }
253
254 checkSendingPorts (message) {
255 for (const port of message.ports) {
256 if (this.isBound(port)) {
257 throw new Error('message must not contain bound ports')
258 }
259 }
260 }
261}
262

Built with git-ssb-web