Files: 6b20ce6345f5cf4e554a3d7b67876db977eb8070 / portManager.js
5398 bytesRaw
1 | const BN = require('bn.js') |
2 | |
3 | // decides which message to go first |
4 | function messageArbiter (nameA, nameB) { |
5 | const a = this.ports[nameA].messages[0] |
6 | const b = this.ports[nameB].messages[0] |
7 | |
8 | if (!a) { |
9 | return nameB |
10 | } else if (!b) { |
11 | return nameA |
12 | } |
13 | |
14 | // order by number of ticks if messages have different number of ticks |
15 | if (a._fromTicks !== b._fromTicks) { |
16 | return a._fromTicks < b._fromTicks ? nameA : nameB |
17 | } else { |
18 | // insertion order |
19 | return nameA |
20 | } |
21 | } |
22 | |
23 | module.exports = class PortManager { |
24 | /** |
25 | * The port manager manages the the ports. This inculdes creation, deletion |
26 | * fetching and waiting on ports |
27 | * @param {Object} opts |
28 | * @param {Object} opts.state |
29 | * @param {Object} opts.entryPort |
30 | * @param {Object} opts.parentPort |
31 | * @param {Object} opts.hypervisor |
32 | * @param {Object} opts.exoInterface |
33 | */ |
34 | constructor (opts) { |
35 | Object.assign(this, opts) |
36 | this.ports = this.state.ports |
37 | this._unboundPorts = new Set() |
38 | this._waitingPorts = {} |
39 | } |
40 | |
41 | /** |
42 | * binds a port to a name |
43 | * @param {Object} port - the port to bind |
44 | * @param {String} name - the name of the port |
45 | */ |
46 | async bind (name, port) { |
47 | if (this.isBound(port)) { |
48 | throw new Error('cannot bind a port that is already bound') |
49 | } else if (this.ports[name]) { |
50 | throw new Error('cannot bind port to a name that is alread bound') |
51 | } else { |
52 | this._unboundPorts.delete(port) |
53 | |
54 | // save the port instance |
55 | this.ports[name] = port |
56 | |
57 | // update the dest port |
58 | const destPort = await this.hypervisor.getDestPort(port) |
59 | destPort.destName = name |
60 | destPort.destId = this.id |
61 | delete destPort.destPort |
62 | } |
63 | } |
64 | |
65 | /** |
66 | * unbinds a port given its name |
67 | * @param {String} name |
68 | * @returns {boolean} whether or not the port was deleted |
69 | */ |
70 | async unbind (name) { |
71 | const port = this.ports[name] |
72 | delete this.ports[name] |
73 | this._unboundPorts.add(port) |
74 | |
75 | let destPort = port.destPort |
76 | // if the dest is unbound |
77 | if (destPort) { |
78 | delete destPort.destName |
79 | delete destPort.destId |
80 | } else { |
81 | destPort = await this.hypervisor.getDestPort(port) |
82 | } |
83 | destPort.destPort = port |
84 | return port |
85 | } |
86 | |
87 | delete (name) { |
88 | |
89 | } |
90 | |
91 | _deleteDestPort (port) { |
92 | this.exInterface.send(port, 'delete') |
93 | } |
94 | |
95 | _delete (name) { |
96 | delete this.ports[name] |
97 | } |
98 | |
99 | /** |
100 | * check if a port object is still valid |
101 | * @param {Object} port |
102 | * @return {Boolean} |
103 | */ |
104 | isBound (port) { |
105 | return !this._unboundPorts.has(port) |
106 | } |
107 | |
108 | /** |
109 | * queues a message on a port |
110 | * @param {Message} message |
111 | */ |
112 | queue (name, message) { |
113 | message.ports.forEach(port => { |
114 | this._unboundPorts.add(port) |
115 | }) |
116 | |
117 | const resolve = this._waitingPorts[name] |
118 | if (resolve) { |
119 | resolve(message) |
120 | } else if (name) { |
121 | this.ports[name].messages.push(message) |
122 | } |
123 | } |
124 | |
125 | /** |
126 | * gets a port given it's name |
127 | * @param {String} name |
128 | * @return {Object} |
129 | */ |
130 | get (name) { |
131 | return this.ports[name] |
132 | } |
133 | |
134 | /** |
135 | * creates a new Port given the container type |
136 | * @param {String} type |
137 | * @param {*} data - the data to populate the initail state with |
138 | * @returns {Promise} |
139 | */ |
140 | create (type, data) { |
141 | // const container = this.hypervisor._containerTypes[type] |
142 | let nonce = this.state.nonce |
143 | |
144 | const id = { |
145 | nonce: nonce, |
146 | parent: this.id |
147 | } |
148 | |
149 | const entryPort = { |
150 | messages: [] |
151 | } |
152 | |
153 | const port = { |
154 | messages: [], |
155 | destPort: entryPort |
156 | } |
157 | |
158 | entryPort.destPort = port |
159 | |
160 | this.hypervisor.createInstance(type, data, [entryPort], id) |
161 | |
162 | // incerment the nonce |
163 | nonce = new BN(nonce) |
164 | nonce.iaddn(1) |
165 | this.state.nonce = nonce.toArray() |
166 | this._unboundPorts.add(port) |
167 | return port |
168 | } |
169 | |
170 | /** |
171 | * waits till all ports have reached a threshold tick count |
172 | * @param {Integer} threshold - the number of ticks to wait |
173 | * @param {Object} fromPort - the port requesting the wait |
174 | * @param {Array} ports - the ports to wait on |
175 | * @returns {Promise} |
176 | */ |
177 | wait (ticks, port) { |
178 | if (this._waitingPorts[port]) { |
179 | throw new Error('cannot wait on port that already has a wait on it') |
180 | } |
181 | const message = this.ports[port].message.shift() |
182 | if (message) { |
183 | return message |
184 | } else { |
185 | const waitPromise = this.hypervisor.scheduler.wait(ticks) |
186 | const promise = new Promise((resolve, reject) => { |
187 | this._waitingPorts[port] = resolve |
188 | }) |
189 | |
190 | return Promise.race([waitPromise, promise]) |
191 | } |
192 | } |
193 | |
194 | /** |
195 | * gets the next canonical message given the an array of ports to choose from |
196 | * @param {Array} ports |
197 | * @returns {Promise} |
198 | */ |
199 | nextMessage () { |
200 | const portName = Object.keys(this.ports).reduce(messageArbiter.bind(this)) |
201 | const port = this.ports[portName] |
202 | const message = port.messages.shift() |
203 | message._fromPort = port |
204 | message.fromName = portName |
205 | return message |
206 | } |
207 | |
208 | peekNextMessage () { |
209 | const portName = Object.keys(this.ports).reduce(messageArbiter.bind(this)) |
210 | const port = this.ports[portName] |
211 | const message = port.messages[0] |
212 | message._fromPort = port |
213 | message.fromName = portName |
214 | return message |
215 | } |
216 | |
217 | hasMessages () { |
218 | return Object.keys(this.ports).some(name => this.ports[name].messages.length) |
219 | } |
220 | |
221 | isSaturated () { |
222 | return Object.keys(this.ports).every(name => this.ports[name].messages.length) |
223 | } |
224 | } |
225 |
Built with git-ssb-web