Files: 1df41b95ebf09568a8c125df38f2bcedfb9ad8b5 / exoInterface.js
4768 bytesRaw
1 | const clearObject = require('object-clear') |
2 | const clone = require('clone') |
3 | const EventEmitter = require('events') |
4 | const PortManager = require('./portManager.js') |
5 | const Message = require('primea-message') |
6 | |
7 | module.exports = class ExoInterface extends EventEmitter { |
8 | /** |
9 | * the ExoInterface manages the varous message passing functions and provides |
10 | * an interface for the containers to use |
11 | * @param {Object} opts |
12 | * @param {Object} opts.state |
13 | * @param {Object} opts.entryPort |
14 | * @param {Object} opts.parentPort |
15 | * @param {Object} opts.hypervisor |
16 | * @param {Object} opts.Container |
17 | */ |
18 | constructor (opts) { |
19 | super() |
20 | this.state = opts.state |
21 | this.entryPort = opts.entryPort |
22 | this.hypervisor = opts.hypervisor |
23 | |
24 | this.containerState = 'idle' |
25 | |
26 | // the total number of ticks that the container has ran |
27 | this.ticks = 0 |
28 | |
29 | // create the port manager |
30 | this.ports = new PortManager(Object.assign({ |
31 | exoInterface: this |
32 | }, opts)) |
33 | |
34 | this._waitingMap = new Map() |
35 | this.container = new opts.Container(this) |
36 | |
37 | // once we get an result we run the next message |
38 | this.on('result', this._runNextMessage) |
39 | |
40 | // on idle clear all the 'wiats' |
41 | this.on('idle', () => { |
42 | for (const [, waiter] of this._waitingMap) { |
43 | waiter.resolve(this.ticks) |
44 | } |
45 | }) |
46 | } |
47 | |
48 | /** |
49 | * starts the container |
50 | * @returns {Promise} |
51 | */ |
52 | start () { |
53 | return this.ports.start() |
54 | } |
55 | |
56 | /** |
57 | * adds a message to this containers message queue |
58 | * @param {Message} message |
59 | */ |
60 | queue (message) { |
61 | message._hops++ |
62 | this.ports.queue(message) |
63 | if (this.containerState !== 'running') { |
64 | this._updateContainerState('running') |
65 | this._runNextMessage() |
66 | } |
67 | } |
68 | |
69 | _updateContainerState (containerState, message) { |
70 | this.containerState = containerState |
71 | this.emit(containerState, message) |
72 | } |
73 | |
74 | async _runNextMessage () { |
75 | const message = await this.ports.getNextMessage() |
76 | if (message) { |
77 | // run the next message |
78 | this.run(message) |
79 | } else { |
80 | // if no more messages then shut down |
81 | this._updateContainerState('idle') |
82 | } |
83 | } |
84 | |
85 | /** |
86 | * run the kernels code with a given enviroment |
87 | * The Kernel Stores all of its state in the Environment. The Interface is used |
88 | * to by the VM to retrive infromation from the Environment. |
89 | * @returns {Promise} |
90 | */ |
91 | async run (message) { |
92 | const oldState = clone(this.state, false, 3) |
93 | let result |
94 | try { |
95 | result = await this.container.run(message) || {} |
96 | } catch (e) { |
97 | // revert the state |
98 | clearObject(this.state) |
99 | Object.assign(this.state, oldState) |
100 | |
101 | result = { |
102 | exception: true, |
103 | exceptionError: e |
104 | } |
105 | } |
106 | |
107 | this.emit('result', result) |
108 | return result |
109 | } |
110 | |
111 | /** |
112 | * returns a promise that resolves once the kernel hits the threshould tick count |
113 | * @param {Number} threshould - the number of ticks to wait |
114 | * @returns {Promise} |
115 | */ |
116 | wait (threshold, fromPort) { |
117 | if (threshold <= this.ticks) { |
118 | return this.ticks |
119 | } else if (this.containerState === 'idle') { |
120 | return this.ports.wait(threshold, fromPort) |
121 | } else { |
122 | return new Promise((resolve, reject) => { |
123 | this._waitingMap.set(fromPort, { |
124 | threshold: threshold, |
125 | resolve: resolve, |
126 | from: fromPort |
127 | }) |
128 | }) |
129 | } |
130 | } |
131 | |
132 | /** |
133 | * updates the number of ticks that the container has run |
134 | * @param {Number} count - the number of ticks to add |
135 | */ |
136 | incrementTicks (count) { |
137 | this.ticks += count |
138 | for (const [fromPort, waiter] of this._waitingMap) { |
139 | if (waiter.threshold < this.ticks) { |
140 | this._waitingMap.delete(fromPort) |
141 | waiter.resolve(this.ticks) |
142 | } |
143 | } |
144 | } |
145 | |
146 | /** |
147 | * creates a new message |
148 | * @param {*} data |
149 | */ |
150 | createMessage (data) { |
151 | return new Message(data) |
152 | } |
153 | |
154 | /** |
155 | * sends a message to a given port |
156 | * @param {Object} portRef - the port |
157 | * @param {Message} message - the message |
158 | */ |
159 | async send (portRef, message) { |
160 | if (!this.ports.isBound(portRef)) { |
161 | throw new Error('cannot send message with an unbound port') |
162 | } |
163 | |
164 | // set the port that the message came from |
165 | message._fromPort = this.entryPort |
166 | message._fromPortTicks = this.ticks |
167 | |
168 | const container = await this.getInstance(portRef) |
169 | container.queue(message) |
170 | |
171 | const waiter = this._waitingMap.get(portRef) |
172 | // if the was a wait on this port the resolve it |
173 | if (waiter) { |
174 | waiter.resolve(this.ticks) |
175 | this._waitingMap.delete(portRef) |
176 | } |
177 | } |
178 | |
179 | /** |
180 | * gets a container instance given a port |
181 | * @param {Object} portRef - the port |
182 | * @returns {Object} |
183 | */ |
184 | getInstance (portRef) { |
185 | return this.hypervisor.getInstanceByPort(portRef, this.entryPort) |
186 | } |
187 | } |
188 |
Built with git-ssb-web