git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: d30608cc6c438ea4527a506e061d183c30099168

Files: d30608cc6c438ea4527a506e061d183c30099168 / portManager.js

6072 bytesRaw
1const BN = require('bn.js')
2const Message = require('primea-message')
3
4// decides which message to go first
5function messageArbiter (nameA, nameB) {
6 const a = this.ports[nameA].messages[0]
7 const b = this.ports[nameB].messages[0]
8
9 if (!a) {
10 return nameB
11 } else if (!b) {
12 return nameA
13 }
14
15 // order by number of ticks if messages have different number of ticks
16 if (a._fromTicks !== b._fromTicks) {
17 return a._fromTicks < b._fromTicks ? nameA : nameB
18 } else {
19 // insertion order
20 return nameA
21 }
22}
23
24module.exports = class PortManager {
25 /**
26 * The port manager manages the the ports. This inculdes creation, deletion
27 * fetching and waiting on ports
28 * @param {Object} opts
29 * @param {Object} opts.state
30 * @param {Object} opts.entryPort
31 * @param {Object} opts.parentPort
32 * @param {Object} opts.hypervisor
33 * @param {Object} opts.exoInterface
34 */
35 constructor (opts) {
36 Object.assign(this, opts)
37 this.ports = this.state.ports
38 this._unboundPorts = new Set()
39 this._waitingPorts = {}
40 this._saturationPromise = new Promise((resolve, reject) => {
41 this._saturationResolve = resolve
42 })
43 this._oldestMessagePromise = new Promise((resolve, reject) => {
44 this._oldestMessageResolve = resolve
45 })
46 }
47
48 /**
49 * binds a port to a name
50 * @param {Object} port - the port to bind
51 * @param {String} name - the name of the port
52 */
53 async bind (name, port) {
54 if (this.isBound(port)) {
55 throw new Error('cannot bind a port that is already bound')
56 } else if (this.ports[name]) {
57 throw new Error('cannot bind port to a name that is alread bound')
58 } else {
59 this._unboundPorts.delete(port)
60 this.hypervisor.removeNodeToCheck(this.id)
61
62 // save the port instance
63 this.ports[name] = port
64
65 // update the dest port
66 const destPort = await this.hypervisor.getDestPort(port)
67 destPort.destName = name
68 destPort.destId = this.id
69 delete destPort.destPort
70 }
71 }
72
73 /**
74 * unbinds a port given its name
75 * @param {string} name
76 * @returns {Promise}
77 */
78 async unbind (name) {
79 const port = this.ports[name]
80 delete this.ports[name]
81 this._unboundPorts.add(port)
82
83 // update the destination port
84 const destPort = await this.hypervisor.getDestPort(port)
85 delete destPort.destName
86 delete destPort.destId
87 destPort.destPort = port
88 this.hypervisor.addNodeToCheck(this.id)
89 return port
90 }
91
92 /**
93 * delete an port given the name it is bound to
94 * @param {string} name
95 */
96 delete (name) {
97 const port = this.ports[name]
98 this.exInterface.send(port, new Message({
99 data: 'delete'
100 }))
101 this._delete(name)
102 }
103
104 _delete (name) {
105 this.hypervisor.addNodeToCheck(this.id)
106 delete this.ports[name]
107 }
108
109 /**
110 * clears any unbounded ports referances
111 */
112 clearUnboundedPorts () {
113 this._unboundPorts.forEach(port => {
114 this.exInterface.send(port, new Message({
115 data: 'delete'
116 }))
117 })
118 this._unboundPorts.clear()
119 if (Object.keys(this.ports).length === 0) {
120 this.hypervisor.deleteInstance(this.id)
121 }
122 }
123
124 /**
125 * check if a port object is still valid
126 * @param {Object} port
127 * @return {Boolean}
128 */
129 isBound (port) {
130 return !this._unboundPorts.has(port)
131 }
132
133 /**
134 * queues a message on a port
135 * @param {Message} message
136 */
137 queue (name, message) {
138 if (name) {
139 const port = this.ports[name]
140 if (port.messages.push(message) === 1 && message._fromTicks < this._messageTickThreshold) {
141 message._fromPort = port
142 message.fromName = name
143 this._oldestMessageResolve(message)
144 this._oldestMessagePromise = new Promise((resolve, reject) => {
145 this._oldestMessageResolve = resolve
146 })
147 this._messageTickThreshold = Infinity
148 }
149 }
150 if (this.isSaturated()) {
151 this._saturationResolve()
152 this._saturationPromise = new Promise((resolve, reject) => {
153 this._saturationResolve = resolve
154 })
155 }
156 }
157
158 /**
159 * gets a port given it's name
160 * @param {String} name
161 * @return {Object}
162 */
163 get (name) {
164 return this.ports[name]
165 }
166
167 /**
168 * creates a new container. Returning a port to it.
169 * @param {String} type
170 * @param {*} data - the data to populate the initail state with
171 * @returns {Object}
172 */
173 create (type, data) {
174 let nonce = this.state.nonce
175
176 const id = {
177 nonce: nonce,
178 parent: this.id
179 }
180
181 // incerment the nonce
182 nonce = new BN(nonce)
183 nonce.iaddn(1)
184 this.state.nonce = nonce.toArray()
185
186 // create a new channel for the container
187 const ports = this.createChannel()
188 this._unboundPorts.delete(ports[1])
189 this.hypervisor.createInstance(type, data, [ports[1]], id)
190
191 return ports[0]
192 }
193
194 /**
195 * creates a channel returns the created ports in an Array
196 * @returns {array}
197 */
198 createChannel () {
199 const port1 = {
200 messages: []
201 }
202
203 const port2 = {
204 messages: [],
205 destPort: port1
206 }
207
208 port1.destPort = port2
209 this._unboundPorts.add(port1)
210 this._unboundPorts.add(port2)
211 return [port1, port2]
212 }
213
214 /**
215 * find and returns the next message
216 * @returns {object}
217 */
218 peekNextMessage () {
219 const names = Object.keys(this.ports)
220 if (names.length) {
221 const portName = names.reduce(messageArbiter.bind(this))
222 const port = this.ports[portName]
223 const message = port.messages[0]
224
225 if (message) {
226 message._fromPort = port
227 message.fromName = portName
228 return message
229 }
230 }
231 }
232
233 /**
234 * tests wether or not all the ports have a message
235 * @returns {boolean}
236 */
237 isSaturated () {
238 const keys = Object.keys(this.ports)
239 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
240 }
241
242 whenSaturated () {
243 return this._saturationPromise
244 }
245
246 olderMessage (message) {
247 this._messageTickThreshold = message ? message._fromTicks : 0
248 return this._oldestMessagePromise
249 }
250}
251

Built with git-ssb-web