git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 520f3b1d4db295bd3556d353999fb06821185970

Files: 520f3b1d4db295bd3556d353999fb06821185970 / exoInterface.js

4762 bytesRaw
1const EventEmitter = require('events')
2const PortManager = require('./portManager.js')
3const Message = require('primea-message')
4
5module.exports = class ExoInterface extends EventEmitter {
6 /**
7 * the ExoInterface manages the varous message passing functions and provides
8 * an interface for the containers to use
9 * @param {Object} opts
10 * @param {Object} opts.state
11 * @param {Object} opts.entryPort
12 * @param {Object} opts.parentPort
13 * @param {Object} opts.hypervisor
14 * @param {Object} opts.Container
15 */
16 constructor (opts) {
17 super()
18 this.state = opts.state
19 this.entryPort = opts.entryPort
20 this.hypervisor = opts.hypervisor
21
22 this.containerState = 'idle'
23
24 // the total number of ticks that the container has ran
25 this.ticks = 0
26
27 // create the port manager
28 this.ports = new PortManager(Object.assign({
29 exoInterface: this
30 }, opts))
31
32 this._waitingMap = new Map()
33 this.container = new opts.container.Constructor(this, opts.container.args)
34
35 // once we get an result we run the next message
36 this.on('result', this._runNextMessage)
37
38 // on idle clear all the 'wiats'
39 this.on('idle', () => {
40 for (const [, waiter] of this._waitingMap) {
41 waiter.resolve(this.ticks)
42 }
43 })
44 }
45
46 /**
47 * starts the container
48 * @returns {Promise}
49 */
50 start () {
51 return this.ports.start()
52 }
53
54 /**
55 * adds a message to this containers message queue
56 * @param {Message} message
57 */
58 queue (message) {
59 message._hops++
60 this.ports.queue(message)
61 if (this.containerState !== 'running') {
62 this._updateContainerState('running')
63 this._runNextMessage()
64 }
65 }
66
67 _updateContainerState (containerState, message) {
68 this.containerState = containerState
69 this.emit(containerState, message)
70 }
71
72 async _runNextMessage () {
73 const message = await this.ports.getNextMessage()
74 if (message) {
75 // run the next message
76 this.run(message)
77 } else {
78 // if no more messages then shut down
79 this._updateContainerState('idle')
80 }
81 }
82
83 /**
84 * run the kernels code with a given enviroment
85 * The Kernel Stores all of its state in the Environment. The Interface is used
86 * to by the VM to retrive infromation from the Environment.
87 * @returns {Promise}
88 */
89 async run (message) {
90 let result
91 try {
92 result = await this.container.run(message) || {}
93 } catch (e) {
94 result = {
95 exception: true,
96 exceptionError: e
97 }
98 }
99
100 this.emit('result', result)
101 return result
102 }
103
104 /**
105 * returns a promise that resolves once the kernel hits the threshould tick count
106 * @param {Number} threshould - the number of ticks to wait
107 * @returns {Promise}
108 */
109 wait (threshold, fromPort) {
110 if (threshold <= this.ticks) {
111 return this.ticks
112 } else if (this.containerState === 'idle') {
113 return this.ports.wait(threshold, fromPort)
114 } else {
115 return new Promise((resolve, reject) => {
116 this._waitingMap.set(fromPort, {
117 threshold: threshold,
118 resolve: resolve,
119 from: fromPort
120 })
121 })
122 }
123 }
124
125 /**
126 * updates the number of ticks that the container has run
127 * @param {Number} count - the number of ticks to add
128 */
129 incrementTicks (count) {
130 this.ticks += count
131 for (const [fromPort, waiter] of this._waitingMap) {
132 if (waiter.threshold < this.ticks) {
133 this._waitingMap.delete(fromPort)
134 waiter.resolve(this.ticks)
135 }
136 }
137 }
138
139 /**
140 * creates a new message
141 * @param {*} data
142 */
143 createMessage (opts) {
144 const message = new Message(opts)
145 for (const port of message.ports) {
146 if (this.ports.isBound(port)) {
147 throw new Error('message must not contain bound ports')
148 }
149 }
150 return message
151 }
152
153 /**
154 * sends a message to a given port
155 * @param {Object} portRef - the port
156 * @param {Message} message - the message
157 */
158 async send (portRef, message) {
159 if (!this.ports.isBound(portRef)) {
160 throw new Error('cannot send message with an unbound port')
161 }
162
163 // set the port that the message came from
164 message._fromPort = this.entryPort
165 message._fromPortTicks = this.ticks
166
167 const container = await this.getInstance(portRef)
168 container.queue(message)
169
170 const waiter = this._waitingMap.get(portRef)
171 // if the was a wait on this port the resolve it
172 if (waiter) {
173 waiter.resolve(this.ticks)
174 this._waitingMap.delete(portRef)
175 }
176 }
177
178 /**
179 * gets a container instance given a port
180 * @param {Object} portRef - the port
181 * @returns {Object}
182 */
183 getInstance (portRef) {
184 return this.hypervisor.getInstanceByPort(portRef, this.entryPort)
185 }
186}
187

Built with git-ssb-web