Files: eb936c252664b13565410d09b4facc45b738e249 / exoInterface.js
3757 bytesRaw
1 | const clearObject = require('object-clear') |
2 | const clone = require('clone') |
3 | const EventEmitter = require('events') |
4 | const PortManager = require('./portManager.js') |
5 | |
6 | module.exports = class ExoInterface extends EventEmitter { |
7 | /** |
8 | * the ExoInterface manages the varous message passing functions and provides |
9 | * an interface for the containers to use |
10 | * @param {Object} opts |
11 | * @param {Object} opts.state |
12 | * @param {Object} opts.entryPort |
13 | * @param {Object} opts.parentPort |
14 | * @param {Object} opts.hypervisor |
15 | * @param {Object} opts.Container |
16 | */ |
17 | constructor (opts) { |
18 | super() |
19 | this.state = opts.state |
20 | this.entryPort = opts.entryPort |
21 | this.hypervisor = opts.hypervisor |
22 | |
23 | this.containerState = 'idle' |
24 | this.ticks = 0 |
25 | |
26 | // create the port manager |
27 | this.ports = new PortManager(Object.assign({ |
28 | exoInterface: this |
29 | }, opts)) |
30 | |
31 | this._waitingMap = new Map() |
32 | this.container = new opts.Container(this) |
33 | |
34 | // once we get an result we run the next message |
35 | this.on('result', this._runNextMessage) |
36 | |
37 | // on idle clear all the 'wiats' |
38 | this.on('idle', () => { |
39 | for (const [, waiter] of this._waitingMap) { |
40 | waiter.resolve(this.ticks) |
41 | } |
42 | }) |
43 | } |
44 | |
45 | start () { |
46 | return this.ports.start() |
47 | } |
48 | |
49 | queue (message) { |
50 | message._hops++ |
51 | this.ports.queue(message) |
52 | if (this.containerState !== 'running') { |
53 | this._updateContainerState('running') |
54 | this._runNextMessage() |
55 | } |
56 | } |
57 | |
58 | _updateContainerState (containerState, message) { |
59 | this.containerState = containerState |
60 | this.emit(containerState, message) |
61 | } |
62 | |
63 | async _runNextMessage () { |
64 | const message = await this.ports.getNextMessage() |
65 | if (message) { |
66 | // run the next message |
67 | this.run(message) |
68 | } else { |
69 | // if no more messages then shut down |
70 | this._updateContainerState('idle') |
71 | } |
72 | } |
73 | |
74 | /** |
75 | * run the kernels code with a given enviroment |
76 | * The Kernel Stores all of its state in the Environment. The Interface is used |
77 | * to by the VM to retrive infromation from the Environment. |
78 | */ |
79 | async run (message) { |
80 | const oldState = clone(this.state, false, 3) |
81 | let result |
82 | try { |
83 | result = await this.container.run(message) || {} |
84 | } catch (e) { |
85 | // revert the state |
86 | clearObject(this.state) |
87 | Object.assign(this.state, oldState) |
88 | |
89 | result = { |
90 | exception: true, |
91 | exceptionError: e |
92 | } |
93 | } |
94 | |
95 | this.emit('result', result) |
96 | return result |
97 | } |
98 | |
99 | // returns a promise that resolves once the kernel hits the threshould tick |
100 | // count |
101 | wait (threshold, fromPort) { |
102 | if (threshold <= this.ticks) { |
103 | return this.ticks |
104 | } else if (this.containerState === 'idle') { |
105 | return this.ports.wait(threshold, fromPort) |
106 | } else { |
107 | return new Promise((resolve, reject) => { |
108 | this._waitingMap.set(fromPort, { |
109 | threshold: threshold, |
110 | resolve: resolve, |
111 | from: fromPort |
112 | }) |
113 | }) |
114 | } |
115 | } |
116 | |
117 | incrementTicks (count) { |
118 | this.ticks += count |
119 | for (const [fromPort, waiter] of this._waitingMap) { |
120 | if (waiter.threshold < this.ticks) { |
121 | this._waitingMap.delete(fromPort) |
122 | waiter.resolve(this.ticks) |
123 | } |
124 | } |
125 | } |
126 | |
127 | async send (portRef, message) { |
128 | if (!this.ports.isValidPort(portRef)) { |
129 | throw new Error('invalid port referance') |
130 | } |
131 | |
132 | // set the port that the message came from |
133 | message._fromPort = this.entryPort |
134 | message._fromPortTicks = this.ticks |
135 | |
136 | const instance = await this.hypervisor.getOrCreateInstance(portRef, this.entryPort) |
137 | instance.queue(message) |
138 | |
139 | const waiter = this._waitingMap.get(portRef) |
140 | if (waiter) { |
141 | waiter.resolve(this.ticks) |
142 | this._waitingMap.delete(portRef) |
143 | } |
144 | } |
145 | } |
146 |
Built with git-ssb-web