git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 83e76dba0665fdb89dc5636cfb77ae5150858b20

Files: 83e76dba0665fdb89dc5636cfb77ae5150858b20 / exoInterface.js

4532 bytesRaw
1const clearObject = require('object-clear')
2const clone = require('clone')
3const EventEmitter = require('events')
4const PortManager = require('./portManager.js')
5
6module.exports = class ExoInterface extends EventEmitter {
7 /**
8 * the ExoInterface manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.state
12 * @param {Object} opts.entryPort
13 * @param {Object} opts.parentPort
14 * @param {Object} opts.hypervisor
15 * @param {Object} opts.Container
16 */
17 constructor (opts) {
18 super()
19 this.state = opts.state
20 this.entryPort = opts.entryPort
21 this.hypervisor = opts.hypervisor
22
23 this.containerState = 'idle'
24 this.ticks = 0
25
26 // create the port manager
27 this.ports = new PortManager(Object.assign({
28 exoInterface: this
29 }, opts))
30
31 this._waitingMap = new Map()
32 this.container = new opts.Container(this)
33
34 // once we get an result we run the next message
35 this.on('result', this._runNextMessage)
36
37 // on idle clear all the 'wiats'
38 this.on('idle', () => {
39 for (const [, waiter] of this._waitingMap) {
40 waiter.resolve(this.ticks)
41 }
42 })
43 }
44
45 /**
46 * starts the container
47 * @returns {Promise}
48 */
49 start () {
50 return this.ports.start()
51 }
52
53 /**
54 * adds a message to this containers message queue
55 * @param {Message} message
56 */
57 queue (message) {
58 message._hops++
59 this.ports.queue(message)
60 if (this.containerState !== 'running') {
61 this._updateContainerState('running')
62 this._runNextMessage()
63 }
64 }
65
66 _updateContainerState (containerState, message) {
67 this.containerState = containerState
68 this.emit(containerState, message)
69 }
70
71 async _runNextMessage () {
72 const message = await this.ports.getNextMessage()
73 if (message) {
74 // run the next message
75 this.run(message)
76 } else {
77 // if no more messages then shut down
78 this._updateContainerState('idle')
79 }
80 }
81
82 /**
83 * run the kernels code with a given enviroment
84 * The Kernel Stores all of its state in the Environment. The Interface is used
85 * to by the VM to retrive infromation from the Environment.
86 * @returns {Promise}
87 */
88 async run (message) {
89 const oldState = clone(this.state, false, 3)
90 let result
91 try {
92 result = await this.container.run(message) || {}
93 } catch (e) {
94 // revert the state
95 clearObject(this.state)
96 Object.assign(this.state, oldState)
97
98 result = {
99 exception: true,
100 exceptionError: e
101 }
102 }
103
104 this.emit('result', result)
105 return result
106 }
107
108 /**
109 * returns a promise that resolves once the kernel hits the threshould tick count
110 * @param {Number} threshould - the number of ticks to wait
111 * @returns {Promise}
112 */
113 wait (threshold, fromPort) {
114 if (threshold <= this.ticks) {
115 return this.ticks
116 } else if (this.containerState === 'idle') {
117 return this.ports.wait(threshold, fromPort)
118 } else {
119 return new Promise((resolve, reject) => {
120 this._waitingMap.set(fromPort, {
121 threshold: threshold,
122 resolve: resolve,
123 from: fromPort
124 })
125 })
126 }
127 }
128
129 /**
130 * updates the number of ticks that the container has run
131 * @param {Number} count - the number of ticks to add
132 */
133 incrementTicks (count) {
134 this.ticks += count
135 for (const [fromPort, waiter] of this._waitingMap) {
136 if (waiter.threshold < this.ticks) {
137 this._waitingMap.delete(fromPort)
138 waiter.resolve(this.ticks)
139 }
140 }
141 }
142
143 /**
144 * sends a message to a given port
145 * @param {Object} portRef - the port
146 * @param {Message} message - the message
147 */
148 async send (portRef, message) {
149 if (!this.ports.isValidPort(portRef)) {
150 throw new Error('invalid port referance')
151 }
152
153 // set the port that the message came from
154 message._fromPort = this.entryPort
155 message._fromPortTicks = this.ticks
156
157 const container = await this.getInstance(portRef)
158 container.queue(message)
159
160 const waiter = this._waitingMap.get(portRef)
161 // if the was a wait on this port the resolve it
162 if (waiter) {
163 waiter.resolve(this.ticks)
164 this._waitingMap.delete(portRef)
165 }
166 }
167
168 /**
169 * gets a container instance given a port
170 * @param {Object} portRef - the port
171 * @returns {Object}
172 */
173 getInstance (portRef) {
174 return this.hypervisor.getInstanceByPort(portRef, this.entryPort)
175 }
176}
177

Built with git-ssb-web