Files: 83e76dba0665fdb89dc5636cfb77ae5150858b20 / exoInterface.js
4532 bytesRaw
1 | const clearObject = require('object-clear') |
2 | const clone = require('clone') |
3 | const EventEmitter = require('events') |
4 | const PortManager = require('./portManager.js') |
5 | |
6 | module.exports = class ExoInterface extends EventEmitter { |
7 | /** |
8 | * the ExoInterface manages the varous message passing functions and provides |
9 | * an interface for the containers to use |
10 | * @param {Object} opts |
11 | * @param {Object} opts.state |
12 | * @param {Object} opts.entryPort |
13 | * @param {Object} opts.parentPort |
14 | * @param {Object} opts.hypervisor |
15 | * @param {Object} opts.Container |
16 | */ |
17 | constructor (opts) { |
18 | super() |
19 | this.state = opts.state |
20 | this.entryPort = opts.entryPort |
21 | this.hypervisor = opts.hypervisor |
22 | |
23 | this.containerState = 'idle' |
24 | this.ticks = 0 |
25 | |
26 | // create the port manager |
27 | this.ports = new PortManager(Object.assign({ |
28 | exoInterface: this |
29 | }, opts)) |
30 | |
31 | this._waitingMap = new Map() |
32 | this.container = new opts.Container(this) |
33 | |
34 | // once we get an result we run the next message |
35 | this.on('result', this._runNextMessage) |
36 | |
37 | // on idle clear all the 'wiats' |
38 | this.on('idle', () => { |
39 | for (const [, waiter] of this._waitingMap) { |
40 | waiter.resolve(this.ticks) |
41 | } |
42 | }) |
43 | } |
44 | |
45 | /** |
46 | * starts the container |
47 | * @returns {Promise} |
48 | */ |
49 | start () { |
50 | return this.ports.start() |
51 | } |
52 | |
53 | /** |
54 | * adds a message to this containers message queue |
55 | * @param {Message} message |
56 | */ |
57 | queue (message) { |
58 | message._hops++ |
59 | this.ports.queue(message) |
60 | if (this.containerState !== 'running') { |
61 | this._updateContainerState('running') |
62 | this._runNextMessage() |
63 | } |
64 | } |
65 | |
66 | _updateContainerState (containerState, message) { |
67 | this.containerState = containerState |
68 | this.emit(containerState, message) |
69 | } |
70 | |
71 | async _runNextMessage () { |
72 | const message = await this.ports.getNextMessage() |
73 | if (message) { |
74 | // run the next message |
75 | this.run(message) |
76 | } else { |
77 | // if no more messages then shut down |
78 | this._updateContainerState('idle') |
79 | } |
80 | } |
81 | |
82 | /** |
83 | * run the kernels code with a given enviroment |
84 | * The Kernel Stores all of its state in the Environment. The Interface is used |
85 | * to by the VM to retrive infromation from the Environment. |
86 | * @returns {Promise} |
87 | */ |
88 | async run (message) { |
89 | const oldState = clone(this.state, false, 3) |
90 | let result |
91 | try { |
92 | result = await this.container.run(message) || {} |
93 | } catch (e) { |
94 | // revert the state |
95 | clearObject(this.state) |
96 | Object.assign(this.state, oldState) |
97 | |
98 | result = { |
99 | exception: true, |
100 | exceptionError: e |
101 | } |
102 | } |
103 | |
104 | this.emit('result', result) |
105 | return result |
106 | } |
107 | |
108 | /** |
109 | * returns a promise that resolves once the kernel hits the threshould tick count |
110 | * @param {Number} threshould - the number of ticks to wait |
111 | * @returns {Promise} |
112 | */ |
113 | wait (threshold, fromPort) { |
114 | if (threshold <= this.ticks) { |
115 | return this.ticks |
116 | } else if (this.containerState === 'idle') { |
117 | return this.ports.wait(threshold, fromPort) |
118 | } else { |
119 | return new Promise((resolve, reject) => { |
120 | this._waitingMap.set(fromPort, { |
121 | threshold: threshold, |
122 | resolve: resolve, |
123 | from: fromPort |
124 | }) |
125 | }) |
126 | } |
127 | } |
128 | |
129 | /** |
130 | * updates the number of ticks that the container has run |
131 | * @param {Number} count - the number of ticks to add |
132 | */ |
133 | incrementTicks (count) { |
134 | this.ticks += count |
135 | for (const [fromPort, waiter] of this._waitingMap) { |
136 | if (waiter.threshold < this.ticks) { |
137 | this._waitingMap.delete(fromPort) |
138 | waiter.resolve(this.ticks) |
139 | } |
140 | } |
141 | } |
142 | |
143 | /** |
144 | * sends a message to a given port |
145 | * @param {Object} portRef - the port |
146 | * @param {Message} message - the message |
147 | */ |
148 | async send (portRef, message) { |
149 | if (!this.ports.isValidPort(portRef)) { |
150 | throw new Error('invalid port referance') |
151 | } |
152 | |
153 | // set the port that the message came from |
154 | message._fromPort = this.entryPort |
155 | message._fromPortTicks = this.ticks |
156 | |
157 | const container = await this.getInstance(portRef) |
158 | container.queue(message) |
159 | |
160 | const waiter = this._waitingMap.get(portRef) |
161 | // if the was a wait on this port the resolve it |
162 | if (waiter) { |
163 | waiter.resolve(this.ticks) |
164 | this._waitingMap.delete(portRef) |
165 | } |
166 | } |
167 | |
168 | /** |
169 | * gets a container instance given a port |
170 | * @param {Object} portRef - the port |
171 | * @returns {Object} |
172 | */ |
173 | getInstance (portRef) { |
174 | return this.hypervisor.getInstanceByPort(portRef, this.entryPort) |
175 | } |
176 | } |
177 |
Built with git-ssb-web