Files: 37a595c1c849fab142d6c2aae0ab36c19e39bb1b / portManager.js
5424 bytesRaw
1 | const Port = require('./port.js') |
2 | const BN = require('bn.js') |
3 | const ENTRY = Symbol('entry') |
4 | |
/**
 * Reducer that decides which of two `[portRef, port]` pairs should have its
 * message handled first.
 *
 * Rules, applied in order:
 *  1. if the first port has nothing queued (falsy `peek()`), the second pair
 *     is returned; otherwise if the second port has nothing queued, the first
 *     pair is returned
 *  2. the pair whose port has the lower `ticks` wins
 *  3. the pair whose pending message has the higher `priority` wins
 *  4. otherwise the first pair wins (insertion order)
 *
 * @param {Array} pairA - `[portRef, port]`
 * @param {Array} pairB - `[portRef, port]`
 * @returns {Array} whichever of `pairA` / `pairB` goes first
 */
function messageArbiter (pairA, pairB) {
  const firstPort = pairA[1]
  const secondPort = pairB[1]
  const firstMessage = firstPort.peek()
  const secondMessage = secondPort.peek()

  // a port without a pending message can never win
  if (!firstMessage) {
    return pairB
  }
  if (!secondMessage) {
    return pairA
  }

  // the port that is further behind in ticks goes first
  if (firstPort.ticks !== secondPort.ticks) {
    return firstPort.ticks < secondPort.ticks ? pairA : pairB
  }

  // same tick count: the higher priority message goes first
  if (firstMessage.priority !== secondMessage.priority) {
    return firstMessage.priority > secondMessage.priority ? pairA : pairB
  }

  // complete tie: keep insertion order
  return pairA
}
29 | |
30 | module.exports = class PortManager { |
31 | /** |
32 | * The port manager manages the the ports. This inculdes creation, deletion |
33 | * fetching and waiting on ports |
34 | * @param {Object} opts |
35 | * @param {Object} opts.state |
36 | * @param {Object} opts.entryPort |
37 | * @param {Object} opts.parentPort |
38 | * @param {Object} opts.hypervisor |
39 | * @param {Object} opts.exoInterface |
40 | */ |
41 | constructor (opts) { |
42 | Object.assign(this, opts) |
43 | this._portMap = new Map() |
44 | this._unboundPort = new WeakSet() |
45 | } |
46 | |
47 | /** |
48 | * starts the port manager. This fetchs the ports from the state and maps |
49 | * them to thier names |
50 | * @returns {Promise} |
51 | */ |
52 | async start () { |
53 | // skip the root, since it doesn't have a parent |
54 | if (this.parentPort !== undefined) { |
55 | this._bindHandle(this.parentPort, ENTRY) |
56 | } |
57 | |
58 | // map ports to thier id's |
59 | this.ports = await this.hypervisor.graph.get(this.state, 'ports') |
60 | Object.keys(this.ports).map(name => { |
61 | const port = this.ports[name] |
62 | this._bindHandle(port, name) |
63 | }) |
64 | } |
65 | |
66 | _bindHandle (portHandle, name) { |
67 | const port = new Port(name) |
68 | this._portMap.set(portHandle, port) |
69 | } |
70 | |
71 | /** |
72 | * binds a port to a name |
73 | * @param {Object} port - the port to bind |
74 | * @param {String} name - the name of the port |
75 | */ |
76 | bind (port, name) { |
77 | if (this.isBound(port)) { |
78 | throw new Error('cannot bind a port that is already bound') |
79 | } else if (this.ports[name]) { |
80 | throw new Error('cannot bind port to a name that is alread bound') |
81 | } |
82 | // save the port instance |
83 | this.ports[name] = port |
84 | this._bindHandle(port, name) |
85 | } |
86 | |
87 | /** |
88 | * unbinds a port given its name |
89 | * @param {String} name |
90 | * @returns {boolean} whether or not the port was deleted |
91 | */ |
92 | unbind (name) { |
93 | const port = this.ports[name] |
94 | delete this.ports[name] |
95 | this._portMap.delete(port) |
96 | return port |
97 | } |
98 | |
99 | /** |
100 | * get the port name given its referance |
101 | * @return {string} |
102 | */ |
103 | getBoundName (portRef) { |
104 | return this._portMap.get(portRef).name |
105 | } |
106 | |
107 | /** |
108 | * check if a port object is still valid |
109 | * @param {Object} port |
110 | * @return {Boolean} |
111 | */ |
112 | isBound (port) { |
113 | return this._portMap.has(port) |
114 | } |
115 | |
116 | /** |
117 | * queues a message on a port |
118 | * @param {Message} message |
119 | */ |
120 | queue (message) { |
121 | this._portMap.get(message.fromPort).queue(message) |
122 | } |
123 | |
124 | /** |
125 | * gets a port given it's name |
126 | * @param {String} name |
127 | * @return {Object} |
128 | */ |
129 | get (name) { |
130 | return this.ports[name] |
131 | } |
132 | |
133 | _createPortObject (type, link) { |
134 | const parentId = this.entryPort ? this.entryPort.id : null |
135 | let nonce = this.state['/'].nonce |
136 | |
137 | const portRef = { |
138 | 'messages': [], |
139 | 'id': { |
140 | '/': { |
141 | nonce: nonce, |
142 | parent: parentId |
143 | } |
144 | }, |
145 | 'type': type, |
146 | 'link': link |
147 | } |
148 | |
149 | // incerment the nonce |
150 | nonce = new BN(nonce) |
151 | nonce.iaddn(1) |
152 | this.state['/'].nonce = nonce.toArray() |
153 | this._unboundPort.add(portRef) |
154 | return portRef |
155 | } |
156 | |
157 | /** |
158 | * creates a new Port given the container type |
159 | * @param {String} type |
160 | * @param {*} data - the data to populate the initail state with |
161 | * @returns {Object} the newly created port |
162 | */ |
163 | create (type, data) { |
164 | const container = this.hypervisor._containerTypes[type] |
165 | return this._createPortObject(type, { |
166 | '/': container.Constructor.createState(data) |
167 | }) |
168 | } |
169 | |
170 | /** |
171 | * waits till all ports have reached a threshold tick count |
172 | * @param {Integer} threshold - the number of ticks to wait |
173 | * @param {Object} fromPort - the port requesting the wait |
174 | * @param {Array} ports - the ports to wait on |
175 | * @returns {Promise} |
176 | */ |
177 | wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) { |
178 | // find the ports that have a smaller tick count then the threshold tick count |
179 | const unkownPorts = ports.filter(([portRef, port]) => { |
180 | return port.ticks < threshold && fromPort !== portRef |
181 | }) |
182 | |
183 | const promises = unkownPorts.map(async([portRef, port]) => { |
184 | // update the port's tick count |
185 | port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort) |
186 | }) |
187 | |
188 | return Promise.all(promises) |
189 | } |
190 | |
191 | /** |
192 | * gets the next canonical message given the an array of ports to choose from |
193 | * @param {Array} ports |
194 | * @returns {Promise} |
195 | */ |
196 | async getNextMessage (ports = [...this._portMap]) { |
197 | if (ports.length) { |
198 | // find the oldest message |
199 | const ticks = ports.map(([name, port]) => { |
200 | return port.size ? port.ticks : this.exoInterface.ticks |
201 | }).reduce((ticksA, ticksB) => { |
202 | return ticksA < ticksB ? ticksA : ticksB |
203 | }) |
204 | |
205 | await this.wait(ticks) |
206 | const portMap = ports.reduce(messageArbiter) |
207 | return portMap[1].dequeue() |
208 | } |
209 | } |
210 | } |
211 |
Built with git-ssb-web