git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 001f4d679fe41ebc11e4646799dba30ccf253b73

Files: 001f4d679fe41ebc11e4646799dba30ccf253b73 / portManager.js

5424 bytesRaw
const Port = require('./port.js')
const BN = require('bn.js')
// name given to the Port instance created for the parent port (see PortManager#start)
const ENTRY = Symbol('entry')
4
/**
 * Decides which of two ports should have its message delivered first.
 * Written to be used as an `Array.prototype.reduce` callback over
 * `[portRef, port]` pairs.
 *
 * Rules, in order:
 *  1. a port with no pending message loses to the other one
 *     (if both are empty, `pairB` is returned)
 *  2. the port with the lower tick count wins
 *  3. the message with the higher priority wins
 *  4. otherwise the first pair wins (insertion order)
 *
 * @param {Array} pairA - `[portRef, port]`
 * @param {Array} pairB - `[portRef, port]`
 * @returns {Array} the winning pair
 */
function messageArbiter (pairA, pairB) {
  const portA = pairA[1]
  const portB = pairB[1]
  const a = portA.peek()
  const b = portB.peek()

  // an empty port can never go first
  if (!a) {
    return pairB
  }
  if (!b) {
    return pairA
  }

  // order by number of ticks if the ports have a different number of ticks
  if (portA.ticks !== portB.ticks) {
    return portA.ticks < portB.ticks ? pairA : pairB
  }

  // same tick count: decide by priority
  if (a.priority !== b.priority) {
    return a.priority > b.priority ? pairA : pairB
  }

  // full tie: keep insertion order
  return pairA
}
29
30module.exports = class PortManager {
31 /**
32 * The port manager manages the the ports. This inculdes creation, deletion
33 * fetching and waiting on ports
34 * @param {Object} opts
35 * @param {Object} opts.state
36 * @param {Object} opts.entryPort
37 * @param {Object} opts.parentPort
38 * @param {Object} opts.hypervisor
39 * @param {Object} opts.exoInterface
40 */
41 constructor (opts) {
42 Object.assign(this, opts)
43 this._portMap = new Map()
44 this._unboundPort = new WeakSet()
45 }
46
47 /**
48 * starts the port manager. This fetchs the ports from the state and maps
49 * them to thier names
50 * @returns {Promise}
51 */
52 async start () {
53 // skip the root, since it doesn't have a parent
54 if (this.parentPort !== undefined) {
55 this._bindHandle(this.parentPort, ENTRY)
56 }
57
58 // map ports to thier id's
59 this.ports = await this.hypervisor.graph.get(this.state, 'ports')
60 Object.keys(this.ports).map(name => {
61 const port = this.ports[name]
62 this._bindHandle(port, name)
63 })
64 }
65
66 _bindHandle (portHandle, name) {
67 const port = new Port(name)
68 this._portMap.set(portHandle, port)
69 }
70
71 /**
72 * binds a port to a name
73 * @param {Object} port - the port to bind
74 * @param {String} name - the name of the port
75 */
76 bind (port, name) {
77 if (this.isBound(port)) {
78 throw new Error('cannot bind a port that is already bound')
79 } else if (this.ports[name]) {
80 throw new Error('cannot bind port to a name that is alread bound')
81 }
82 // save the port instance
83 this.ports[name] = port
84 this._bindHandle(port, name)
85 }
86
87 /**
88 * unbinds a port given its name
89 * @param {String} name
90 * @returns {boolean} whether or not the port was deleted
91 */
92 unbind (name) {
93 const port = this.ports[name]
94 delete this.ports[name]
95 this._portMap.delete(port)
96 return port
97 }
98
99 /**
100 * get the port name given its referance
101 * @return {string}
102 */
103 getBoundName (portRef) {
104 return this._portMap.get(portRef).name
105 }
106
107 /**
108 * check if a port object is still valid
109 * @param {Object} port
110 * @return {Boolean}
111 */
112 isBound (port) {
113 return this._portMap.has(port)
114 }
115
116 /**
117 * queues a message on a port
118 * @param {Message} message
119 */
120 queue (message) {
121 this._portMap.get(message.fromPort).queue(message)
122 }
123
124 /**
125 * gets a port given it's name
126 * @param {String} name
127 * @return {Object}
128 */
129 get (name) {
130 return this.ports[name]
131 }
132
133 _createPortObject (type, link) {
134 const parentId = this.entryPort ? this.entryPort.id : null
135 let nonce = this.state['/'].nonce
136
137 const portRef = {
138 'messages': [],
139 'id': {
140 '/': {
141 nonce: nonce,
142 parent: parentId
143 }
144 },
145 'type': type,
146 'link': link
147 }
148
149 // incerment the nonce
150 nonce = new BN(nonce)
151 nonce.iaddn(1)
152 this.state['/'].nonce = nonce.toArray()
153 this._unboundPort.add(portRef)
154 return portRef
155 }
156
157 /**
158 * creates a new Port given the container type
159 * @param {String} type
160 * @param {*} data - the data to populate the initail state with
161 * @returns {Object} the newly created port
162 */
163 create (type, data) {
164 const container = this.hypervisor._containerTypes[type]
165 return this._createPortObject(type, {
166 '/': container.Constructor.createState(data)
167 })
168 }
169
170 /**
171 * waits till all ports have reached a threshold tick count
172 * @param {Integer} threshold - the number of ticks to wait
173 * @param {Object} fromPort - the port requesting the wait
174 * @param {Array} ports - the ports to wait on
175 * @returns {Promise}
176 */
177 wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) {
178 // find the ports that have a smaller tick count then the threshold tick count
179 const unkownPorts = ports.filter(([portRef, port]) => {
180 return port.ticks < threshold && fromPort !== portRef
181 })
182
183 const promises = unkownPorts.map(async([portRef, port]) => {
184 // update the port's tick count
185 port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort)
186 })
187
188 return Promise.all(promises)
189 }
190
191 /**
192 * gets the next canonical message given the an array of ports to choose from
193 * @param {Array} ports
194 * @returns {Promise}
195 */
196 async getNextMessage (ports = [...this._portMap]) {
197 if (ports.length) {
198 // find the oldest message
199 const ticks = ports.map(([name, port]) => {
200 return port.size ? port.ticks : this.exoInterface.ticks
201 }).reduce((ticksA, ticksB) => {
202 return ticksA < ticksB ? ticksA : ticksB
203 })
204
205 await this.wait(ticks)
206 const portMap = ports.reduce(messageArbiter)
207 return portMap[1].dequeue()
208 }
209 }
210}
211

Built with git-ssb-web