Files: 4f8ba9e911f6ad620fc6cd59cc15fbfaa1fe384c / kernel.js
3894 bytesRaw
1 | const PriorityQueue = require('fastpriorityqueue') |
2 | const clearObject = require('object-clear') |
3 | const clone = require('clone') |
4 | const BN = require('bn.js') |
5 | const EventEmitter = require('events') |
6 | const PortManager = require('./portManager.js') |
7 | |
8 | module.exports = class Kernel extends EventEmitter { |
9 | constructor (opts) { |
10 | super() |
11 | this.state = opts.state |
12 | this.entryPort = opts.entryPort |
13 | this.hypervisor = opts.hypervisor |
14 | |
15 | this.vmState = 'idle' |
16 | this.ticks = 0 |
17 | |
18 | // create the port manager |
19 | this.ports = new PortManager({ |
20 | kernel: this, |
21 | hypervisor: opts.hypervisor, |
22 | state: opts.state, |
23 | entryPort: opts.entryPort, |
24 | parentPort: opts.parentPort |
25 | }) |
26 | |
27 | this.vm = new opts.VM(this) |
28 | this._waitingQueue = new PriorityQueue((a, b) => { |
29 | return a.threshold > b.threshold |
30 | }) |
31 | |
32 | this.on('result', this._runNextMessage) |
33 | this.on('idle', () => { |
34 | while (!this._waitingQueue.isEmpty()) { |
35 | const waiter = this._waitingQueue.poll() |
36 | waiter.resolve(this.ticks) |
37 | } |
38 | }) |
39 | } |
40 | |
41 | start () { |
42 | return this.ports.start() |
43 | } |
44 | |
45 | queue (message) { |
46 | this.ports.queue(message) |
47 | if (this.vmState !== 'running') { |
48 | this._updateVmState('running') |
49 | this._runNextMessage() |
50 | } |
51 | } |
52 | |
53 | _updateVmState (vmState, message) { |
54 | this.vmState = vmState |
55 | this.emit(vmState, message) |
56 | } |
57 | |
58 | async _runNextMessage () { |
59 | const message = await this.ports.getNextMessage() |
60 | if (message) { |
61 | // run the next message |
62 | this.run(message) |
63 | } else { |
64 | // if no more messages then shut down |
65 | this._updateVmState('idle') |
66 | } |
67 | } |
68 | |
69 | /** |
70 | * run the kernels code with a given enviroment |
71 | * The Kernel Stores all of its state in the Environment. The Interface is used |
72 | * to by the VM to retrive infromation from the Environment. |
73 | */ |
74 | async run (message) { |
75 | const oldState = clone(this.state, false, 3) |
76 | let result |
77 | try { |
78 | result = await this.vm.run(message) || {} |
79 | } catch (e) { |
80 | // revert the state |
81 | clearObject(this.state) |
82 | Object.assign(this.state, oldState) |
83 | |
84 | result = { |
85 | exception: true, |
86 | exceptionError: e |
87 | } |
88 | } |
89 | |
90 | this.emit('result', result) |
91 | return result |
92 | } |
93 | |
94 | // returns a promise that resolves once the kernel hits the threshould tick |
95 | // count |
96 | wait (threshold, fromPort) { |
97 | if (threshold <= this.ticks) { |
98 | return this.ticks |
99 | } else if (this.vmState === 'idle') { |
100 | return this.ports.wait(threshold, fromPort) |
101 | } else { |
102 | return new Promise((resolve, reject) => { |
103 | this._waitingQueue.add({ |
104 | threshold: threshold, |
105 | resolve: resolve, |
106 | from: fromPort |
107 | }) |
108 | }) |
109 | } |
110 | } |
111 | |
112 | incrementTicks (count) { |
113 | this.ticks += count |
114 | while (!this._waitingQueue.isEmpty()) { |
115 | const waiter = this._waitingQueue.peek() |
116 | if (waiter.threshold > this.ticks) { |
117 | break |
118 | } else { |
119 | this._waitingQueue.poll().resolve(this.ticks) |
120 | } |
121 | } |
122 | } |
123 | |
124 | createPort (type, name) { |
125 | const VM = this.hypervisor._VMs[type] |
126 | const parentId = this.entryPort ? this.entryPort.id : null |
127 | let nonce = this.state['/'].nonce |
128 | |
129 | const portRef = { |
130 | 'messages': [], |
131 | 'id': { |
132 | '/': { |
133 | nonce: nonce, |
134 | parent: parentId |
135 | } |
136 | }, |
137 | 'type': type, |
138 | 'link': { |
139 | '/': VM.createState() |
140 | } |
141 | } |
142 | |
143 | // create the port instance |
144 | this.ports.set(name, portRef) |
145 | // incerment the nonce |
146 | nonce = new BN(nonce) |
147 | nonce.iaddn(1) |
148 | this.state['/'].nonce = nonce.toArray() |
149 | return portRef |
150 | } |
151 | |
152 | async send (portRef, message) { |
153 | message._fromPort = this.entryPort |
154 | message._ticks = this.ticks |
155 | |
156 | const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef |
157 | const vm = await this.hypervisor.getInstance(receiverEntryPort) |
158 | vm.queue(message) |
159 | } |
160 | } |
161 |
Built with git-ssb-web