git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: d9b6113bb6b82443c2fb8a9a9b1afa58b1206f67

Files: d9b6113bb6b82443c2fb8a9a9b1afa58b1206f67 / portManager.js

2240 bytesRaw
1const Port = require('./port.js')
2const ENTRY = Symbol('entry')
3
4// decides which message to go first
5function messageArbiter (pairA, pairB) {
6 const portA = pairA[1]
7 const portB = pairB[1]
8 const a = portA.peek()
9 const b = portB.peek()
10
11 if (!a) {
12 return pairB
13 } else if (!b) {
14 return pairA
15 }
16
17 if (a._fromPortTicks !== b._fromPortTicks) {
18 return a._fromPortTicks < b._fromPortTicks ? pairA : pairB
19 } else if (a.priority !== b.priority) {
20 // decide by priority
21 return a.priority > b.priority ? pairA : pairB
22 } else if (portA.name === ENTRY) {
23 return pairA
24 } else {
25 return portA.name < portB.name ? pairA : pairB
26 }
27}
28
29module.exports = class PortManager {
30 constructor (opts) {
31 Object.assign(this, opts)
32 this._portMap = new Map()
33 }
34
35 async start () {
36 // skip the root, since it doesn't have a parent
37 if (this.parentPort !== undefined) {
38 this._portMap.set(this.parentPort, new Port(ENTRY))
39 }
40 // map ports to thier id's
41 this.ports = await this.hypervisor.graph.get(this.state, 'ports')
42 Object.keys(this.ports).map(name => {
43 const port = this.ports[name]
44 this._mapPort(name, port)
45 })
46 }
47
48 _mapPort (name, portRef) {
49 const port = new Port(name)
50 this._portMap.set(portRef, port)
51 }
52
53 queue (message) {
54 this._portMap.get(message.fromPort).queue(message)
55 }
56
57 set (name, port) {
58 this.ports[name] = port
59 return this._mapPort(name, port)
60 }
61
62 get (key) {
63 return this.ports[key]
64 }
65
66 // waits till all ports have reached a threshold tick count
67 wait (threshold, fromPort) {
68 // find the ports that have a smaller tick count then the threshold tick count
69 const unkownPorts = [...this._portMap].filter(([portRef, port]) => {
70 return port.ticks < threshold && fromPort !== portRef
71 })
72
73 const promises = unkownPorts.map(async ([portRef, port]) => {
74 // update the port's tick count
75 port.ticks = await this.hypervisor.wait(portRef, threshold, this.entryPort)
76 })
77 return Promise.all(promises)
78 }
79
80 async getNextMessage () {
81 await this.wait(this.kernel.ticks, this.entryPort)
82 const portMap = [...this._portMap].reduce(messageArbiter)
83 return portMap[1].shift()
84 }
85}
86

Built with git-ssb-web