git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 463dc84f2cfff964ba0c0ea55de6f903a4f344a3

Files: 463dc84f2cfff964ba0c0ea55de6f903a4f344a3 / kernel.js

4946 bytesRaw
1const PriorityQueue = require('fastpriorityqueue')
2const EventEmitter = require('events')
3const BN = require('bn.js')
4const PortManager = require('./portManager.js')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts) {
8 super()
9 this.state = opts.state
10 this.entryPort = opts.entryPort
11 this.hypervisor = opts.hypervisor
12
13 this.vmState = 'idle'
14 this.ticks = 0
15 // create the port manager
16 this.ports = new PortManager({
17 kernel: this,
18 hypervisor: opts.hypervisor,
19 state: opts.state,
20 entryPort: opts.entryPort,
21 parentPort: opts.parentPort
22 })
23
24 this.vm = new opts.VM(this)
25 this._waitingQueue = new PriorityQueue((a, b) => {
26 return a.threshold > b.threshold
27 })
28 this.on('result', this._runNextMessage)
29 this.on('idle', () => {
30 while (!this._waitingQueue.isEmpty()) {
31 const waiter = this._waitingQueue.poll()
32 this.wait(waiter.ticks, waiter.from).then(ticks => {
33 waiter.resolve(ticks)
34 })
35 }
36 })
37 }
38
39 start () {
40 return this.ports.start()
41 }
42
43 queue (message) {
44 this.ports.queue(message)
45 if (this.vmState !== 'running') {
46 this._updateVmState('running')
47 this._runNextMessage()
48 }
49 }
50
51 _updateVmState (vmState, message) {
52 this.vmState = vmState
53 this.emit(vmState, message)
54 }
55
56 async _runNextMessage () {
57 console.log('run next message')
58 try {
59 const message = await this.ports.getNextMessage()
60 // if the vm is paused and it gets a message; save that message for use when the VM is resumed
61 if (message && this.vmState === 'paused') {
62 this.ports._portMap(message._fromPort).unshfit(message)
63 } else if (!message && this.vmState !== 'paused') {
64 // if no more messages then shut down
65 this._updateVmState('idle')
66 } else {
67 // run the next message
68 this._run(message)
69 }
70 } catch (e) {
71 console.log(e)
72 }
73 }
74
75 _updateEntryPort (entryPort) {
76 // reset waits, update parent port
77 }
78
79 destroy () {
80 // destory waits
81 }
82
83 pause () {
84 this._setState('paused')
85 }
86
87 resume () {
88 this._setState('running')
89 this._runNextMessage()
90 }
91
92 /**
93 * run the kernels code with a given enviroment
94 * The Kernel Stores all of its state in the Environment. The Interface is used
95 * to by the VM to retrive infromation from the Environment.
96 */
97 async _run (message) {
98 // shallow copy
99 const oldState = Object.assign({}, this.state)
100 let result
101 try {
102 result = await this.vm.run(message) || {}
103 } catch (e) {
104 result = {
105 exception: true,
106 exceptionError: e
107 }
108 clearObject(this.state)
109 Object.assign(this.state, oldState)
110 console.log(e)
111 }
112
113 this.emit('result', result)
114 return result
115 }
116
117 // returns a promise that resolves once the kernel hits the threshould tick
118 // count
119 async wait (threshold, fromPort) {
120 if (threshold <= this.ticks) {
121 return this.ticks
122 } else if (this.vmState === 'idle') {
123 return this.ports.wait(threshold, fromPort)
124 } else {
125 return new Promise((resolve, reject) => {
126 this._waitingQueue.add({
127 threshold: threshold,
128 resolve: resolve,
129 from: fromPort
130 })
131 })
132 }
133 }
134
135 incrementTicks (count) {
136 this.ticks += count
137 while (!this._waitingQueue.isEmpty()) {
138 const waiter = this._waitingQueue.peek()
139 if (waiter.threshold > this.ticks) {
140 break
141 } else {
142 this._waitingQueue.poll().resolve(this.ticks)
143 }
144 }
145 }
146
147 async createPort (type, name) {
148 const VM = this.hypervisor._VMs[type]
149 const parentId = this.entryPort ? this.entryPort.id : null
150 let nonce = await this.hypervisor.graph.get(this.state, 'nonce')
151 const portRef = {
152 'messages': [],
153 'id': {
154 '/': {
155 nonce: nonce,
156 parent: parentId
157 }
158 },
159 'type': type,
160 'link': {
161 '/': VM.createState()
162 }
163 }
164
165 // create the port instance
166 await this.ports.set(name, portRef)
167 // incerment the nonce
168 nonce = new BN(nonce)
169 nonce.iaddn(1)
170 this.state['/'].nonce = nonce.toArray()
171 return portRef
172 }
173
174 async send (portRef, message) {
175 try {
176 const portInstance = await this.ports.get(portRef)
177 portInstance.hasSent = true
178 } catch (e) {
179 throw new Error('invalid port referance, which means the port that the port was either moved or destoried')
180 }
181
182 const id = await this.hypervisor.generateID(this.entryPort)
183 message._fromPort = id
184 message._ticks = this.ticks
185
186 const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef
187 const vm = await this.hypervisor.getInstance(receiverEntryPort)
188 vm.queue(message)
189 }
190}
191
192function clearObject (myObject) {
193 for (var member in myObject) {
194 delete myObject[member]
195 }
196}
197

Built with git-ssb-web