git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 218dd59e9632521763daa53b40550901c8c2dcf0

Files: 218dd59e9632521763daa53b40550901c8c2dcf0 / portManager.js

5491 bytesRaw
1const Port = require('./port.js')
2const BN = require('bn.js')
3const ENTRY = Symbol('entry')
4
5// decides which message to go first
6function messageArbiter (pairA, pairB) {
7 const portA = pairA[1]
8 const portB = pairB[1]
9 const a = portA.peek()
10 const b = portB.peek()
11
12 if (!a) {
13 return pairB
14 } else if (!b) {
15 return pairA
16 }
17
18 // order by number of ticks if messages have different number of ticks
19 if (portA.ticks !== portB.ticks) {
20 return portA.ticks < portB.ticks ? pairA : pairB
21 } else if (a.priority !== b.priority) {
22 // decide by priority
23 return a.priority > b.priority ? pairA : pairB
24 } else {
25 // insertion order
26 return pairA
27 }
28}
29
30module.exports = class PortManager {
31 /**
32 * The port manager manages the the ports. This inculdes creation, deletion
33 * fetching and waiting on ports
34 * @param {Object} opts
35 * @param {Object} opts.state
36 * @param {Object} opts.entryPort
37 * @param {Object} opts.parentPort
38 * @param {Object} opts.hypervisor
39 * @param {Object} opts.exoInterface
40 */
41 constructor (opts) {
42 Object.assign(this, opts)
43 this._portMap = new Map()
44 this._unboundPort = new WeakSet()
45 }
46
47 /**
48 * starts the port manager. This fetchs the ports from the state and maps
49 * them to thier names
50 * @returns {Promise}
51 */
52 async start () {
53 // skip the root, since it doesn't have a parent
54 if (this.parentPort !== undefined) {
55 this._bindRef(this.parentPort, ENTRY)
56 }
57
58 // map ports to thier id's
59 this.ports = await this.hypervisor.graph.get(this.state, 'ports')
60 Object.keys(this.ports).map(name => {
61 const port = this.ports[name]
62 this._bindRef(port, name)
63 })
64 }
65
66 _bindRef (portRef, name) {
67 const port = new Port(name)
68 this._portMap.set(portRef, port)
69 }
70
71 /**
72 * binds a port to a name
73 * @param {Object} port - the port to bind
74 * @param {String} name - the name of the port
75 */
76 bind (port, name) {
77 if (this.isBound(port)) {
78 throw new Error('cannot bind a port that is already bound')
79 }
80 // save the port instance
81 this.ports[name] = port
82 this._bindRef(port, name)
83 }
84
85 /**
86 * unbinds a port given its name
87 * @param {String} name
88 * @returns {boolean} whether or not the port was deleted
89 */
90 unbind (name) {
91 const port = this.ports[name]
92 delete this.ports[name]
93 this._portMap.delete(port)
94 return port
95 }
96
97 /**
98 * get the port name given its referance
99 * @return {string}
100 */
101 getBoundName (portRef) {
102 return this._portMap.get(portRef).name
103 }
104
105 /**
106 * check if a port object is still valid
107 * @param {Object} port
108 * @return {Boolean}
109 */
110 isBound (port) {
111 return this._portMap.has(port)
112 }
113
114 /**
115 * queues a message on a port
116 * @param {Message} message
117 */
118 queue (message) {
119 this._portMap.get(message.fromPort).queue(message)
120 }
121
122 /**
123 * gets a port given it's name
124 * @param {String} name
125 * @return {Object}
126 */
127 get (name) {
128 return this.ports[name]
129 }
130
131 _createPortObject (type, link) {
132 const parentId = this.entryPort ? this.entryPort.id : null
133 let nonce = this.state['/'].nonce
134
135 const portRef = {
136 'messages': [],
137 'id': {
138 '/': {
139 nonce: nonce,
140 parent: parentId
141 }
142 },
143 'type': type,
144 'link': link
145 }
146
147 // incerment the nonce
148 nonce = new BN(nonce)
149 nonce.iaddn(1)
150 this.state['/'].nonce = nonce.toArray()
151 this._unboundPort.add(portRef)
152 return portRef
153 }
154
155 /**
156 * creates a new Port given the container type
157 * @param {String} type
158 * @param {*} data - the data to populate the initail state with
159 * @returns {Object} the newly created port
160 */
161 create (type, data) {
162 const Container = this.hypervisor._containerTypes[type]
163 return this._createPortObject(type, {
164 '/': Container.createState(data)
165 })
166 }
167
168 /**
169 * creates a copy of a port given a port referance
170 * @param {Object} port - the port to copy
171 */
172 copy (port, type = port.type) {
173 return this._createPortObject(port.type, port.link)
174 }
175
176 /**
177 * waits till all ports have reached a threshold tick count
178 * @param {Integer} threshold - the number of ticks to wait
179 * @param {Object} fromPort - the port requesting the wait
180 * @param {Array} ports - the ports to wait on
181 * @returns {Promise}
182 */
183 wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) {
184 // find the ports that have a smaller tick count then the threshold tick count
185 const unkownPorts = ports.filter(([portRef, port]) => {
186 return port.ticks < threshold && fromPort !== portRef
187 })
188
189 const promises = unkownPorts.map(async([portRef, port]) => {
190 // update the port's tick count
191 port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort)
192 })
193
194 return Promise.all(promises)
195 }
196
197 /**
198 * gets the next canonical message given the an array of ports to choose from
199 * @param {Array} ports
200 * @returns {Promise}
201 */
202 async getNextMessage (ports = [...this._portMap]) {
203 if (ports.length) {
204 // find the oldest message
205 const ticks = ports.map(([name, port]) => {
206 return port.size ? port.ticks : this.exoInterface.ticks
207 }).reduce((ticksA, ticksB) => {
208 return ticksA < ticksB ? ticksA : ticksB
209 })
210
211 await this.wait(ticks)
212 const portMap = ports.reduce(messageArbiter)
213 return portMap[1].dequeue()
214 }
215 }
216}
217

Built with git-ssb-web