git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 186843b537fb56c4bd7ddada96ebc089fcc3bde1

Files: 186843b537fb56c4bd7ddada96ebc089fcc3bde1 / portManager.js

7020 bytes · Raw
// big-number helper, used to increment the container nonce
const BN = require('bn.js')
// message sent to a port when a reference to it is dropped
const DeleteMessage = require('./deleteMessage')
3
// Decides which of two ports holds the message that should go first.
// Must be invoked with `this` bound to an object exposing `ports`, a map of
// port name -> port, where each port has a `messages` array.
// Returns the name of the winning port (one of `nameA` / `nameB`).
function messageArbiter (nameA, nameB) {
  const a = this.ports[nameA].messages[0]
  const b = this.ports[nameB].messages[0]

  // a port with an empty queue always loses to the other port
  if (!a) {
    return nameB
  } else if (!b) {
    return nameA
  }

  // order by number of ticks if messages have different number of ticks
  if (a._fromTicks !== b._fromTicks) {
    return a._fromTicks < b._fromTicks ? nameA : nameB
  } else {
    // same tick count: keep insertion order, the first name wins
    return nameA
  }
}
23
24module.exports = class PortManager {
25 /**
26 * The port manager manages the the ports. This inculdes creation, deletion
27 * fetching and waiting on ports
28 * @param {Object} opts
29 * @param {Object} opts.state
30 * @param {Object} opts.hypervisor
31 * @param {Object} opts.exoInterface
32 */
33 constructor (opts) {
34 Object.assign(this, opts)
35 this.ports = this.state.ports
36 // tracks unbounded ports that we have
37 this._unboundPorts = new Set()
38 this._saturationPromise = new Promise((resolve, reject) => {
39 this._saturationResolve = resolve
40 })
41 this._oldestMessagePromise = new Promise((resolve, reject) => {
42 this._oldestMessageResolve = resolve
43 })
44 }
45
46 /**
47 * binds a port to a name
48 * @param {Object} port - the port to bind
49 * @param {String} name - the name of the port
50 */
51 async bind (name, port) {
52 if (this.isBound(port)) {
53 throw new Error('cannot bind a port that is already bound')
54 } else if (this.ports[name]) {
55 throw new Error('cannot bind port to a name that is alread bound')
56 } else {
57 this._unboundPorts.delete(port)
58
59 port.messages.forEach(message => {
60 message._fromPort = port
61 message.fromName = name
62 })
63
64 // save the port instance
65 this.ports[name] = port
66
67 // update the dest port
68 const destPort = await this.hypervisor.getDestPort(port)
69 destPort.destName = name
70 destPort.destId = this.id
71 delete destPort.destPort
72 }
73 }
74
75 /**
76 * unbinds a port given its name
77 * @param {string} name
78 * @returns {Promise}
79 */
80 async unbind (name) {
81 const port = this.ports[name]
82 delete this.ports[name]
83 this._unboundPorts.add(port)
84 this.hypervisor.addNodeToCheck(this.id)
85
86 // update the destination port
87 const destPort = await this.hypervisor.getDestPort(port)
88 delete destPort.destName
89 delete destPort.destId
90 destPort.destPort = port
91 return port
92 }
93
94 /**
95 * delete an port given the name it is bound to
96 * @param {string} name
97 */
98 delete (name) {
99 const port = this.ports[name]
100 this.kernel.send(port, new DeleteMessage())
101 this._delete(name)
102 }
103
104 _delete (name) {
105 this.hypervisor.addNodeToCheck(this.id)
106 delete this.ports[name]
107 }
108
109 /**
110 * clears any unbounded ports referances
111 */
112 clearUnboundedPorts () {
113 this._unboundPorts.forEach(port => {
114 this.kernel.send(port, new DeleteMessage())
115 })
116 this._unboundPorts.clear()
117 if (Object.keys(this.ports).length === 0) {
118 this.hypervisor.addNodeToCheck(this.id)
119 }
120 }
121
122 /**
123 * check if a port object is still valid
124 * @param {Object} port
125 * @return {Boolean}
126 */
127 isBound (port) {
128 return !this._unboundPorts.has(port)
129 }
130
131 /**
132 * queues a message on a port
133 * @param {Message} message
134 */
135 queue (name, message) {
136 const port = this.ports[name]
137
138 message._fromPort = port
139 message.fromName = name
140
141 if (port.messages.push(message) === 1) {
142 if (this._isSaturated()) {
143 this._saturationResolve()
144 this._saturationPromise = new Promise((resolve, reject) => {
145 this._saturationResolve = resolve
146 })
147 }
148
149 if (message._fromTicks < this._messageTickThreshold) {
150 this._oldestMessageResolve(message)
151 this._oldestMessagePromise = new Promise((resolve, reject) => {
152 this._oldestMessageResolve = resolve
153 })
154 this._messageTickThreshold = Infinity
155 }
156 }
157 }
158
159 /**
160 * gets a port given it's name
161 * @param {String} name
162 * @return {Object}
163 */
164 get (name) {
165 return this.ports[name]
166 }
167
168 /**
169 * creates a new container. Returning a port to it.
170 * @param {String} type
171 * @param {*} data - the data to populate the initail state with
172 * @returns {Object}
173 */
174 create (type, message) {
175 let nonce = this.state.nonce
176
177 const id = {
178 nonce: nonce,
179 parent: this.id
180 }
181
182 // incerment the nonce
183 nonce = new BN(nonce)
184 nonce.iaddn(1)
185 this.state.nonce = nonce.toArray()
186 message.ports.forEach(port => this._unboundPorts.delete(port))
187
188 this.hypervisor.createInstance(type, message, id)
189 }
190
191 /**
192 * creates a channel returns the created ports in an Array
193 * @returns {array}
194 */
195 createChannel () {
196 const port1 = {
197 messages: []
198 }
199
200 const port2 = {
201 messages: [],
202 destPort: port1
203 }
204
205 port1.destPort = port2
206 this._unboundPorts.add(port1)
207 this._unboundPorts.add(port2)
208 return [port1, port2]
209 }
210
211 // find and returns the next message
212 _peekNextMessage () {
213 const names = Object.keys(this.ports)
214 if (names.length) {
215 const portName = names.reduce(messageArbiter.bind(this))
216 const port = this.ports[portName]
217 return port.messages[0]
218 }
219 }
220
221 /**
222 * Waits for the the next message if any
223 * @returns {Promise}
224 */
225 async getNextMessage () {
226 let message = this._peekNextMessage()
227 let saturated = this._isSaturated()
228 let oldestTime = this.hypervisor.scheduler.oldest()
229
230 while (!saturated && // end if there are messages on all the ports
231 // end if we have a message older then slowest containers
232 !((message && oldestTime >= message._fromTicks) ||
233 // end if there are no messages and this container is the oldest contaner
234 (!message && oldestTime === this.kernel.ticks))) {
235 const ticksToWait = message ? message._fromTicks : this.kernel.ticks
236
237 await Promise.race([
238 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
239 message = this._peekNextMessage()
240 }),
241 this._olderMessage(message).then(m => {
242 message = m
243 }),
244 this._whenSaturated().then(() => {
245 saturated = true
246 message = this._peekNextMessage()
247 })
248 ])
249
250 oldestTime = this.hypervisor.scheduler.oldest()
251 }
252
253 return message
254 }
255
256 // tests wether or not all the ports have a message
257 _isSaturated () {
258 const keys = Object.keys(this.ports)
259 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
260 }
261
262 // returns a promise that resolve when the ports are saturated
263 _whenSaturated () {
264 return this._saturationPromise
265 }
266
267 // returns a promise that resolve when a message older then the given message
268 // is recived
269 _olderMessage (message) {
270 this._messageTickThreshold = message ? message._fromTicks : 0
271 return this._oldestMessagePromise
272 }
273}
274

Built with git-ssb-web