Files: 3a2129ef21fb40c0212b10d052749c0437d93ffc / kernel.js
3442 bytesRaw
1 | const PriorityQueue = require('fastpriorityqueue') |
2 | const EventEmitter = require('events') |
3 | const BN = require('bn.js') |
4 | const PortManager = require('./portManager.js') |
5 | |
6 | module.exports = class Kernel extends EventEmitter { |
7 | constructor (opts) { |
8 | super() |
9 | this.state = opts.state |
10 | this.parentPort = opts.parentPort |
11 | this.hypervisor = opts.hypervisor |
12 | |
13 | this.vmState = 'idle' |
14 | this.ticks = 0 |
15 | // create the port manager |
16 | this.ports = new PortManager({ |
17 | kernel: this, |
18 | hypervisor: opts.hypervisor, |
19 | ports: opts.state.ports, |
20 | parentPort: opts.parentPort |
21 | }) |
22 | this.vm = new opts.VM(this) |
23 | this._waitingQueue = new PriorityQueue((a, b) => { |
24 | return a.threshold > b.threshold |
25 | }) |
26 | this.on('result', this._runNextMessage) |
27 | this.on('idle', () => { |
28 | while (!this._waitingQueue.isEmpty()) { |
29 | this._waitingQueue.poll().resolve() |
30 | } |
31 | }) |
32 | } |
33 | |
34 | start () { |
35 | return this.ports.start() |
36 | } |
37 | |
38 | queue (message) { |
39 | this.ports.queue(message) |
40 | if (this.vmState === 'idle') { |
41 | this._updateVmState('running') |
42 | this._runNextMessage() |
43 | } |
44 | } |
45 | |
46 | _updateVmState (vmState, message) { |
47 | this.vmState = vmState |
48 | this.emit(vmState, message) |
49 | } |
50 | |
51 | _runNextMessage () { |
52 | this.ports.getNextMessage().then(message => { |
53 | if (message) { |
54 | this._run(message) |
55 | } else { |
56 | this._updateVmState('idle', message) |
57 | } |
58 | }) |
59 | } |
60 | |
61 | /** |
62 | * run the kernels code with a given enviroment |
63 | * The Kernel Stores all of its state in the Environment. The Interface is used |
64 | * to by the VM to retrive infromation from the Environment. |
65 | */ |
66 | async _run (message) { |
67 | // shallow copy |
68 | const oldState = Object.assign({}, this.state) |
69 | let result |
70 | try { |
71 | result = await this.vm.run(message) || {} |
72 | } catch (e) { |
73 | result = { |
74 | exception: true, |
75 | exceptionError: e |
76 | } |
77 | clearObject(this.state) |
78 | Object.assign(this.state, oldState) |
79 | } |
80 | |
81 | this.emit('result', result) |
82 | return result |
83 | } |
84 | |
85 | // returns a promise that resolves once the kernel hits the threshould tick |
86 | // count |
87 | async wait (threshold) { |
88 | return new Promise((resolve, reject) => { |
89 | if (this.vmState === 'idle' || threshold <= this.ticks) { |
90 | resolve(this.ticks) |
91 | } else { |
92 | this._waitingQueue.add({ |
93 | threshold: threshold, |
94 | resolve: resolve |
95 | }) |
96 | } |
97 | }) |
98 | } |
99 | |
100 | incrementTicks (count) { |
101 | this.ticks += count |
102 | while (!this._waitingQueue.isEmpty()) { |
103 | const waiter = this._waitingQueue.peek() |
104 | if (waiter.threshold > this.ticks) { |
105 | break |
106 | } else { |
107 | this._waitingQueue.poll().resolve(this.ticks) |
108 | } |
109 | } |
110 | } |
111 | |
112 | async createPort (manager, type, name) { |
113 | // incerment the nonce |
114 | const nonce = new BN(this.state.nonce) |
115 | nonce.iaddn(1) |
116 | this.state.nonce = nonce.toArray() |
117 | |
118 | let portRef = this.hypervisor.createPort(type, { |
119 | nonce: this.state.nonce, |
120 | parent: this.parentPort.id |
121 | }) |
122 | await manager.set(name, portRef) |
123 | return portRef |
124 | } |
125 | |
126 | getPort (manager, name) { |
127 | return manager.getRef(name) |
128 | } |
129 | |
130 | async send (portRef, message) { |
131 | message._ticks = this.ticks |
132 | const portInstance = await this.ports.get(portRef) |
133 | portInstance.hasSent = true |
134 | return this.hypervisor.send(portRef, message) |
135 | } |
136 | } |
137 | |
/**
 * removes every own enumerable string-keyed property from an object, in place
 * (the object keeps its identity and its prototype)
 * @param {Object} myObject - the object to empty
 */
function clearObject (myObject) {
  // Object.keys only yields own keys; inherited ones cannot be deleted through
  // the child object anyway
  for (const member of Object.keys(myObject)) {
    delete myObject[member]
  }
}
143 |
Built with git-ssb-web