git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 46248da0b2e047c143b387676430c8d2d27cf4a3

Files: 46248da0b2e047c143b387676430c8d2d27cf4a3 / portManager.js

6140 bytesRaw
1const BN = require('bn.js')
2const Message = require('primea-message')
3
/**
 * Decides which of two ports should have its head message handled first.
 * Written as a reducer callback: it must be invoked with `this` bound to an
 * object exposing `ports` (a name -> port map, each port having a `messages`
 * array).
 * @param {String} nameA - name of the first port (the earlier one when reducing)
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose first message goes first
 */
function messageArbiter (nameA, nameB) {
  const a = this.ports[nameA].messages[0]
  const b = this.ports[nameB].messages[0]

  // a port with an empty queue always loses to the other port
  if (!a) {
    return nameB
  } else if (!b) {
    return nameA
  }

  // order by number of ticks if messages have different number of ticks
  if (a._fromTicks !== b._fromTicks) {
    return a._fromTicks < b._fromTicks ? nameA : nameB
  } else {
    // equal ticks: keep the first argument (insertion order when reducing)
    return nameA
  }
}
23
24module.exports = class PortManager {
25 /**
26 * The port manager manages the the ports. This inculdes creation, deletion
27 * fetching and waiting on ports
28 * @param {Object} opts
29 * @param {Object} opts.state
30 * @param {Object} opts.entryPort
31 * @param {Object} opts.parentPort
32 * @param {Object} opts.hypervisor
33 * @param {Object} opts.exoInterface
34 */
35 constructor (opts) {
36 Object.assign(this, opts)
37 this.ports = this.state.ports
38 this._unboundPorts = new Set()
39 this._waitingPorts = {}
40 this._saturationPromise = new Promise((resolve, reject) => {
41 this._saturationResolve = resolve
42 })
43 this._oldestMessagePromise = new Promise((resolve, reject) => {
44 this._oldestMessageResolve = resolve
45 })
46 }
47
48 /**
49 * binds a port to a name
50 * @param {Object} port - the port to bind
51 * @param {String} name - the name of the port
52 */
53 async bind (name, port) {
54 if (this.isBound(port)) {
55 throw new Error('cannot bind a port that is already bound')
56 } else if (this.ports[name]) {
57 throw new Error('cannot bind port to a name that is alread bound')
58 } else {
59 this._unboundPorts.delete(port)
60 this.hypervisor.removeNodeToCheck(this.id)
61
62 // save the port instance
63 this.ports[name] = port
64
65 // update the dest port
66 const destPort = await this.hypervisor.getDestPort(port)
67 destPort.destName = name
68 destPort.destId = this.id
69 delete destPort.destPort
70 }
71 }
72
73 /**
74 * unbinds a port given its name
75 * @param {string} name
76 * @returns {Promise}
77 */
78 async unbind (name) {
79 const port = this.ports[name]
80 delete this.ports[name]
81 this._unboundPorts.add(port)
82
83 // update the destination port
84 const destPort = await this.hypervisor.getDestPort(port)
85 delete destPort.destName
86 delete destPort.destId
87 destPort.destPort = port
88 this.hypervisor.addNodeToCheck(this.id)
89 return port
90 }
91
92 /**
93 * delete an port given the name it is bound to
94 * @param {string} name
95 */
96 delete (name) {
97 const port = this.ports[name]
98 this.exInterface.send(port, new Message({
99 data: 'delete'
100 }))
101 this._delete(name)
102 }
103
104 _delete (name) {
105 this.hypervisor.addNodeToCheck(this.id)
106 delete this.ports[name]
107 }
108
109 /**
110 * clears any unbounded ports referances
111 */
112 clearUnboundedPorts () {
113 this._unboundPorts.forEach(port => {
114 this.exInterface.send(port, new Message({
115 data: 'delete'
116 }))
117 })
118 this._unboundPorts.clear()
119 if (Object.keys(this.ports).length === 0) {
120 this.hypervisor.deleteInstance(this.id)
121 }
122 }
123
124 /**
125 * check if a port object is still valid
126 * @param {Object} port
127 * @return {Boolean}
128 */
129 isBound (port) {
130 return !this._unboundPorts.has(port)
131 }
132
133 /**
134 * queues a message on a port
135 * @param {Message} message
136 */
137 queue (name, message) {
138 if (name) {
139 const port = this.ports[name]
140 if (port.messages.push(message) === 1 && message._fromTicks < this._messageTickThreshold) {
141 message._fromPort = port
142 message.fromName = name
143 this._oldestMessageResolve(message)
144 this._oldestMessagePromise = new Promise((resolve, reject) => {
145 this._oldestMessageResolve = resolve
146 })
147 this._messageTickThreshold = Infinity
148 }
149
150 if (this.isSaturated()) {
151 this._saturationResolve()
152 this._saturationPromise = new Promise((resolve, reject) => {
153 this._saturationResolve = resolve
154 })
155 }
156 }
157 }
158
159 /**
160 * gets a port given it's name
161 * @param {String} name
162 * @return {Object}
163 */
164 get (name) {
165 return this.ports[name]
166 }
167
168 /**
169 * creates a new container. Returning a port to it.
170 * @param {String} type
171 * @param {*} data - the data to populate the initail state with
172 * @returns {Object}
173 */
174 create (type, data) {
175 let nonce = this.state.nonce
176
177 const id = {
178 nonce: nonce,
179 parent: this.id
180 }
181
182 // incerment the nonce
183 nonce = new BN(nonce)
184 nonce.iaddn(1)
185 this.state.nonce = nonce.toArray()
186
187 // create a new channel for the container
188 const ports = this.createChannel()
189 this._unboundPorts.delete(ports[1])
190 const promise = this.hypervisor.createInstance(type, data, [ports[1]], id)
191 this.exInterface._addWork(promise)
192
193 return ports[0]
194 }
195
196 /**
197 * creates a channel returns the created ports in an Array
198 * @returns {array}
199 */
200 createChannel () {
201 const port1 = {
202 messages: []
203 }
204
205 const port2 = {
206 messages: [],
207 destPort: port1
208 }
209
210 port1.destPort = port2
211 this._unboundPorts.add(port1)
212 this._unboundPorts.add(port2)
213 return [port1, port2]
214 }
215
216 /**
217 * find and returns the next message
218 * @returns {object}
219 */
220 peekNextMessage () {
221 const names = Object.keys(this.ports)
222 if (names.length) {
223 const portName = names.reduce(messageArbiter.bind(this))
224 const port = this.ports[portName]
225 const message = port.messages[0]
226
227 if (message) {
228 message._fromPort = port
229 message.fromName = portName
230 return message
231 }
232 }
233 }
234
235 /**
236 * tests wether or not all the ports have a message
237 * @returns {boolean}
238 */
239 isSaturated () {
240 const keys = Object.keys(this.ports)
241 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
242 }
243
244 whenSaturated () {
245 return this._saturationPromise
246 }
247
248 olderMessage (message) {
249 this._messageTickThreshold = message ? message._fromTicks : 0
250 return this._oldestMessagePromise
251 }
252}
253

Built with git-ssb-web