Files: dee5ab84ac49442b22f614eaf5a76c873d572941 / portManager.js
5503 bytesRaw
1 | const Port = require('./port.js') |
2 | const BN = require('bn.js') |
3 | const ENTRY = Symbol('entry') |
4 | |
5 | // decides which message to go first |
6 | function messageArbiter (pairA, pairB) { |
7 | const portA = pairA[1] |
8 | const portB = pairB[1] |
9 | const a = portA.peek() |
10 | const b = portB.peek() |
11 | |
12 | if (!a) { |
13 | return pairB |
14 | } else if (!b) { |
15 | return pairA |
16 | } |
17 | |
18 | // order by number of ticks if messages have different number of ticks |
19 | if (portA.ticks !== portB.ticks) { |
20 | return portA.ticks < portB.ticks ? pairA : pairB |
21 | } else if (a.priority !== b.priority) { |
22 | // decide by priority |
23 | return a.priority > b.priority ? pairA : pairB |
24 | } else { |
25 | // insertion order |
26 | return pairA |
27 | } |
28 | } |
29 | |
30 | module.exports = class PortManager { |
31 | /** |
32 | * The port manager manages the the ports. This inculdes creation, deletion |
33 | * fetching and waiting on ports |
34 | * @param {Object} opts |
35 | * @param {Object} opts.state |
36 | * @param {Object} opts.entryPort |
37 | * @param {Object} opts.parentPort |
38 | * @param {Object} opts.hypervisor |
39 | * @param {Object} opts.exoInterface |
40 | */ |
41 | constructor (opts) { |
42 | Object.assign(this, opts) |
43 | this._portMap = new Map() |
44 | this._unboundPort = new WeakSet() |
45 | } |
46 | |
47 | /** |
48 | * starts the port manager. This fetchs the ports from the state and maps |
49 | * them to thier names |
50 | * @returns {Promise} |
51 | */ |
52 | async start () { |
53 | // skip the root, since it doesn't have a parent |
54 | if (this.parentPort !== undefined) { |
55 | this._bindRef(this.parentPort, ENTRY) |
56 | } |
57 | |
58 | // map ports to thier id's |
59 | this.ports = await this.hypervisor.graph.get(this.state, 'ports') |
60 | Object.keys(this.ports).map(name => { |
61 | const port = this.ports[name] |
62 | this._bindRef(port, name) |
63 | }) |
64 | } |
65 | |
66 | _bindRef (portRef, name) { |
67 | const port = new Port(name) |
68 | this._portMap.set(portRef, port) |
69 | } |
70 | |
71 | /** |
72 | * binds a port to a name |
73 | * @param {Object} port - the port to bind |
74 | * @param {String} name - the name of the port |
75 | */ |
76 | bind (port, name) { |
77 | if (this.isBound(port)) { |
78 | throw new Error('cannot bind a port that is already bound') |
79 | } |
80 | // save the port instance |
81 | this.ports[name] = port |
82 | this._bindRef(port, name) |
83 | } |
84 | |
85 | /** |
86 | * unbinds a port given its name |
87 | * @param {String} name |
88 | * @returns {boolean} whether or not the port was deleted |
89 | */ |
90 | unbind (name) { |
91 | const port = this.ports[name] |
92 | delete this.ports[name] |
93 | this._portMap.delete(port) |
94 | return port |
95 | } |
96 | |
97 | /** |
98 | * get the port name given its referance |
99 | * @return {string} |
100 | */ |
101 | getBoundName (portRef) { |
102 | return this._portMap.get(portRef).name |
103 | } |
104 | |
105 | /** |
106 | * check if a port object is still valid |
107 | * @param {Object} port |
108 | * @return {Boolean} |
109 | */ |
110 | isBound (port) { |
111 | return this._portMap.has(port) |
112 | } |
113 | |
114 | /** |
115 | * queues a message on a port |
116 | * @param {Message} message |
117 | */ |
118 | queue (message) { |
119 | this._portMap.get(message.fromPort).queue(message) |
120 | } |
121 | |
122 | /** |
123 | * gets a port given it's name |
124 | * @param {String} name |
125 | * @return {Object} |
126 | */ |
127 | get (name) { |
128 | return this.ports[name] |
129 | } |
130 | |
131 | _createPortObject (type, link) { |
132 | const parentId = this.entryPort ? this.entryPort.id : null |
133 | let nonce = this.state['/'].nonce |
134 | |
135 | const portRef = { |
136 | 'messages': [], |
137 | 'id': { |
138 | '/': { |
139 | nonce: nonce, |
140 | parent: parentId |
141 | } |
142 | }, |
143 | 'type': type, |
144 | 'link': link |
145 | } |
146 | |
147 | // incerment the nonce |
148 | nonce = new BN(nonce) |
149 | nonce.iaddn(1) |
150 | this.state['/'].nonce = nonce.toArray() |
151 | this._unboundPort.add(portRef) |
152 | return portRef |
153 | } |
154 | |
155 | /** |
156 | * creates a new Port given the container type |
157 | * @param {String} type |
158 | * @param {*} data - the data to populate the initail state with |
159 | * @returns {Object} the newly created port |
160 | */ |
161 | create (type, data) { |
162 | const container = this.hypervisor._containerTypes[type] |
163 | return this._createPortObject(type, { |
164 | '/': container.Constructor.createState(data) |
165 | }) |
166 | } |
167 | |
168 | /** |
169 | * creates a copy of a port given a port referance |
170 | * @param {Object} port - the port to copy |
171 | */ |
172 | copy (port, type = port.type) { |
173 | return this._createPortObject(port.type, port.link) |
174 | } |
175 | |
176 | /** |
177 | * waits till all ports have reached a threshold tick count |
178 | * @param {Integer} threshold - the number of ticks to wait |
179 | * @param {Object} fromPort - the port requesting the wait |
180 | * @param {Array} ports - the ports to wait on |
181 | * @returns {Promise} |
182 | */ |
183 | wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) { |
184 | // find the ports that have a smaller tick count then the threshold tick count |
185 | const unkownPorts = ports.filter(([portRef, port]) => { |
186 | return port.ticks < threshold && fromPort !== portRef |
187 | }) |
188 | |
189 | const promises = unkownPorts.map(async([portRef, port]) => { |
190 | // update the port's tick count |
191 | port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort) |
192 | }) |
193 | |
194 | return Promise.all(promises) |
195 | } |
196 | |
197 | /** |
198 | * gets the next canonical message given the an array of ports to choose from |
199 | * @param {Array} ports |
200 | * @returns {Promise} |
201 | */ |
202 | async getNextMessage (ports = [...this._portMap]) { |
203 | if (ports.length) { |
204 | // find the oldest message |
205 | const ticks = ports.map(([name, port]) => { |
206 | return port.size ? port.ticks : this.exoInterface.ticks |
207 | }).reduce((ticksA, ticksB) => { |
208 | return ticksA < ticksB ? ticksA : ticksB |
209 | }) |
210 | |
211 | await this.wait(ticks) |
212 | const portMap = ports.reduce(messageArbiter) |
213 | return portMap[1].dequeue() |
214 | } |
215 | } |
216 | } |
217 |
Built with git-ssb-web