git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 3caa963721b6de8700ae2769edf1ce8f78bc0718

Files: 3caa963721b6de8700ae2769edf1ce8f78bc0718 / kernel.js

4998 bytesRaw
1const PriorityQueue = require('fastpriorityqueue')
2const EventEmitter = require('events')
3const BN = require('bn.js')
4const PortManager = require('./portManager.js')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts) {
8 super()
9 this.state = opts.state
10 this.entryPort = opts.entryPort
11 this.hypervisor = opts.hypervisor
12
13 this.vmState = 'idle'
14 this.ticks = 0
15 // create the port manager
16 this.ports = new PortManager({
17 kernel: this,
18 hypervisor: opts.hypervisor,
19 ports: opts.state.ports,
20 entryPort: opts.entryPort,
21 parentPort: opts.parentPort
22 })
23
24 this.vm = new opts.VM(this)
25 this._waitingQueue = new PriorityQueue((a, b) => {
26 return a.threshold > b.threshold
27 })
28 this.on('result', this._runNextMessage)
29 this.on('idle', () => {
30 while (!this._waitingQueue.isEmpty()) {
31 const waiter = this._waitingQueue.poll()
32 this.wait(waiter.ticks, waiter.from).then(ticks => {
33 waiter.resolve(ticks)
34 })
35 }
36 })
37 }
38
39 start () {
40 return this.ports.start()
41 }
42
43 queue (message) {
44 this.ports.queue(message)
45 if (this.vmState !== 'running') {
46 this._updateVmState('running')
47 this._runNextMessage()
48 }
49 }
50
51 _updateVmState (vmState, message) {
52 this.vmState = vmState
53 this.emit(vmState, message)
54 }
55
56 async _runNextMessage () {
57 // console.log('next message', this.ticks, this.entryPort)
58 const message = await this.ports.getNextMessage()
59 // if the vm is paused and it gets a message; save that message for use when the VM is resumed
60 if (message && this.vmState === 'paused') {
61 this.ports._portMap(message._fromPort).unshfit(message)
62 } else if (!message && this.vmState !== 'paused') {
63 // if no more messages then shut down
64 this._updateVmState('idle')
65 } else {
66 // run the next message
67 this._run(message)
68 }
69 }
70
71 _updateEntryPort (entryPort) {
72 // reset waits, update parent port
73 }
74
75 destroy () {
76 // destory waits
77 }
78
79 pause () {
80 this._setState('paused')
81 }
82
83 resume () {
84 this._setState('running')
85 this._runNextMessage()
86 }
87
88 /**
89 * run the kernels code with a given enviroment
90 * The Kernel Stores all of its state in the Environment. The Interface is used
91 * to by the VM to retrive infromation from the Environment.
92 */
93 async _run (message) {
94 // shallow copy
95 const oldState = Object.assign({}, this.state)
96 let result
97 try {
98 result = await this.vm.run(message) || {}
99 } catch (e) {
100 result = {
101 exception: true,
102 exceptionError: e
103 }
104 clearObject(this.state)
105 Object.assign(this.state, oldState)
106 console.log(e)
107 }
108
109 this.emit('result', result)
110 return result
111 }
112
113 // returns a promise that resolves once the kernel hits the threshould tick
114 // count
115 async wait (threshold, fromPort) {
116 if (threshold <= this.ticks) {
117 return this.ticks
118 } else if (this.vmState === 'idle') {
119 return this.ports.wait(threshold, fromPort)
120 } else {
121 return new Promise((resolve, reject) => {
122 this._waitingQueue.add({
123 threshold: threshold,
124 resolve: resolve,
125 from: fromPort
126 })
127 })
128 }
129 }
130
131 incrementTicks (count) {
132 console.log('update ticks')
133 this.ticks += count
134 while (!this._waitingQueue.isEmpty()) {
135 const waiter = this._waitingQueue.peek()
136 if (waiter.threshold > this.ticks) {
137 break
138 } else {
139 this._waitingQueue.poll().resolve(this.ticks)
140 }
141 }
142 }
143
144 async createPort (type, name) {
145 const VM = this.hypervisor._VMs[type]
146 const parentId = this.entryPort ? this.entryPort.id : null
147 const portRef = {
148 'messages': [],
149 'id': {
150 '/': {
151 nonce: this.state.nonce,
152 parent: parentId
153 }
154 },
155 'type': type,
156 'link': {
157 '/': VM.createState()
158 }
159 }
160
161 // create the port instance
162 await this.ports.set(name, portRef)
163 // incerment the nonce
164 const nonce = new BN(this.state.nonce)
165 nonce.iaddn(1)
166 this.state.nonce = nonce.toArray()
167 return portRef
168 }
169
170 async send (portRef, message) {
171 try {
172 const portInstance = await this.ports.get(portRef)
173 portInstance.hasSent = true
174 } catch (e) {
175 throw new Error('invalid port referance, which means the port that the port was either moved or destoried')
176 }
177
178 const id = await this.hypervisor.generateID(this.entryPort)
179 message._fromPort = id
180 message._ticks = this.ticks
181
182 const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef
183 const vm = await this.hypervisor.getInstance(receiverEntryPort)
184 vm.queue(message)
185 if (this.vmState !== 'running') {
186 this._updateVmState('running')
187 this._runNextMessage()
188 }
189 }
190}
191
192function clearObject (myObject) {
193 for (var member in myObject) {
194 delete myObject[member]
195 }
196}
197

Built with git-ssb-web