// File: portManager.js (tree 535100729b49473f07a4c560fa6b81aa2b369544)
// 2798 bytes (raw view)
1 | const Port = require('./port.js') |
2 | const PARENT = Symbol('parent') |
3 | |
/**
 * Decides which message should go first, given two port entries.
 *
 * Each argument is an `[id, port]` pair (the shape produced by spreading a
 * Map); only `port.peek()` is used, to look at the message at the head of
 * that port's queue without removing it.
 *
 * Ordering rules, applied in this order:
 *   1. if only one port has a message, that message wins
 *   2. the message with the lower `ticks` wins
 *   3. on equal ticks, the message with the higher `resources.gasPrice` wins
 *   4. on equal gas price, the message with the greater `hash()` wins
 *
 * @param {Array} portA - `[id, port]` pair
 * @param {Array} portB - `[id, port]` pair
 * @returns {Object|undefined} the winning *message* (not the pair), or
 *   whatever `portB`'s `peek()` returned when `portA` has nothing queued
 */
function messageArbiter (portA, portB) {
  // read the head messages without reassigning the parameters
  const a = portA[1].peek()
  const b = portB[1].peek()

  // a port with nothing queued loses automatically
  if (!a) {
    return b
  }
  if (!b) {
    return a
  }

  const aGasPrice = a.resources.gasPrice
  const bGasPrice = b.resources.gasPrice

  // the message with the lower tick count goes first
  if (a.ticks !== b.ticks) {
    return a.ticks < b.ticks ? a : b
  }

  // same tick: the higher gas price goes first
  if (aGasPrice !== bGasPrice) {
    return aGasPrice > bGasPrice ? a : b
  }

  // same tick and gas price: fall back to comparing the hashes
  return a.hash() > b.hash() ? a : b
}
27 | |
28 | module.exports = class PortManager { |
29 | constructor (kernel) { |
30 | this.kernel = kernel |
31 | this.hypervisor = kernel._opts.hypervisor |
32 | this.ports = kernel.state.ports |
33 | this.parentPort = kernel._opts.parentPort |
34 | this._portMap = new Map() |
35 | } |
36 | |
37 | async start () { |
38 | // map ports to thier id's |
39 | let ports = Object.keys(this.ports).map(name => { |
40 | const port = this.ports[name] |
41 | this._mapPort(name, port) |
42 | }) |
43 | |
44 | // create the parent port |
45 | await Promise.all(ports) |
46 | const parentID = await this.hypervisor.generateID(this.kernel._opts.parentPort) |
47 | this._portMap.set(parentID, new Port(PARENT)) |
48 | } |
49 | |
50 | async _mapPort (name, port) { |
51 | const id = await this.hypervisor.generateID(port) |
52 | port = new Port(name) |
53 | this._portMap.set(id, port) |
54 | } |
55 | |
56 | queue (message) { |
57 | this._portMap.get(message.fromPort).queue(message) |
58 | } |
59 | |
60 | set (name, port) { |
61 | this.ports[name] = port |
62 | return this._mapPort(name, port) |
63 | } |
64 | |
65 | del (name) { |
66 | delete this.ports[name] |
67 | } |
68 | |
69 | move (from, to) { |
70 | this.ports[to] = this.ports[from] |
71 | delete this.ports[from] |
72 | } |
73 | |
74 | async get (port) { |
75 | const id = await this.hypervisor.generateID(port) |
76 | return this._portMap.get(id) |
77 | } |
78 | |
79 | async getParent () { |
80 | const id = await this.hypervisor.generateID(this.kernel._opt.parentPort) |
81 | return this._portMap.get(id) |
82 | } |
83 | |
84 | // waits till all ports have reached a threshold tick count |
85 | async wait (threshold) { |
86 | // find the ports that have a smaller tick count then the threshold tick count |
87 | const unkownPorts = [...this._portMap].filter(([id, port]) => { |
88 | return (port.hasSent || port.name === PARENT) && port.ticks < threshold |
89 | }) |
90 | |
91 | const promises = unkownPorts.map(([id, port]) => { |
92 | if (port.name === PARENT) { |
93 | port = this.parentPort |
94 | } else { |
95 | port = this.ports[port.name] |
96 | } |
97 | this.hypervisor.wait(port, threshold).then(ticks => { |
98 | // update the port's tick count |
99 | port.ticks = ticks |
100 | }) |
101 | }) |
102 | return await Promise.all(promises) |
103 | } |
104 | |
105 | async getNextMessage (ticks) { |
106 | await this.wait(ticks) |
107 | const portMap = [...this._portMap].reduce(messageArbiter) |
108 | if (portMap) { |
109 | return portMap[1].shift() |
110 | } |
111 | } |
112 | } |

// Built with git-ssb-web