Files: 46248da0b2e047c143b387676430c8d2d27cf4a3 / portManager.js
6140 bytesRaw
1 | const BN = require('bn.js') |
2 | const Message = require('primea-message') |
3 | |
/**
 * Decides which of two ports should have its head message handled first.
 * Must be called with `this` set to an object exposing a `ports` map
 * (name -> port), where each port carries a `messages` array.
 * @param {String} nameA - name of the first port (earlier in insertion order)
 * @param {String} nameB - name of the second port
 * @returns {String} the name of the port whose head message goes first
 */
function messageArbiter (nameA, nameB) {
  const headA = this.ports[nameA].messages[0]
  const headB = this.ports[nameB].messages[0]

  // a port with nothing queued loses; port A is examined first
  if (!headA) return nameB
  if (!headB) return nameA

  // the message with fewer ticks wins; equal ticks fall back to insertion
  // order, i.e. port A
  const aGoesFirst =
    headA._fromTicks === headB._fromTicks ||
    headA._fromTicks < headB._fromTicks
  return aGoesFirst ? nameA : nameB
}
23 | |
24 | module.exports = class PortManager { |
25 | /** |
26 | * The port manager manages the the ports. This inculdes creation, deletion |
27 | * fetching and waiting on ports |
28 | * @param {Object} opts |
29 | * @param {Object} opts.state |
30 | * @param {Object} opts.entryPort |
31 | * @param {Object} opts.parentPort |
32 | * @param {Object} opts.hypervisor |
33 | * @param {Object} opts.exoInterface |
34 | */ |
35 | constructor (opts) { |
36 | Object.assign(this, opts) |
37 | this.ports = this.state.ports |
38 | this._unboundPorts = new Set() |
39 | this._waitingPorts = {} |
40 | this._saturationPromise = new Promise((resolve, reject) => { |
41 | this._saturationResolve = resolve |
42 | }) |
43 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
44 | this._oldestMessageResolve = resolve |
45 | }) |
46 | } |
47 | |
48 | /** |
49 | * binds a port to a name |
50 | * @param {Object} port - the port to bind |
51 | * @param {String} name - the name of the port |
52 | */ |
53 | async bind (name, port) { |
54 | if (this.isBound(port)) { |
55 | throw new Error('cannot bind a port that is already bound') |
56 | } else if (this.ports[name]) { |
57 | throw new Error('cannot bind port to a name that is alread bound') |
58 | } else { |
59 | this._unboundPorts.delete(port) |
60 | this.hypervisor.removeNodeToCheck(this.id) |
61 | |
62 | // save the port instance |
63 | this.ports[name] = port |
64 | |
65 | // update the dest port |
66 | const destPort = await this.hypervisor.getDestPort(port) |
67 | destPort.destName = name |
68 | destPort.destId = this.id |
69 | delete destPort.destPort |
70 | } |
71 | } |
72 | |
73 | /** |
74 | * unbinds a port given its name |
75 | * @param {string} name |
76 | * @returns {Promise} |
77 | */ |
78 | async unbind (name) { |
79 | const port = this.ports[name] |
80 | delete this.ports[name] |
81 | this._unboundPorts.add(port) |
82 | |
83 | // update the destination port |
84 | const destPort = await this.hypervisor.getDestPort(port) |
85 | delete destPort.destName |
86 | delete destPort.destId |
87 | destPort.destPort = port |
88 | this.hypervisor.addNodeToCheck(this.id) |
89 | return port |
90 | } |
91 | |
92 | /** |
93 | * delete an port given the name it is bound to |
94 | * @param {string} name |
95 | */ |
96 | delete (name) { |
97 | const port = this.ports[name] |
98 | this.exInterface.send(port, new Message({ |
99 | data: 'delete' |
100 | })) |
101 | this._delete(name) |
102 | } |
103 | |
104 | _delete (name) { |
105 | this.hypervisor.addNodeToCheck(this.id) |
106 | delete this.ports[name] |
107 | } |
108 | |
109 | /** |
110 | * clears any unbounded ports referances |
111 | */ |
112 | clearUnboundedPorts () { |
113 | this._unboundPorts.forEach(port => { |
114 | this.exInterface.send(port, new Message({ |
115 | data: 'delete' |
116 | })) |
117 | }) |
118 | this._unboundPorts.clear() |
119 | if (Object.keys(this.ports).length === 0) { |
120 | this.hypervisor.deleteInstance(this.id) |
121 | } |
122 | } |
123 | |
124 | /** |
125 | * check if a port object is still valid |
126 | * @param {Object} port |
127 | * @return {Boolean} |
128 | */ |
129 | isBound (port) { |
130 | return !this._unboundPorts.has(port) |
131 | } |
132 | |
133 | /** |
134 | * queues a message on a port |
135 | * @param {Message} message |
136 | */ |
137 | queue (name, message) { |
138 | if (name) { |
139 | const port = this.ports[name] |
140 | if (port.messages.push(message) === 1 && message._fromTicks < this._messageTickThreshold) { |
141 | message._fromPort = port |
142 | message.fromName = name |
143 | this._oldestMessageResolve(message) |
144 | this._oldestMessagePromise = new Promise((resolve, reject) => { |
145 | this._oldestMessageResolve = resolve |
146 | }) |
147 | this._messageTickThreshold = Infinity |
148 | } |
149 | |
150 | if (this.isSaturated()) { |
151 | this._saturationResolve() |
152 | this._saturationPromise = new Promise((resolve, reject) => { |
153 | this._saturationResolve = resolve |
154 | }) |
155 | } |
156 | } |
157 | } |
158 | |
159 | /** |
160 | * gets a port given it's name |
161 | * @param {String} name |
162 | * @return {Object} |
163 | */ |
164 | get (name) { |
165 | return this.ports[name] |
166 | } |
167 | |
168 | /** |
169 | * creates a new container. Returning a port to it. |
170 | * @param {String} type |
171 | * @param {*} data - the data to populate the initail state with |
172 | * @returns {Object} |
173 | */ |
174 | create (type, data) { |
175 | let nonce = this.state.nonce |
176 | |
177 | const id = { |
178 | nonce: nonce, |
179 | parent: this.id |
180 | } |
181 | |
182 | // incerment the nonce |
183 | nonce = new BN(nonce) |
184 | nonce.iaddn(1) |
185 | this.state.nonce = nonce.toArray() |
186 | |
187 | // create a new channel for the container |
188 | const ports = this.createChannel() |
189 | this._unboundPorts.delete(ports[1]) |
190 | const promise = this.hypervisor.createInstance(type, data, [ports[1]], id) |
191 | this.exInterface._addWork(promise) |
192 | |
193 | return ports[0] |
194 | } |
195 | |
196 | /** |
197 | * creates a channel returns the created ports in an Array |
198 | * @returns {array} |
199 | */ |
200 | createChannel () { |
201 | const port1 = { |
202 | messages: [] |
203 | } |
204 | |
205 | const port2 = { |
206 | messages: [], |
207 | destPort: port1 |
208 | } |
209 | |
210 | port1.destPort = port2 |
211 | this._unboundPorts.add(port1) |
212 | this._unboundPorts.add(port2) |
213 | return [port1, port2] |
214 | } |
215 | |
216 | /** |
217 | * find and returns the next message |
218 | * @returns {object} |
219 | */ |
220 | peekNextMessage () { |
221 | const names = Object.keys(this.ports) |
222 | if (names.length) { |
223 | const portName = names.reduce(messageArbiter.bind(this)) |
224 | const port = this.ports[portName] |
225 | const message = port.messages[0] |
226 | |
227 | if (message) { |
228 | message._fromPort = port |
229 | message.fromName = portName |
230 | return message |
231 | } |
232 | } |
233 | } |
234 | |
235 | /** |
236 | * tests wether or not all the ports have a message |
237 | * @returns {boolean} |
238 | */ |
239 | isSaturated () { |
240 | const keys = Object.keys(this.ports) |
241 | return keys.length ? keys.every(name => this.ports[name].messages.length) : 0 |
242 | } |
243 | |
244 | whenSaturated () { |
245 | return this._saturationPromise |
246 | } |
247 | |
248 | olderMessage (message) { |
249 | this._messageTickThreshold = message ? message._fromTicks : 0 |
250 | return this._oldestMessagePromise |
251 | } |
252 | } |
253 |
Built with git-ssb-web