git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 42bb5a2f36de56726c4bd6ede09c95352d5b545d

Files: 42bb5a2f36de56726c4bd6ede09c95352d5b545d / portManager.js

4730 bytesRaw
1const Port = require('./port.js')
2const BN = require('bn.js')
3const ENTRY = Symbol('entry')
4
/**
 * Decides which of two `[portRef, port]` pairs should have its message
 * handled first. Intended to be used as a reducer over a list of such pairs.
 *
 * Rules, in order:
 *  1. a pair whose port has no pending message loses to the other pair
 *  2. the port with the lower tick count wins
 *  3. the message with the higher priority wins
 *  4. otherwise the first pair wins (insertion order)
 *
 * @param {Array} pairA - `[portRef, port]`
 * @param {Array} pairB - `[portRef, port]`
 * @returns {Array} whichever of `pairA` / `pairB` goes first
 */
function messageArbiter (pairA, pairB) {
  const left = pairA[1]
  const right = pairB[1]
  const leftMsg = left.peek()
  const rightMsg = right.peek()

  // a port without a pending message never wins against the other one
  if (!leftMsg) {
    return pairB
  }
  if (!rightMsg) {
    return pairA
  }

  // ports at different tick counts: the one with fewer ticks goes first
  if (left.ticks !== right.ticks) {
    return left.ticks < right.ticks ? pairA : pairB
  }

  // same tick count: the higher priority message goes first
  if (leftMsg.priority !== rightMsg.priority) {
    return leftMsg.priority > rightMsg.priority ? pairA : pairB
  }

  // full tie: keep insertion order
  return pairA
}
29
30module.exports = class PortManager {
31 /**
32 * The port manager manages the the ports. This inculdes creation, deletion
33 * fetching and waiting on ports
34 * @param {Object} opts
35 * @param {Object} opts.state
36 * @param {Object} opts.entryPort
37 * @param {Object} opts.parentPort
38 * @param {Object} opts.hypervisor
39 * @param {Object} opts.exoInterface
40 */
41 constructor (opts) {
42 Object.assign(this, opts)
43 this._portMap = new Map()
44 }
45
46 /**
47 * starts the port manager. This fetchs the ports from the state and maps
48 * them to thier names
49 * @returns {Promise}
50 */
51 async start () {
52 // skip the root, since it doesn't have a parent
53 if (this.parentPort !== undefined) {
54 this._bindRef(this.parentPort, ENTRY)
55 }
56
57 // map ports to thier id's
58 this.ports = await this.hypervisor.graph.get(this.state, 'ports')
59 Object.keys(this.ports).map(name => {
60 const port = this.ports[name]
61 this._bindRef(port, name)
62 })
63 }
64
65 _bindRef (portRef, name) {
66 const port = new Port()
67 this._portMap.set(portRef, port)
68 }
69
70 /**
71 * binds a port to a name
72 * @param {Object} port - the port to bind
73 * @param {String} name - the name of the port
74 */
75 bind (port, name) {
76 // save the port instance
77 this.ports[name] = port
78 this._bindRef(port, name)
79 }
80
81 /**
82 * queues a message on a port
83 * @param {Message} message
84 */
85 queue (message) {
86 this._portMap.get(message.fromPort).queue(message)
87 }
88
89 /**
90 * gets a port given it's name
91 * @param {String} name
92 * @return {Object}
93 */
94 get (name) {
95 return this.ports[name]
96 }
97
98 /**
99 * deletes a port given its name
100 * @param {String} name
101 */
102 delete (name) {
103 const port = this.ports[name]
104 delete this.ports[name]
105 this._portMap.delete(port)
106 }
107
108 /**
109 * check if a port object is still valid
110 * @param {Object} port
111 * @return {Boolean}
112 */
113 isValidPort (port) {
114 return this._portMap.has(port)
115 }
116
117 /**
118 * creates a new Port given the container type
119 * @param {String} type
120 * @returns {Object} the newly created port
121 */
122 create (type) {
123 const Container = this.hypervisor._containerTypes[type]
124 const parentId = this.entryPort ? this.entryPort.id : null
125 let nonce = this.state['/'].nonce
126
127 const portRef = {
128 'messages': [],
129 'id': {
130 '/': {
131 nonce: nonce,
132 parent: parentId
133 }
134 },
135 'type': type,
136 'link': {
137 '/': Container.createState()
138 }
139 }
140
141 // incerment the nonce
142 nonce = new BN(nonce)
143 nonce.iaddn(1)
144 this.state['/'].nonce = nonce.toArray()
145 return portRef
146 }
147
148 /**
149 * waits till all ports have reached a threshold tick count
150 * @param {Integer} threshold - the number of ticks to wait
151 * @param {Object} fromPort - the port requesting the wait
152 * @param {Array} ports - the ports to wait on
153 * @returns {Promise}
154 */
155 wait (threshold, fromPort = this.entryPort, ports = [...this._portMap]) {
156 // find the ports that have a smaller tick count then the threshold tick count
157 const unkownPorts = ports.filter(([portRef, port]) => {
158 return port.ticks < threshold && fromPort !== portRef
159 })
160
161 const promises = unkownPorts.map(async([portRef, port]) => {
162 // update the port's tick count
163 port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort)
164 })
165
166 return Promise.all(promises)
167 }
168
169 /**
170 * gets the next canonical message given the an array of ports to choose from
171 * @param {Array} ports
172 * @returns {Promise}
173 */
174 async getNextMessage (ports = [...this._portMap]) {
175 if (this._portMap.size) {
176 // find the oldest message
177 const ticks = ports.map(([name, port]) => {
178 return port.size ? port.ticks : this.exoInterface.ticks
179 }).reduce((ticksA, ticksB) => {
180 return ticksA < ticksB ? ticksA : ticksB
181 })
182
183 await this.wait(ticks)
184 const portMap = ports.reduce(messageArbiter)
185 return portMap[1].dequeue()
186 }
187 }
188}
189

Built with git-ssb-web