git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 6b20ce6345f5cf4e554a3d7b67876db977eb8070

Files: 6b20ce6345f5cf4e554a3d7b67876db977eb8070 / portManager.js

5398 bytesRaw
1const BN = require('bn.js')
2
/**
 * Decides which of two ports should have its head message handled first.
 * Must be invoked with `this` set to an object exposing `ports`, a map of
 * port name -> port, where each port has a `messages` array (it is used as
 * a bound `reduce` callback over the port names).
 *
 * Rules, in order:
 *  1. a port with no queued message loses to the other one
 *  2. the message with the lower `_fromTicks` wins
 *  3. on equal ticks the first argument wins (insertion order)
 *
 * @param {String} nameA - name of the first candidate port
 * @param {String} nameB - name of the second candidate port
 * @returns {String} the name of the port whose message goes first
 */
function messageArbiter (nameA, nameB) {
  const a = this.ports[nameA].messages[0]
  const b = this.ports[nameB].messages[0]

  // an empty queue never wins; when both are empty nameB is returned
  if (!a) {
    return nameB
  } else if (!b) {
    return nameA
  }

  // order by number of ticks if messages have different number of ticks
  if (a._fromTicks !== b._fromTicks) {
    return a._fromTicks < b._fromTicks ? nameA : nameB
  } else {
    // insertion order
    return nameA
  }
}
22
23module.exports = class PortManager {
24 /**
25 * The port manager manages the the ports. This inculdes creation, deletion
26 * fetching and waiting on ports
27 * @param {Object} opts
28 * @param {Object} opts.state
29 * @param {Object} opts.entryPort
30 * @param {Object} opts.parentPort
31 * @param {Object} opts.hypervisor
32 * @param {Object} opts.exoInterface
33 */
34 constructor (opts) {
35 Object.assign(this, opts)
36 this.ports = this.state.ports
37 this._unboundPorts = new Set()
38 this._waitingPorts = {}
39 }
40
41 /**
42 * binds a port to a name
43 * @param {Object} port - the port to bind
44 * @param {String} name - the name of the port
45 */
46 async bind (name, port) {
47 if (this.isBound(port)) {
48 throw new Error('cannot bind a port that is already bound')
49 } else if (this.ports[name]) {
50 throw new Error('cannot bind port to a name that is alread bound')
51 } else {
52 this._unboundPorts.delete(port)
53
54 // save the port instance
55 this.ports[name] = port
56
57 // update the dest port
58 const destPort = await this.hypervisor.getDestPort(port)
59 destPort.destName = name
60 destPort.destId = this.id
61 delete destPort.destPort
62 }
63 }
64
65 /**
66 * unbinds a port given its name
67 * @param {String} name
68 * @returns {boolean} whether or not the port was deleted
69 */
70 async unbind (name) {
71 const port = this.ports[name]
72 delete this.ports[name]
73 this._unboundPorts.add(port)
74
75 let destPort = port.destPort
76 // if the dest is unbound
77 if (destPort) {
78 delete destPort.destName
79 delete destPort.destId
80 } else {
81 destPort = await this.hypervisor.getDestPort(port)
82 }
83 destPort.destPort = port
84 return port
85 }
86
87 delete (name) {
88
89 }
90
91 _deleteDestPort (port) {
92 this.exInterface.send(port, 'delete')
93 }
94
95 _delete (name) {
96 delete this.ports[name]
97 }
98
99 /**
100 * check if a port object is still valid
101 * @param {Object} port
102 * @return {Boolean}
103 */
104 isBound (port) {
105 return !this._unboundPorts.has(port)
106 }
107
108 /**
109 * queues a message on a port
110 * @param {Message} message
111 */
112 queue (name, message) {
113 message.ports.forEach(port => {
114 this._unboundPorts.add(port)
115 })
116
117 const resolve = this._waitingPorts[name]
118 if (resolve) {
119 resolve(message)
120 } else if (name) {
121 this.ports[name].messages.push(message)
122 }
123 }
124
125 /**
126 * gets a port given it's name
127 * @param {String} name
128 * @return {Object}
129 */
130 get (name) {
131 return this.ports[name]
132 }
133
134 /**
135 * creates a new Port given the container type
136 * @param {String} type
137 * @param {*} data - the data to populate the initail state with
138 * @returns {Promise}
139 */
140 create (type, data) {
141 // const container = this.hypervisor._containerTypes[type]
142 let nonce = this.state.nonce
143
144 const id = {
145 nonce: nonce,
146 parent: this.id
147 }
148
149 const entryPort = {
150 messages: []
151 }
152
153 const port = {
154 messages: [],
155 destPort: entryPort
156 }
157
158 entryPort.destPort = port
159
160 this.hypervisor.createInstance(type, data, [entryPort], id)
161
162 // incerment the nonce
163 nonce = new BN(nonce)
164 nonce.iaddn(1)
165 this.state.nonce = nonce.toArray()
166 this._unboundPorts.add(port)
167 return port
168 }
169
170 /**
171 * waits till all ports have reached a threshold tick count
172 * @param {Integer} threshold - the number of ticks to wait
173 * @param {Object} fromPort - the port requesting the wait
174 * @param {Array} ports - the ports to wait on
175 * @returns {Promise}
176 */
177 wait (ticks, port) {
178 if (this._waitingPorts[port]) {
179 throw new Error('cannot wait on port that already has a wait on it')
180 }
181 const message = this.ports[port].message.shift()
182 if (message) {
183 return message
184 } else {
185 const waitPromise = this.hypervisor.scheduler.wait(ticks)
186 const promise = new Promise((resolve, reject) => {
187 this._waitingPorts[port] = resolve
188 })
189
190 return Promise.race([waitPromise, promise])
191 }
192 }
193
194 /**
195 * gets the next canonical message given the an array of ports to choose from
196 * @param {Array} ports
197 * @returns {Promise}
198 */
199 nextMessage () {
200 const portName = Object.keys(this.ports).reduce(messageArbiter.bind(this))
201 const port = this.ports[portName]
202 const message = port.messages.shift()
203 message._fromPort = port
204 message.fromName = portName
205 return message
206 }
207
208 peekNextMessage () {
209 const portName = Object.keys(this.ports).reduce(messageArbiter.bind(this))
210 const port = this.ports[portName]
211 const message = port.messages[0]
212 message._fromPort = port
213 message.fromName = portName
214 return message
215 }
216
217 hasMessages () {
218 return Object.keys(this.ports).some(name => this.ports[name].messages.length)
219 }
220
221 isSaturated () {
222 return Object.keys(this.ports).every(name => this.ports[name].messages.length)
223 }
224}
225

Built with git-ssb-web