Files: 9ff63d468409beadacd8b12f0eddf4160cecf81e / exoInterface.js
3986 bytesRaw
1 | const clearObject = require('object-clear') |
2 | const clone = require('clone') |
3 | const EventEmitter = require('events') |
4 | const PortManager = require('./portManager.js') |
5 | |
6 | module.exports = class ExoInterface extends EventEmitter { |
7 | /** |
8 | * the ExoInterface manages the varous message passing functions and provides |
9 | * an interface for the containers to use |
10 | * @param {Object} opts |
11 | * @param {Object} opts.state |
12 | * @param {Object} opts.entryPort |
13 | * @param {Object} opts.parentPort |
14 | * @param {Object} opts.hypervisor |
15 | * @param {Object} opts.Container |
16 | */ |
17 | constructor (opts) { |
18 | super() |
19 | this.state = opts.state |
20 | this.entryPort = opts.entryPort |
21 | this.hypervisor = opts.hypervisor |
22 | |
23 | this.containerState = 'idle' |
24 | this.ticks = 0 |
25 | |
26 | // create the port manager |
27 | this.ports = new PortManager(Object.assign({ |
28 | exoInterface: this |
29 | }, opts)) |
30 | |
31 | this._waitingMap = new Map() |
32 | this.container = new opts.Container(this) |
33 | |
34 | // once we get an result we run the next message |
35 | this.on('result', this._runNextMessage) |
36 | |
37 | // on idle clear all the 'wiats' |
38 | this.on('idle', () => { |
39 | for (const [, waiter] of this._waitingMap) { |
40 | waiter.resolve(this.ticks) |
41 | } |
42 | }) |
43 | } |
44 | |
45 | /** |
46 | * starts the container |
47 | * @returns {Promise} |
48 | */ |
49 | start () { |
50 | return this.ports.start() |
51 | } |
52 | |
53 | /** |
54 | * adds a message to this containers message queue |
55 | * @param {Message} message |
56 | */ |
57 | queue (message) { |
58 | message._hops++ |
59 | this.ports.queue(message) |
60 | if (this.containerState !== 'running') { |
61 | this._updateContainerState('running') |
62 | this._runNextMessage() |
63 | } |
64 | } |
65 | |
66 | _updateContainerState (containerState, message) { |
67 | this.containerState = containerState |
68 | this.emit(containerState, message) |
69 | } |
70 | |
71 | async _runNextMessage () { |
72 | const message = await this.ports.getNextMessage() |
73 | if (message) { |
74 | // run the next message |
75 | this.run(message) |
76 | } else { |
77 | // if no more messages then shut down |
78 | this._updateContainerState('idle') |
79 | } |
80 | } |
81 | |
82 | /** |
83 | * run the kernels code with a given enviroment |
84 | * The Kernel Stores all of its state in the Environment. The Interface is used |
85 | * to by the VM to retrive infromation from the Environment. |
86 | */ |
87 | async run (message) { |
88 | const oldState = clone(this.state, false, 3) |
89 | let result |
90 | try { |
91 | result = await this.container.run(message) || {} |
92 | } catch (e) { |
93 | // revert the state |
94 | clearObject(this.state) |
95 | Object.assign(this.state, oldState) |
96 | |
97 | result = { |
98 | exception: true, |
99 | exceptionError: e |
100 | } |
101 | } |
102 | |
103 | this.emit('result', result) |
104 | return result |
105 | } |
106 | |
107 | // returns a promise that resolves once the kernel hits the threshould tick |
108 | // count |
109 | wait (threshold, fromPort) { |
110 | if (threshold <= this.ticks) { |
111 | return this.ticks |
112 | } else if (this.containerState === 'idle') { |
113 | return this.ports.wait(threshold, fromPort) |
114 | } else { |
115 | return new Promise((resolve, reject) => { |
116 | this._waitingMap.set(fromPort, { |
117 | threshold: threshold, |
118 | resolve: resolve, |
119 | from: fromPort |
120 | }) |
121 | }) |
122 | } |
123 | } |
124 | |
125 | incrementTicks (count) { |
126 | this.ticks += count |
127 | for (const [fromPort, waiter] of this._waitingMap) { |
128 | if (waiter.threshold < this.ticks) { |
129 | this._waitingMap.delete(fromPort) |
130 | waiter.resolve(this.ticks) |
131 | } |
132 | } |
133 | } |
134 | |
135 | async send (portRef, message) { |
136 | if (!this.ports.isValidPort(portRef)) { |
137 | throw new Error('invalid port referance') |
138 | } |
139 | |
140 | // set the port that the message came from |
141 | message._fromPort = this.entryPort |
142 | message._fromPortTicks = this.ticks |
143 | |
144 | const container = await this.getContainer(portRef) |
145 | container.queue(message) |
146 | |
147 | const waiter = this._waitingMap.get(portRef) |
148 | if (waiter) { |
149 | waiter.resolve(this.ticks) |
150 | this._waitingMap.delete(portRef) |
151 | } |
152 | } |
153 | |
154 | getContainer (portRef) { |
155 | return this.hypervisor.getOrCreateInstance(portRef, this.entryPort) |
156 | } |
157 | } |
158 |
Built with git-ssb-web