git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: acc114f061e4e4c2cbede55a72d008b7e46c5cbc

Files: acc114f061e4e4c2cbede55a72d008b7e46c5cbc / kernel.js

4793 bytes · Raw
1const PriorityQueue = require('fastpriorityqueue')
2const EventEmitter = require('events')
3const BN = require('bn.js')
4const PortManager = require('./portManager.js')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts) {
8 super()
9 this.state = opts.state
10 this.entryPort = opts.entryPort
11 this.hypervisor = opts.hypervisor
12
13 this.vmState = 'idle'
14 this.ticks = 0
15 // create the port manager
16 this.ports = new PortManager({
17 kernel: this,
18 hypervisor: opts.hypervisor,
19 ports: opts.state.ports,
20 entryPort: opts.entryPort,
21 parentPort: opts.parentPort
22 })
23
24 this.vm = new opts.VM(this)
25 this._waitingQueue = new PriorityQueue((a, b) => {
26 return a.threshold > b.threshold
27 })
28 this.on('result', this._runNextMessage)
29 this.on('idle', () => {
30 while (!this._waitingQueue.isEmpty()) {
31 const waiter = this._waitingQueue.poll()
32 this.wait(waiter.ticks, waiter.from).then(ticks => {
33 waiter.resolve(ticks)
34 })
35 }
36 })
37 }
38
39 start () {
40 return this.ports.start()
41 }
42
43 queue (message) {
44 this.ports.queue(message)
45 if (this.vmState !== 'running') {
46 this._updateVmState('running')
47 this._runNextMessage()
48 }
49 }
50
51 _updateVmState (vmState, message) {
52 this.vmState = vmState
53 this.emit(vmState, message)
54 }
55
56 async _runNextMessage () {
57 const message = await this.ports.getNextMessage()
58 // if the vm is paused and it gets a message; save that message for use when the VM is resumed
59 if (message && this.vmState === 'paused') {
60 this.ports._portMap(message._fromPort).unshfit(message)
61 } else if (!message && this.vmState !== 'paused') {
62 // if no more messages then shut down
63 this._updateVmState('idle')
64 } else {
65 // run the next message
66 this._run(message)
67 }
68 }
69
70 _updateEntryPort (entryPort) {
71 // reset waits, update parent port
72 }
73
74 destroy () {
75 // destory waits
76 }
77
78 pause () {
79 this._setState('paused')
80 }
81
82 resume () {
83 this._setState('running')
84 this._runNextMessage()
85 }
86
87 /**
88 * run the kernels code with a given enviroment
89 * The Kernel Stores all of its state in the Environment. The Interface is used
90 * to by the VM to retrive infromation from the Environment.
91 */
92 async _run (message) {
93 // shallow copy
94 const oldState = Object.assign({}, this.state)
95 let result
96 try {
97 result = await this.vm.run(message) || {}
98 } catch (e) {
99 result = {
100 exception: true,
101 exceptionError: e
102 }
103 clearObject(this.state)
104 Object.assign(this.state, oldState)
105 console.log(e)
106 }
107
108 this.emit('result', result)
109 return result
110 }
111
112 // returns a promise that resolves once the kernel hits the threshould tick
113 // count
114 async wait (threshold, fromPort) {
115 if (threshold <= this.ticks) {
116 return this.ticks
117 } else if (this.vmState === 'idle') {
118 return this.ports.wait(threshold, fromPort)
119 } else {
120 return new Promise((resolve, reject) => {
121 this._waitingQueue.add({
122 threshold: threshold,
123 resolve: resolve,
124 from: fromPort
125 })
126 })
127 }
128 }
129
130 incrementTicks (count) {
131 this.ticks += count
132 while (!this._waitingQueue.isEmpty()) {
133 const waiter = this._waitingQueue.peek()
134 if (waiter.threshold > this.ticks) {
135 break
136 } else {
137 this._waitingQueue.poll().resolve(this.ticks)
138 }
139 }
140 }
141
142 async createPort (type, name) {
143 const VM = this.hypervisor._VMs[type]
144 const parentId = this.entryPort ? this.entryPort.id : null
145 const portRef = {
146 'messages': [],
147 'id': {
148 '/': {
149 nonce: this.state.nonce,
150 parent: parentId
151 }
152 },
153 'type': type,
154 'link': {
155 '/': VM.createState()
156 }
157 }
158
159 // create the port instance
160 await this.ports.set(name, portRef)
161 // incerment the nonce
162 const nonce = new BN(this.state.nonce)
163 nonce.iaddn(1)
164 this.state.nonce = nonce.toArray()
165 return portRef
166 }
167
168 async send (portRef, message) {
169 try {
170 const portInstance = await this.ports.get(portRef)
171 portInstance.hasSent = true
172 } catch (e) {
173 throw new Error('invalid port referance, which means the port that the port was either moved or destoried')
174 }
175
176 const id = await this.hypervisor.generateID(this.entryPort)
177 message._fromPort = id
178 message._ticks = this.ticks
179
180 const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef
181 const vm = await this.hypervisor.getInstance(receiverEntryPort)
182 vm.queue(message)
183 }
184}
185
/**
 * Empties an object in place by deleting its enumerable properties, so that
 * existing references to the object observe the cleared state.
 * @param {Object} myObject - the object to empty; null/undefined is a no-op
 */
function clearObject (myObject) {
  // nothing to enumerate on null/undefined
  if (myObject === null || myObject === undefined) {
    return
  }
  // inherited properties are unaffected by `delete` on the object itself,
  // so only own keys need to be visited
  Object.keys(myObject).forEach(key => {
    delete myObject[key]
  })
}
191

Built with git-ssb-web