Files: 3d425eb5e60c74393a2ae33abb20a5df794791b3 / portManager.js
1984 bytesRaw
1 | const EventEmitter = require('events') |
2 | const path = require('path') |
3 | const AtomicMessage = require('primea-message/atomic') |
4 | const Port = require('./port.js') |
5 | const common = require('./common.js') |
6 | |
7 | module.exports = class PortManager extends EventEmitter { |
8 | constructor (opts) { |
9 | super() |
10 | Object.assign(this, opts) |
11 | this._queue = [] |
12 | // set up the parent port |
13 | const parentPort = new Port(common.PARENT) |
14 | parentPort.on('message', message => { |
15 | this._recieveMessage(message) |
16 | }) |
17 | |
18 | // create the cache |
19 | this.cache = new Map() |
20 | this.cache.set(common.PARENT, parentPort) |
21 | } |
22 | |
23 | _recieveMessage (message) { |
24 | const index = this._queue.push(message) - 1 |
25 | this.emit('message', index) |
26 | } |
27 | |
28 | async get (name) { |
29 | let port = this.cache.get(name) |
30 | if (!port) { |
31 | port = new Port(name) |
32 | port.on('message', message => { |
33 | this._recieveMessage(message) |
34 | }) |
35 | // create destination kernel |
36 | const state = await this.graph.get(this.state, name) |
37 | |
38 | const destKernel = new this.Kernel({ |
39 | state: state, |
40 | graph: this.graph, |
41 | parentPort: port, |
42 | imports: this.imports, |
43 | path: path.join(this.path, name) |
44 | }) |
45 | |
46 | // shutdown the kernel when it is done doing it work |
47 | destKernel.on('idle', () => { |
48 | destKernel.shutdown() |
49 | this.cache.delete(name) |
50 | }) |
51 | |
52 | // find and connect the destination port |
53 | const destPort = await destKernel.ports.get(common.PARENT) |
54 | port.connect(destPort) |
55 | this.cache.set(name, port) |
56 | } |
57 | return port |
58 | } |
59 | |
60 | async peek (index = 0) { |
61 | return this._queue[index] |
62 | } |
63 | |
64 | remove (index) { |
65 | return this._queue.splice(index, index + 1) |
66 | } |
67 | |
68 | async send (portName, message) { |
69 | const port = await this.get(portName) |
70 | port.send(message) |
71 | return AtomicMessage.isAtomic(message) ? message.result() : {} |
72 | } |
73 | |
74 | close () { |
75 | for (let port in this.cache) { |
76 | port.emit('close') |
77 | } |
78 | this.cache.clear() |
79 | } |
80 | } |
81 |
Built with git-ssb-web