Files: 1171580743394a34306e2a12800ea5606f7e5d67 / portManager.js
2580 bytesRaw
1 | const Port = require('./port.js') |
2 | const common = require('./common.js') |
3 | |
4 | module.exports = class PortManager { |
5 | constructor (ports, kernel) { |
6 | this.kernel = kernel |
7 | this.ports = ports |
8 | this._portMap = new Map() |
9 | this._hasMappedPorts = false |
10 | this._tempQueue = [] |
11 | this._mapPorts(this.ports).then(ports => { |
12 | this._portMap = ports |
13 | this.queue = this._queue |
14 | for (const message of this._tempQueue) { |
15 | this.queue(message) |
16 | } |
17 | }) |
18 | } |
19 | |
20 | // temporaly queue message untill the ports have been mapped. Mapping the |
21 | // ports is async since the ports could just be merkle links |
22 | queue (message) { |
23 | this._tempQueue.push(message) |
24 | } |
25 | |
26 | _queue (message) { |
27 | this._portMap.get(message.from).push(message) |
28 | } |
29 | |
30 | async _mapPorts (ports) { |
31 | ports = Object.key(ports).map(name => { |
32 | const port = ports[name] |
33 | this.kernel.id(port).then(id => { |
34 | return [id, new Port(name)] |
35 | }) |
36 | }) |
37 | ports = await Promise.all(ports) |
38 | return new Map(ports) |
39 | } |
40 | |
41 | create (name, value) { |
42 | this.ports[name] = value |
43 | } |
44 | |
45 | del (name) { |
46 | delete this.ports[name] |
47 | } |
48 | |
49 | move (from, to) { |
50 | this.ports[to] = this.ports[from] |
51 | delete this.ports[from] |
52 | } |
53 | |
54 | async get (name) { |
55 | const port = await name === common.PARENT ? this.graph.get(this.state.ports, name) : this.parentId |
56 | const id = await this.kernel.id(port) |
57 | return this._portMap.get(id) |
58 | } |
59 | |
60 | // waits till all ports have reached a threshold tick count |
61 | async wait (threshold) { |
62 | // find the ports that have a smaller tick count then the threshold tick count |
63 | const unkownPorts = [...this._ports].filter((id, port) => { |
64 | const message = port.peek() |
65 | return !message || message.ticks < threshold |
66 | }) |
67 | |
68 | const promises = unkownPorts.map(port => { |
69 | this.hypervisor.waitOnVM(port, threshold).then(ticks => { |
70 | // update the port's tick count |
71 | port.ticks = ticks |
72 | }) |
73 | }) |
74 | await Promise.all(promises) |
75 | } |
76 | |
77 | async getNextMessage (ticks) { |
78 | await this.wait(ticks) |
79 | return [...this._ports].reduce(messageArbiter).shift() |
80 | } |
81 | } |
82 | |
/**
 * Decides which of two ports' head messages goes first.
 *
 * Ordering rules, applied in turn:
 *   1. if one port has no head message, the other port's message is returned
 *   2. the message with the lower tick count wins
 *   3. on equal ticks, the message with the higher gas price wins
 *   4. on equal gas price, the message with the greater hash wins
 *      (`portB`'s message when the hashes compare equal)
 *
 * @param {Object} portA - object exposing `peek()`
 * @param {Object} portB - object exposing `peek()`
 * @returns {*} the winning message (may be undefined when both ports are empty)
 */
function messageArbiter (portA, portB) {
  const first = portA.peek()
  const second = portB.peek()

  // nothing to arbitrate when either side is empty
  if (!first) {
    return second
  }
  if (!second) {
    return first
  }

  // gas prices are read up front, before the tick comparison
  const firstPrice = first.resources.gasPrice
  const secondPrice = second.resources.gasPrice

  if (first.ticks !== second.ticks) {
    return first.ticks < second.ticks ? first : second
  }

  if (firstPrice === secondPrice) {
    // full tie on ticks and price: fall back to the message hashes
    return first.hash() > second.hash() ? first : second
  }

  return firstPrice > secondPrice ? first : second
}
104 |
Built with git-ssb-web