git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: eb936c252664b13565410d09b4facc45b738e249

Files: eb936c252664b13565410d09b4facc45b738e249 / exoInterface.js

3757 bytesRaw
1const clearObject = require('object-clear')
2const clone = require('clone')
3const EventEmitter = require('events')
4const PortManager = require('./portManager.js')
5
6module.exports = class ExoInterface extends EventEmitter {
7 /**
8 * the ExoInterface manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.state
12 * @param {Object} opts.entryPort
13 * @param {Object} opts.parentPort
14 * @param {Object} opts.hypervisor
15 * @param {Object} opts.Container
16 */
17 constructor (opts) {
18 super()
19 this.state = opts.state
20 this.entryPort = opts.entryPort
21 this.hypervisor = opts.hypervisor
22
23 this.containerState = 'idle'
24 this.ticks = 0
25
26 // create the port manager
27 this.ports = new PortManager(Object.assign({
28 exoInterface: this
29 }, opts))
30
31 this._waitingMap = new Map()
32 this.container = new opts.Container(this)
33
34 // once we get an result we run the next message
35 this.on('result', this._runNextMessage)
36
37 // on idle clear all the 'wiats'
38 this.on('idle', () => {
39 for (const [, waiter] of this._waitingMap) {
40 waiter.resolve(this.ticks)
41 }
42 })
43 }
44
45 start () {
46 return this.ports.start()
47 }
48
49 queue (message) {
50 message._hops++
51 this.ports.queue(message)
52 if (this.containerState !== 'running') {
53 this._updateContainerState('running')
54 this._runNextMessage()
55 }
56 }
57
58 _updateContainerState (containerState, message) {
59 this.containerState = containerState
60 this.emit(containerState, message)
61 }
62
63 async _runNextMessage () {
64 const message = await this.ports.getNextMessage()
65 if (message) {
66 // run the next message
67 this.run(message)
68 } else {
69 // if no more messages then shut down
70 this._updateContainerState('idle')
71 }
72 }
73
74 /**
75 * run the kernels code with a given enviroment
76 * The Kernel Stores all of its state in the Environment. The Interface is used
77 * to by the VM to retrive infromation from the Environment.
78 */
79 async run (message) {
80 const oldState = clone(this.state, false, 3)
81 let result
82 try {
83 result = await this.container.run(message) || {}
84 } catch (e) {
85 // revert the state
86 clearObject(this.state)
87 Object.assign(this.state, oldState)
88
89 result = {
90 exception: true,
91 exceptionError: e
92 }
93 }
94
95 this.emit('result', result)
96 return result
97 }
98
99 // returns a promise that resolves once the kernel hits the threshould tick
100 // count
101 wait (threshold, fromPort) {
102 if (threshold <= this.ticks) {
103 return this.ticks
104 } else if (this.containerState === 'idle') {
105 return this.ports.wait(threshold, fromPort)
106 } else {
107 return new Promise((resolve, reject) => {
108 this._waitingMap.set(fromPort, {
109 threshold: threshold,
110 resolve: resolve,
111 from: fromPort
112 })
113 })
114 }
115 }
116
117 incrementTicks (count) {
118 this.ticks += count
119 for (const [fromPort, waiter] of this._waitingMap) {
120 if (waiter.threshold < this.ticks) {
121 this._waitingMap.delete(fromPort)
122 waiter.resolve(this.ticks)
123 }
124 }
125 }
126
127 async send (portRef, message) {
128 if (!this.ports.isValidPort(portRef)) {
129 throw new Error('invalid port referance')
130 }
131
132 // set the port that the message came from
133 message._fromPort = this.entryPort
134 message._fromPortTicks = this.ticks
135
136 const instance = await this.hypervisor.getOrCreateInstance(portRef, this.entryPort)
137 instance.queue(message)
138
139 const waiter = this._waitingMap.get(portRef)
140 if (waiter) {
141 waiter.resolve(this.ticks)
142 this._waitingMap.delete(portRef)
143 }
144 }
145}
146

Built with git-ssb-web