Files: b9ca792985eeb45f5be1b56ca7b1f361c6e5585c / exoInterface.js
4762 bytesRaw
1 | const EventEmitter = require('events') |
2 | const PortManager = require('./portManager.js') |
3 | const Message = require('primea-message') |
4 | |
5 | module.exports = class ExoInterface extends EventEmitter { |
6 | /** |
7 | * the ExoInterface manages the varous message passing functions and provides |
8 | * an interface for the containers to use |
9 | * @param {Object} opts |
10 | * @param {Object} opts.state |
11 | * @param {Object} opts.entryPort |
12 | * @param {Object} opts.parentPort |
13 | * @param {Object} opts.hypervisor |
14 | * @param {Object} opts.Container |
15 | */ |
16 | constructor (opts) { |
17 | super() |
18 | this.state = opts.state |
19 | this.entryPort = opts.entryPort |
20 | this.hypervisor = opts.hypervisor |
21 | |
22 | this.containerState = 'idle' |
23 | |
24 | // the total number of ticks that the container has ran |
25 | this.ticks = 0 |
26 | |
27 | // create the port manager |
28 | this.ports = new PortManager(Object.assign({ |
29 | exoInterface: this |
30 | }, opts)) |
31 | |
32 | this._waitingMap = new Map() |
33 | this.container = new opts.container.Constructor(this, opts.container.args) |
34 | |
35 | // once we get an result we run the next message |
36 | this.on('result', this._runNextMessage) |
37 | |
38 | // on idle clear all the 'wiats' |
39 | this.on('idle', () => { |
40 | for (const [, waiter] of this._waitingMap) { |
41 | waiter.resolve(this.ticks) |
42 | } |
43 | }) |
44 | } |
45 | |
46 | /** |
47 | * starts the container |
48 | * @returns {Promise} |
49 | */ |
50 | start () { |
51 | return this.ports.start() |
52 | } |
53 | |
54 | /** |
55 | * adds a message to this containers message queue |
56 | * @param {Message} message |
57 | */ |
58 | queue (message) { |
59 | message._hops++ |
60 | this.ports.queue(message) |
61 | if (this.containerState !== 'running') { |
62 | this._updateContainerState('running') |
63 | this._runNextMessage() |
64 | } |
65 | } |
66 | |
67 | _updateContainerState (containerState, message) { |
68 | this.containerState = containerState |
69 | this.emit(containerState, message) |
70 | } |
71 | |
72 | async _runNextMessage () { |
73 | const message = await this.ports.getNextMessage() |
74 | if (message) { |
75 | // run the next message |
76 | this.run(message) |
77 | } else { |
78 | // if no more messages then shut down |
79 | this._updateContainerState('idle') |
80 | } |
81 | } |
82 | |
83 | /** |
84 | * run the kernels code with a given enviroment |
85 | * The Kernel Stores all of its state in the Environment. The Interface is used |
86 | * to by the VM to retrive infromation from the Environment. |
87 | * @returns {Promise} |
88 | */ |
89 | async run (message) { |
90 | let result |
91 | try { |
92 | result = await this.container.run(message) || {} |
93 | } catch (e) { |
94 | result = { |
95 | exception: true, |
96 | exceptionError: e |
97 | } |
98 | } |
99 | |
100 | this.emit('result', result) |
101 | return result |
102 | } |
103 | |
104 | /** |
105 | * returns a promise that resolves once the kernel hits the threshould tick count |
106 | * @param {Number} threshould - the number of ticks to wait |
107 | * @returns {Promise} |
108 | */ |
109 | wait (threshold, fromPort) { |
110 | if (threshold <= this.ticks) { |
111 | return this.ticks |
112 | } else if (this.containerState === 'idle') { |
113 | return this.ports.wait(threshold, fromPort) |
114 | } else { |
115 | return new Promise((resolve, reject) => { |
116 | this._waitingMap.set(fromPort, { |
117 | threshold: threshold, |
118 | resolve: resolve, |
119 | from: fromPort |
120 | }) |
121 | }) |
122 | } |
123 | } |
124 | |
125 | /** |
126 | * updates the number of ticks that the container has run |
127 | * @param {Number} count - the number of ticks to add |
128 | */ |
129 | incrementTicks (count) { |
130 | this.ticks += count |
131 | for (const [fromPort, waiter] of this._waitingMap) { |
132 | if (waiter.threshold < this.ticks) { |
133 | this._waitingMap.delete(fromPort) |
134 | waiter.resolve(this.ticks) |
135 | } |
136 | } |
137 | } |
138 | |
139 | /** |
140 | * creates a new message |
141 | * @param {*} data |
142 | */ |
143 | createMessage (opts) { |
144 | const message = new Message(opts) |
145 | for (const port of message.ports) { |
146 | if (this.ports.isBound(port)) { |
147 | throw new Error('message must not contain bound ports') |
148 | } |
149 | } |
150 | return message |
151 | } |
152 | |
153 | /** |
154 | * sends a message to a given port |
155 | * @param {Object} portRef - the port |
156 | * @param {Message} message - the message |
157 | */ |
158 | async send (portRef, message) { |
159 | if (!this.ports.isBound(portRef)) { |
160 | throw new Error('cannot send message with an unbound port') |
161 | } |
162 | |
163 | // set the port that the message came from |
164 | message._fromPort = this.entryPort |
165 | message._fromPortTicks = this.ticks |
166 | |
167 | const container = await this.getInstance(portRef) |
168 | container.queue(message) |
169 | |
170 | const waiter = this._waitingMap.get(portRef) |
171 | // if the was a wait on this port the resolve it |
172 | if (waiter) { |
173 | waiter.resolve(this.ticks) |
174 | this._waitingMap.delete(portRef) |
175 | } |
176 | } |
177 | |
178 | /** |
179 | * gets a container instance given a port |
180 | * @param {Object} portRef - the port |
181 | * @returns {Object} |
182 | */ |
183 | getInstance (portRef) { |
184 | return this.hypervisor.getInstanceByPort(portRef, this.entryPort) |
185 | } |
186 | } |
187 |
Built with git-ssb-web