Files: 42bb5a2f36de56726c4bd6ede09c95352d5b545d / portManager.js
4730 bytesRaw
1 | const Port = require('./port.js') |
2 | const BN = require('bn.js') |
3 | const ENTRY = Symbol('entry') |
4 | |
5 | // decides which message to go first |
6 | function messageArbiter (pairA, pairB) { |
7 | const portA = pairA[1] |
8 | const portB = pairB[1] |
9 | const a = portA.peek() |
10 | const b = portB.peek() |
11 | |
12 | if (!a) { |
13 | return pairB |
14 | } else if (!b) { |
15 | return pairA |
16 | } |
17 | |
18 | // order by number of ticks if messages have different number of ticks |
19 | if (portA.ticks !== portB.ticks) { |
20 | return portA.ticks < portB.ticks ? pairA : pairB |
21 | } else if (a.priority !== b.priority) { |
22 | // decide by priority |
23 | return a.priority > b.priority ? pairA : pairB |
24 | } else { |
25 | // insertion order |
26 | return pairA |
27 | } |
28 | } |
29 | |
30 | module.exports = class PortManager { |
31 | /** |
32 | * The port manager manages the the ports. This inculdes creation, deletion |
33 | * fetching and waiting on ports |
34 | * @param {Object} opts |
35 | * @param {Object} opts.state |
36 | * @param {Object} opts.entryPort |
37 | * @param {Object} opts.parentPort |
38 | * @param {Object} opts.hypervisor |
39 | * @param {Object} opts.exoInterface |
40 | */ |
41 | constructor (opts) { |
42 | Object.assign(this, opts) |
43 | this._portMap = new Map() |
44 | } |
45 | |
46 | /** |
47 | * starts the port manager. This fetchs the ports from the state and maps |
48 | * them to thier names |
49 | * @returns {Promise} |
50 | */ |
51 | async start () { |
52 | // skip the root, since it doesn't have a parent |
53 | if (this.parentPort !== undefined) { |
54 | this._bindRef(this.parentPort, ENTRY) |
55 | } |
56 | |
57 | // map ports to thier id's |
58 | this.ports = await this.hypervisor.graph.get(this.state, 'ports') |
59 | Object.keys(this.ports).map(name => { |
60 | const port = this.ports[name] |
61 | this._bindRef(port, name) |
62 | }) |
63 | } |
64 | |
65 | _bindRef (portRef, name) { |
66 | const port = new Port() |
67 | this._portMap.set(portRef, port) |
68 | } |
69 | |
70 | /** |
71 | * binds a port to a name |
72 | * @param {Object} port - the port to bind |
73 | * @param {String} name - the name of the port |
74 | */ |
75 | bind (port, name) { |
76 | // save the port instance |
77 | this.ports[name] = port |
78 | this._bindRef(port, name) |
79 | } |
80 | |
81 | /** |
82 | * queues a message on a port |
83 | * @param {Message} message |
84 | */ |
85 | queue (message) { |
86 | this._portMap.get(message.fromPort).queue(message) |
87 | } |
88 | |
89 | /** |
90 | * gets a port given it's name |
91 | * @param {String} name |
92 | * @return {Object} |
93 | */ |
94 | get (name) { |
95 | return this.ports[name] |
96 | } |
97 | |
98 | /** |
99 | * deletes a port given its name |
100 | * @param {String} name |
101 | */ |
102 | delete (name) { |
103 | const port = this.ports[name] |
104 | delete this.ports[name] |
105 | this._portMap.delete(port) |
106 | } |
107 | |
108 | /** |
109 | * check if a port object is still valid |
110 | * @param {Object} port |
111 | * @return {Boolean} |
112 | */ |
113 | isValidPort (port) { |
114 | return this._portMap.has(port) |
115 | } |
116 | |
117 | /** |
118 | * creates a new Port given the container type |
119 | * @param {String} type |
120 | * @returns {Object} the newly created port |
121 | */ |
122 | create (type) { |
123 | const Container = this.hypervisor._containerTypes[type] |
124 | const parentId = this.entryPort ? this.entryPort.id : null |
125 | let nonce = this.state['/'].nonce |
126 | |
127 | const portRef = { |
128 | 'messages': [], |
129 | 'id': { |
130 | '/': { |
131 | nonce: nonce, |
132 | parent: parentId |
133 | } |
134 | }, |
135 | 'type': type, |
136 | 'link': { |
137 | '/': Container.createState() |
138 | } |
139 | } |
140 | |
141 | // incerment the nonce |
142 | nonce = new BN(nonce) |
143 | nonce.iaddn(1) |
144 | this.state['/'].nonce = nonce.toArray() |
145 | return portRef |
146 | } |
147 | |
148 | /** |
149 | * waits till all ports have reached a threshold tick count |
150 | * @param {Integer} threshold - the number of ticks to wait |
151 | * @param {Object} fromPort - the port requesting the wait |
152 | * @param {Array} ports - the ports to wait on |
153 | * @returns {Promise} |
154 | */ |
155 | wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) { |
156 | // find the ports that have a smaller tick count then the threshold tick count |
157 | const unkownPorts = ports.filter(([portRef, port]) => { |
158 | return port.ticks < threshold && fromPort !== portRef |
159 | }) |
160 | |
161 | const promises = unkownPorts.map(async([portRef, port]) => { |
162 | // update the port's tick count |
163 | port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort) |
164 | }) |
165 | |
166 | return Promise.all(promises) |
167 | } |
168 | |
169 | /** |
170 | * gets the next canonical message given the an array of ports to choose from |
171 | * @param {Array} ports |
172 | * @returns {Promise} |
173 | */ |
174 | async getNextMessage (ports = [...this._portMap]) { |
175 | if (this._portMap.size) { |
176 | // find the oldest message |
177 | const ticks = ports.map(([name, port]) => { |
178 | return port.size ? port.ticks : this.exoInterface.ticks |
179 | }).reduce((ticksA, ticksB) => { |
180 | return ticksA < ticksB ? ticksA : ticksB |
181 | }) |
182 | |
183 | await this.wait(ticks) |
184 | const portMap = ports.reduce(messageArbiter) |
185 | return portMap[1].dequeue() |
186 | } |
187 | } |
188 | } |
189 |
Built with git-ssb-web