Files: 318e88b7d16e694a7a34015a3c1a6b831c91c49e / kernel.js
4003 bytesRaw
1 | const PriorityQueue = require('fastpriorityqueue') |
2 | const clearObject = require('object-clear') |
3 | const clone = require('clone') |
4 | const BN = require('bn.js') |
5 | const EventEmitter = require('events') |
6 | const PortManager = require('./portManager.js') |
7 | |
8 | module.exports = class Kernel extends EventEmitter { |
9 | constructor (opts) { |
10 | super() |
11 | this.state = opts.state |
12 | this.entryPort = opts.entryPort |
13 | this.hypervisor = opts.hypervisor |
14 | |
15 | this.vmState = 'idle' |
16 | this.ticks = 0 |
17 | |
18 | // create the port manager |
19 | this.ports = new PortManager({ |
20 | kernel: this, |
21 | hypervisor: opts.hypervisor, |
22 | state: opts.state, |
23 | entryPort: opts.entryPort, |
24 | parentPort: opts.parentPort |
25 | }) |
26 | |
27 | this.vm = new opts.VM(this) |
28 | this._waitingMap = new Map() |
29 | this._waitingQueue = new PriorityQueue((a, b) => { |
30 | return a.threshold < b.threshold |
31 | }) |
32 | |
33 | this.on('result', this._runNextMessage) |
34 | this.on('idle', () => { |
35 | while (!this._waitingQueue.isEmpty()) { |
36 | const waiter = this._waitingQueue.poll() |
37 | waiter.resolve(this.ticks) |
38 | } |
39 | }) |
40 | } |
41 | |
42 | start () { |
43 | return this.ports.start() |
44 | } |
45 | |
46 | queue (message) { |
47 | this.ports.queue(message) |
48 | if (this.vmState !== 'running') { |
49 | this._updateVmState('running') |
50 | this._runNextMessage() |
51 | } |
52 | } |
53 | |
54 | _updateVmState (vmState, message) { |
55 | this.vmState = vmState |
56 | this.emit(vmState, message) |
57 | } |
58 | |
59 | async _runNextMessage () { |
60 | const message = await this.ports.getNextMessage() |
61 | if (message) { |
62 | // run the next message |
63 | this.run(message) |
64 | } else { |
65 | // if no more messages then shut down |
66 | this._updateVmState('idle') |
67 | } |
68 | } |
69 | |
70 | /** |
71 | * run the kernels code with a given enviroment |
72 | * The Kernel Stores all of its state in the Environment. The Interface is used |
73 | * to by the VM to retrive infromation from the Environment. |
74 | */ |
75 | async run (message) { |
76 | const oldState = clone(this.state, false, 3) |
77 | let result |
78 | try { |
79 | result = await this.vm.run(message) || {} |
80 | } catch (e) { |
81 | // revert the state |
82 | clearObject(this.state) |
83 | Object.assign(this.state, oldState) |
84 | |
85 | result = { |
86 | exception: true, |
87 | exceptionError: e |
88 | } |
89 | } |
90 | |
91 | this.emit('result', result) |
92 | return result |
93 | } |
94 | |
95 | // returns a promise that resolves once the kernel hits the threshould tick |
96 | // count |
97 | wait (threshold, fromPort) { |
98 | if (threshold <= this.ticks) { |
99 | return this.ticks |
100 | } else if (this.vmState === 'idle') { |
101 | return this.ports.wait(threshold, fromPort) |
102 | } else { |
103 | const promise = new Promise((resolve, reject) => { |
104 | this._waitingQueue.add({ |
105 | threshold: threshold, |
106 | resolve: resolve, |
107 | from: fromPort |
108 | }) |
109 | }) |
110 | this._waitingMap.set(fromPort, promise) |
111 | return promise |
112 | } |
113 | } |
114 | |
115 | incrementTicks (count) { |
116 | this.ticks += count |
117 | while (!this._waitingQueue.isEmpty()) { |
118 | const waiter = this._waitingQueue.peek() |
119 | if (waiter.threshold > this.ticks) { |
120 | break |
121 | } else { |
122 | this._waitingQueue.poll().resolve(this.ticks) |
123 | } |
124 | } |
125 | } |
126 | |
127 | createPort (type, name) { |
128 | const VM = this.hypervisor._VMs[type] |
129 | const parentId = this.entryPort ? this.entryPort.id : null |
130 | let nonce = this.state['/'].nonce |
131 | |
132 | const portRef = { |
133 | 'messages': [], |
134 | 'id': { |
135 | '/': { |
136 | nonce: nonce, |
137 | parent: parentId |
138 | } |
139 | }, |
140 | 'type': type, |
141 | 'link': { |
142 | '/': VM.createState() |
143 | } |
144 | } |
145 | |
146 | // create the port instance |
147 | this.ports.set(name, portRef) |
148 | // incerment the nonce |
149 | nonce = new BN(nonce) |
150 | nonce.iaddn(1) |
151 | this.state['/'].nonce = nonce.toArray() |
152 | return portRef |
153 | } |
154 | |
155 | async send (portRef, message) { |
156 | message._fromPort = this.entryPort |
157 | message._ticks = this.ticks |
158 | |
159 | const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef |
160 | const vm = await this.hypervisor.getInstance(receiverEntryPort) |
161 | vm.queue(message) |
162 | } |
163 | } |
164 |
Built with git-ssb-web