git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: e0d0380c051f78c76923c1d39111ee94896e3a8d

Files: e0d0380c051f78c76923c1d39111ee94896e3a8d / kernel.js

4549 bytesRaw
1const PriorityQueue = require('fastpriorityqueue')
2const EventEmitter = require('events')
3const BN = require('bn.js')
4const PortManager = require('./portManager.js')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts) {
8 super()
9 this.state = opts.state
10 this.entryPort = opts.entryPort
11 this.hypervisor = opts.hypervisor
12
13 this.vmState = 'idle'
14 this.ticks = 0
15 // create the port manager
16 this.ports = new PortManager({
17 kernel: this,
18 hypervisor: opts.hypervisor,
19 state: opts.state,
20 entryPort: opts.entryPort,
21 parentPort: opts.parentPort
22 })
23
24 this.vm = new opts.VM(this)
25 this._waitingQueue = new PriorityQueue((a, b) => {
26 return a.threshold > b.threshold
27 })
28 this.on('result', this._runNextMessage)
29 this.on('idle', () => {
30 while (!this._waitingQueue.isEmpty()) {
31 const waiter = this._waitingQueue.poll()
32 waiter.resolve(this.ticks)
33 }
34 })
35 }
36
37 start () {
38 return this.ports.start()
39 }
40
41 async queue (message) {
42 await this.ports.queue(message)
43 if (this.vmState !== 'running') {
44 this._updateVmState('running')
45 this._runNextMessage()
46 }
47 }
48
49 _updateVmState (vmState, message) {
50 this.vmState = vmState
51 this.emit(vmState, message)
52 }
53
54 async _runNextMessage () {
55 await this.hypervisor.graph.get(this.entryPort, 'id')
56 const message = await this.ports.getNextMessage()
57 // if the vm is paused and it gets a message; save that message for use when the VM is resumed
58 if (message && this.vmState === 'paused') {
59 this.ports._portMap(message._fromPort).unshfit(message)
60 } else if (!message && this.vmState !== 'paused') {
61 // if no more messages then shut down
62 this._updateVmState('idle')
63 } else {
64 // run the next message
65 this._run(message)
66 }
67 }
68
69 _updateEntryPort (entryPort) {
70 // reset waits, update parent port
71 }
72
73 destroy () {
74 // destory waits
75 }
76
77 pause () {
78 this._setState('paused')
79 }
80
81 resume () {
82 this._setState('running')
83 this._runNextMessage()
84 }
85
86 /**
87 * run the kernels code with a given enviroment
88 * The Kernel Stores all of its state in the Environment. The Interface is used
89 * to by the VM to retrive infromation from the Environment.
90 */
91 async _run (message) {
92 // shallow copy
93 const oldState = Object.assign({}, this.state)
94 let result
95 try {
96 result = await this.vm.run(message) || {}
97 } catch (e) {
98 result = {
99 exception: true,
100 exceptionError: e
101 }
102 clearObject(this.state)
103 Object.assign(this.state, oldState)
104 console.log(e)
105 }
106
107 this.emit('result', result)
108 return result
109 }
110
111 // returns a promise that resolves once the kernel hits the threshould tick
112 // count
113 async wait (threshold, fromPort) {
114 if (threshold <= this.ticks) {
115 return this.ticks
116 } else if (this.vmState === 'idle') {
117 return this.ports.wait(threshold, fromPort)
118 } else {
119 return new Promise((resolve, reject) => {
120 this._waitingQueue.add({
121 threshold: threshold,
122 resolve: resolve,
123 from: fromPort
124 })
125 })
126 }
127 }
128
129 incrementTicks (count) {
130 this.ticks += count
131 while (!this._waitingQueue.isEmpty()) {
132 const waiter = this._waitingQueue.peek()
133 if (waiter.threshold > this.ticks) {
134 break
135 } else {
136 this._waitingQueue.poll().resolve(this.ticks)
137 }
138 }
139 }
140
141 async createPort (type, name) {
142 const VM = this.hypervisor._VMs[type]
143 const parentId = this.entryPort ? this.entryPort.id : null
144 let nonce = await this.hypervisor.graph.get(this.state, 'nonce')
145 const portRef = {
146 'messages': [],
147 'id': {
148 '/': {
149 nonce: nonce,
150 parent: parentId
151 }
152 },
153 'type': type,
154 'link': {
155 '/': VM.createState()
156 }
157 }
158
159 // create the port instance
160 await this.ports.set(name, portRef)
161 // incerment the nonce
162 nonce = new BN(nonce)
163 nonce.iaddn(1)
164 this.state['/'].nonce = nonce.toArray()
165 return portRef
166 }
167
168 async send (portRef, message) {
169 message._fromPort = this.entryPort
170 message._ticks = this.ticks
171
172 const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef
173 const vm = await this.hypervisor.getInstance(receiverEntryPort)
174 return vm.queue(message)
175 }
176}
177
178function clearObject (myObject) {
179 for (var member in myObject) {
180 delete myObject[member]
181 }
182}
183

Built with git-ssb-web