git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 9fb034bef38565313b13d8e7de173a14de9812df

Files: 9fb034bef38565313b13d8e7de173a14de9812df / exoInterface.js

4985 bytesRaw
1const clearObject = require('object-clear')
2const clone = require('clone')
3const EventEmitter = require('events')
4const PortManager = require('./portManager.js')
5const Message = require('primea-message')
6
7module.exports = class ExoInterface extends EventEmitter {
8 /**
9 * the ExoInterface manages the varous message passing functions and provides
10 * an interface for the containers to use
11 * @param {Object} opts
12 * @param {Object} opts.state
13 * @param {Object} opts.entryPort
14 * @param {Object} opts.parentPort
15 * @param {Object} opts.hypervisor
16 * @param {Object} opts.Container
17 */
18 constructor (opts) {
19 super()
20 this.state = opts.state
21 this.entryPort = opts.entryPort
22 this.hypervisor = opts.hypervisor
23
24 this.containerState = 'idle'
25
26 // the total number of ticks that the container has ran
27 this.ticks = 0
28
29 // create the port manager
30 this.ports = new PortManager(Object.assign({
31 exoInterface: this
32 }, opts))
33
34 this._waitingMap = new Map()
35 this.container = new opts.container.Constructor(this, opts.container.args)
36
37 // once we get an result we run the next message
38 this.on('result', this._runNextMessage)
39
40 // on idle clear all the 'wiats'
41 this.on('idle', () => {
42 for (const [, waiter] of this._waitingMap) {
43 waiter.resolve(this.ticks)
44 }
45 })
46 }
47
48 /**
49 * starts the container
50 * @returns {Promise}
51 */
52 start () {
53 return this.ports.start()
54 }
55
56 /**
57 * adds a message to this containers message queue
58 * @param {Message} message
59 */
60 queue (message) {
61 message._hops++
62 this.ports.queue(message)
63 if (this.containerState !== 'running') {
64 this._updateContainerState('running')
65 this._runNextMessage()
66 }
67 }
68
69 _updateContainerState (containerState, message) {
70 this.containerState = containerState
71 this.emit(containerState, message)
72 }
73
74 async _runNextMessage () {
75 const message = await this.ports.getNextMessage()
76 if (message) {
77 // run the next message
78 this.run(message)
79 } else {
80 // if no more messages then shut down
81 this._updateContainerState('idle')
82 }
83 }
84
85 /**
86 * run the kernels code with a given enviroment
87 * The Kernel Stores all of its state in the Environment. The Interface is used
88 * to by the VM to retrive infromation from the Environment.
89 * @returns {Promise}
90 */
91 async run (message) {
92 const oldState = clone(this.state, false, 3)
93 let result
94 try {
95 result = await this.container.run(message) || {}
96 } catch (e) {
97 // revert the state
98 clearObject(this.state)
99 Object.assign(this.state, oldState)
100
101 result = {
102 exception: true,
103 exceptionError: e
104 }
105 }
106
107 this.emit('result', result)
108 return result
109 }
110
111 /**
112 * returns a promise that resolves once the kernel hits the threshould tick count
113 * @param {Number} threshould - the number of ticks to wait
114 * @returns {Promise}
115 */
116 wait (threshold, fromPort) {
117 if (threshold <= this.ticks) {
118 return this.ticks
119 } else if (this.containerState === 'idle') {
120 return this.ports.wait(threshold, fromPort)
121 } else {
122 return new Promise((resolve, reject) => {
123 this._waitingMap.set(fromPort, {
124 threshold: threshold,
125 resolve: resolve,
126 from: fromPort
127 })
128 })
129 }
130 }
131
132 /**
133 * updates the number of ticks that the container has run
134 * @param {Number} count - the number of ticks to add
135 */
136 incrementTicks (count) {
137 this.ticks += count
138 for (const [fromPort, waiter] of this._waitingMap) {
139 if (waiter.threshold < this.ticks) {
140 this._waitingMap.delete(fromPort)
141 waiter.resolve(this.ticks)
142 }
143 }
144 }
145
146 /**
147 * creates a new message
148 * @param {*} data
149 */
150 createMessage (opts) {
151 const message = new Message(opts)
152 for (const port of message.ports) {
153 if (this.ports.isBound(port)) {
154 throw new Error('message must not contain bound ports')
155 }
156 }
157 return message
158 }
159
160 /**
161 * sends a message to a given port
162 * @param {Object} portRef - the port
163 * @param {Message} message - the message
164 */
165 async send (portRef, message) {
166 if (!this.ports.isBound(portRef)) {
167 throw new Error('cannot send message with an unbound port')
168 }
169
170 // set the port that the message came from
171 message._fromPort = this.entryPort
172 message._fromPortTicks = this.ticks
173
174 const container = await this.getInstance(portRef)
175 container.queue(message)
176
177 const waiter = this._waitingMap.get(portRef)
178 // if the was a wait on this port the resolve it
179 if (waiter) {
180 waiter.resolve(this.ticks)
181 this._waitingMap.delete(portRef)
182 }
183 }
184
185 /**
186 * gets a container instance given a port
187 * @param {Object} portRef - the port
188 * @returns {Object}
189 */
190 getInstance (portRef) {
191 return this.hypervisor.getInstanceByPort(portRef, this.entryPort)
192 }
193}
194

Built with git-ssb-web