git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 73b5b187d94ac2f96f31922f4249e72df021ec1c

Files: 73b5b187d94ac2f96f31922f4249e72df021ec1c / portManager.js

7124 bytesRaw
1const BN = require('bn.js')
2const DeleteMessage = require('./deleteMessage')
3
4// decides which message to go first
5function messageArbiter (nameA, nameB) {
6 const a = this.ports[nameA].messages[0]
7 const b = this.ports[nameB].messages[0]
8
9 if (!a) {
10 return nameB
11 } else if (!b) {
12 return nameA
13 }
14
15 // order by number of ticks if messages have different number of ticks
16 if (a._fromTicks !== b._fromTicks) {
17 return a._fromTicks < b._fromTicks ? nameA : nameB
18 } else {
19 // insertion order
20 return nameA
21 }
22}
23
24module.exports = class PortManager {
25 /**
26 * The port manager manages the the ports. This inculdes creation, deletion
27 * fetching and waiting on ports
28 * @param {Object} opts
29 * @param {Object} opts.state
30 * @param {Object} opts.hypervisor
31 * @param {Object} opts.exoInterface
32 */
33 constructor (opts) {
34 Object.assign(this, opts)
35 this.ports = this.state.ports
36 // tracks unbounded ports that we have
37 this._unboundPorts = new Set()
38 this._saturationPromise = new Promise((resolve, reject) => {
39 this._saturationResolve = resolve
40 })
41 this._oldestMessagePromise = new Promise((resolve, reject) => {
42 this._oldestMessageResolve = resolve
43 })
44 }
45
46 /**
47 * binds a port to a name
48 * @param {Object} port - the port to bind
49 * @param {String} name - the name of the port
50 */
51 async bind (name, port) {
52 if (this.isBound(port)) {
53 throw new Error('cannot bind a port that is already bound')
54 } else if (this.ports[name]) {
55 throw new Error('cannot bind port to a name that is alread bound')
56 } else {
57 this._unboundPorts.delete(port)
58
59 port.messages.forEach(message => {
60 message._fromPort = port
61 message.fromName = name
62 })
63
64 // save the port instance
65 this.ports[name] = port
66
67 // update the dest port
68 const destPort = await this.hypervisor.getDestPort(port)
69 destPort.destName = name
70 destPort.destId = this.id
71 delete destPort.destPort
72 }
73 }
74
75 /**
76 * unbinds a port given its name
77 * @param {string} name
78 * @returns {Promise}
79 */
80 async unbind (name) {
81 const port = this.ports[name]
82 delete this.ports[name]
83 this._unboundPorts.add(port)
84 this.hypervisor.addNodeToCheck(this.id)
85
86 // update the destination port
87 const destPort = await this.hypervisor.getDestPort(port)
88 delete destPort.destName
89 delete destPort.destId
90 destPort.destPort = port
91 return port
92 }
93
94 /**
95 * delete an port given the name it is bound to
96 * @param {string} name
97 */
98 delete (name) {
99 const port = this.ports[name]
100 this.exInterface.send(port, new DeleteMessage())
101 this._delete(name)
102 }
103
104 _delete (name) {
105 this.hypervisor.addNodeToCheck(this.id)
106 delete this.ports[name]
107 }
108
109 /**
110 * clears any unbounded ports referances
111 */
112 clearUnboundedPorts () {
113 this._unboundPorts.forEach(port => {
114 this.exInterface.send(port, new DeleteMessage())
115 })
116 this._unboundPorts.clear()
117 if (Object.keys(this.ports).length === 0) {
118 this.hypervisor.addNodeToCheck(this.id)
119 }
120 }
121
122 /**
123 * check if a port object is still valid
124 * @param {Object} port
125 * @return {Boolean}
126 */
127 isBound (port) {
128 return !this._unboundPorts.has(port)
129 }
130
131 /**
132 * queues a message on a port
133 * @param {Message} message
134 */
135 queue (name, message) {
136 const port = this.ports[name]
137
138 message._fromPort = port
139 message.fromName = name
140
141 if (port.messages.push(message) === 1) {
142 if (this._isSaturated()) {
143 this._saturationResolve()
144 this._saturationPromise = new Promise((resolve, reject) => {
145 this._saturationResolve = resolve
146 })
147 }
148
149 if (message._fromTicks < this._messageTickThreshold) {
150 this._oldestMessageResolve(message)
151 this._oldestMessagePromise = new Promise((resolve, reject) => {
152 this._oldestMessageResolve = resolve
153 })
154 this._messageTickThreshold = Infinity
155 }
156 }
157 }
158
159 /**
160 * gets a port given it's name
161 * @param {String} name
162 * @return {Object}
163 */
164 get (name) {
165 return this.ports[name]
166 }
167
168 /**
169 * creates a new container. Returning a port to it.
170 * @param {String} type
171 * @param {*} data - the data to populate the initail state with
172 * @returns {Object}
173 */
174 create (type, data) {
175 let nonce = this.state.nonce
176
177 const id = {
178 nonce: nonce,
179 parent: this.id
180 }
181
182 // incerment the nonce
183 nonce = new BN(nonce)
184 nonce.iaddn(1)
185 this.state.nonce = nonce.toArray()
186
187 // create a new channel for the container
188 const ports = this.createChannel()
189 this._unboundPorts.delete(ports[1])
190 this.hypervisor.createInstance(type, data, [ports[1]], id)
191
192 return ports[0]
193 }
194
195 /**
196 * creates a channel returns the created ports in an Array
197 * @returns {array}
198 */
199 createChannel () {
200 const port1 = {
201 messages: []
202 }
203
204 const port2 = {
205 messages: [],
206 destPort: port1
207 }
208
209 port1.destPort = port2
210 this._unboundPorts.add(port1)
211 this._unboundPorts.add(port2)
212 return [port1, port2]
213 }
214
215 // find and returns the next message
216 _peekNextMessage () {
217 const names = Object.keys(this.ports)
218 if (names.length) {
219 const portName = names.reduce(messageArbiter.bind(this))
220 const port = this.ports[portName]
221 return port.messages[0]
222 }
223 }
224
225 /**
226 * Waits for the the next message if any
227 * @returns {Promise}
228 */
229 async getNextMessage () {
230 let message = this._peekNextMessage()
231 let saturated = this._isSaturated()
232 let oldestTime = this.hypervisor.scheduler.oldest()
233
234 while (!saturated && // end if there are messages on all the ports
235 // end if we have a message older then slowest containers
236 !((message && oldestTime >= message._fromTicks) ||
237 // end if there are no messages and this container is the oldest contaner
238 (!message && oldestTime === this.exInterface.ticks))) {
239 const ticksToWait = message ? message._fromTicks : this.exInterface.ticks
240
241 await Promise.race([
242 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
243 message = this._peekNextMessage()
244 }),
245 this._olderMessage(message).then(m => {
246 message = m
247 }),
248 this._whenSaturated().then(() => {
249 saturated = true
250 message = this._peekNextMessage()
251 })
252 ])
253
254 oldestTime = this.hypervisor.scheduler.oldest()
255 }
256 return message
257 }
258
259 // tests wether or not all the ports have a message
260 _isSaturated () {
261 const keys = Object.keys(this.ports)
262 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
263 }
264
265 // returns a promise that resolve when the ports are saturated
266 _whenSaturated () {
267 return this._saturationPromise
268 }
269
270 // returns a promise that resolve when a message older then the given message
271 // is recived
272 _olderMessage (message) {
273 this._messageTickThreshold = message ? message._fromTicks : 0
274 return this._oldestMessagePromise
275 }
276}
277

Built with git-ssb-web