git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 5f6b460515d272f0c4dcfce10ff08f1efd953318

Files: 5f6b460515d272f0c4dcfce10ff08f1efd953318 / kernel.js

5085 bytesRaw
1const PriorityQueue = require('fastpriorityqueue')
2const EventEmitter = require('events')
3const BN = require('bn.js')
4const PortManager = require('./portManager.js')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts) {
8 super()
9 this.state = opts.state
10 this.entryPort = opts.entryPort
11 this.hypervisor = opts.hypervisor
12
13 this.vmState = 'idle'
14 this.ticks = 0
15 // create the port manager
16 this.ports = new PortManager({
17 kernel: this,
18 hypervisor: opts.hypervisor,
19 ports: opts.state.ports,
20 entryPort: opts.entryPort,
21 parentPort: opts.parentPort
22 })
23
24 this.vm = new opts.VM(this)
25 this._waitingQueue = new PriorityQueue((a, b) => {
26 return a.threshold > b.threshold
27 })
28 this.on('result', this._runNextMessage)
29 this.on('idle', () => {
30 while (!this._waitingQueue.isEmpty()) {
31 const waiter = this._waitingQueue.poll()
32 this.wait(waiter.ticks, waiter.from).then(ticks => {
33 waiter.resolve(ticks)
34 })
35 }
36 })
37 }
38
39 start () {
40 return this.ports.start()
41 }
42
43 queue (message) {
44 this.ports.queue(message)
45 if (this.vmState !== 'running') {
46 this._updateVmState('running')
47 this._runNextMessage()
48 }
49 }
50
51 _updateVmState (vmState, message) {
52 this.vmState = vmState
53 this.emit(vmState, message)
54 }
55
56 async _runNextMessage () {
57 // console.log('next message', this.ticks, this.entryPort)
58 const message = await this.ports.getNextMessage()
59 // if the vm is paused and it gets a message; save that message for use when the VM is resumed
60 if (message && this.vmState === 'paused') {
61 this.ports._portMap(message._fromPort).unshfit(message)
62 } else if (!message && this.vmState !== 'paused') {
63 // if no more messages then shut down
64 this._updateVmState('idle')
65 } else {
66 // run the next message
67 this._run(message)
68 }
69 }
70
71 _updateEntryPort (entryPort) {
72 // reset waits, update parent port
73 }
74
75 destroy () {
76 // destory waits
77 }
78
79 pause () {
80 this._setState('paused')
81 }
82
83 resume () {
84 this._setState('running')
85 this._runNextMessage()
86 }
87
88 /**
89 * run the kernels code with a given enviroment
90 * The Kernel Stores all of its state in the Environment. The Interface is used
91 * to by the VM to retrive infromation from the Environment.
92 */
93 async _run (message) {
94 // shallow copy
95 const oldState = Object.assign({}, this.state)
96 let result
97 try {
98 result = await this.vm.run(message) || {}
99 } catch (e) {
100 result = {
101 exception: true,
102 exceptionError: e
103 }
104 clearObject(this.state)
105 Object.assign(this.state, oldState)
106 console.log(e)
107 }
108
109 this.emit('result', result)
110 return result
111 }
112
113 // returns a promise that resolves once the kernel hits the threshould tick
114 // count
115 async wait (threshold, fromPort) {
116 console.log('wait', threshold, fromPort, this.ticks, this.vmState, this.entryPort)
117 if (threshold <= this.ticks) {
118 return this.ticks
119 } else if (this.vmState === 'idle') {
120 return this.ports.wait(threshold, fromPort)
121 } else {
122 return new Promise((resolve, reject) => {
123 this._waitingQueue.add({
124 threshold: threshold,
125 resolve: resolve,
126 from: fromPort
127 })
128 })
129 }
130 }
131
132 incrementTicks (count) {
133 console.log('update ticks')
134 this.ticks += count
135 while (!this._waitingQueue.isEmpty()) {
136 const waiter = this._waitingQueue.peek()
137 if (waiter.threshold > this.ticks) {
138 break
139 } else {
140 this._waitingQueue.poll().resolve(this.ticks)
141 }
142 }
143 }
144
145 async createPort (type, name) {
146 const VM = this.hypervisor._VMs[type]
147 const parentId = this.entryPort ? this.entryPort.id : null
148 const portRef = {
149 'messages': [],
150 'id': {
151 '/': {
152 nonce: this.state.nonce,
153 parent: parentId
154 }
155 },
156 'type': type,
157 'link': {
158 '/': VM.createState()
159 }
160 }
161
162 // create the port instance
163 await this.ports.set(name, portRef)
164 // incerment the nonce
165 const nonce = new BN(this.state.nonce)
166 nonce.iaddn(1)
167 this.state.nonce = nonce.toArray()
168 return portRef
169 }
170
171 async send (portRef, message) {
172 try {
173 const portInstance = await this.ports.get(portRef)
174 portInstance.hasSent = true
175 } catch (e) {
176 throw new Error('invalid port referance, which means the port that the port was either moved or destoried')
177 }
178
179 const id = await this.hypervisor.generateID(this.entryPort)
180 message._fromPort = id
181 message._ticks = this.ticks
182
183 const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef
184 const vm = await this.hypervisor.getInstance(receiverEntryPort)
185 vm.queue(message)
186 if (this.vmState !== 'running') {
187 this._updateVmState('running')
188 this._runNextMessage()
189 }
190 }
191}
192
193function clearObject (myObject) {
194 for (var member in myObject) {
195 delete myObject[member]
196 }
197}
198

Built with git-ssb-web