Files: f53952e52386255d9558c848de3deb7c976779cd / kernel.js
3896 bytesRaw
1 | const PriorityQueue = require('fastpriorityqueue') |
2 | const clearObject = require('object-clear') |
3 | const clone = require('clone') |
4 | const BN = require('bn.js') |
5 | const EventEmitter = require('events') |
6 | const PortManager = require('./portManager.js') |
7 | |
8 | module.exports = class Kernel extends EventEmitter { |
9 | constructor (opts) { |
10 | super() |
11 | this.state = opts.state |
12 | this.entryPort = opts.entryPort |
13 | this.hypervisor = opts.hypervisor |
14 | |
15 | this.vmState = 'idle' |
16 | this.ticks = 0 |
17 | |
18 | // create the port manager |
19 | this.ports = new PortManager({ |
20 | kernel: this, |
21 | hypervisor: opts.hypervisor, |
22 | state: opts.state, |
23 | entryPort: opts.entryPort, |
24 | parentPort: opts.parentPort |
25 | }) |
26 | |
27 | this.vm = new opts.VM(this) |
28 | this._waitingMap = new Map() |
29 | |
30 | this.on('result', this._runNextMessage) |
31 | this.on('idle', () => { |
32 | for (const [, waiter] of this._waitingMap) { |
33 | waiter.resolve(this.ticks) |
34 | } |
35 | }) |
36 | } |
37 | |
38 | start () { |
39 | return this.ports.start() |
40 | } |
41 | |
42 | queue (message) { |
43 | this.ports.queue(message) |
44 | if (this.vmState !== 'running') { |
45 | this._updateVmState('running') |
46 | this._runNextMessage() |
47 | } |
48 | } |
49 | |
50 | _updateVmState (vmState, message) { |
51 | this.vmState = vmState |
52 | this.emit(vmState, message) |
53 | } |
54 | |
55 | async _runNextMessage () { |
56 | const message = await this.ports.getNextMessage() |
57 | if (message) { |
58 | // run the next message |
59 | this.run(message) |
60 | } else { |
61 | // if no more messages then shut down |
62 | this._updateVmState('idle') |
63 | } |
64 | } |
65 | |
66 | /** |
67 | * run the kernels code with a given enviroment |
68 | * The Kernel Stores all of its state in the Environment. The Interface is used |
69 | * to by the VM to retrive infromation from the Environment. |
70 | */ |
71 | async run (message) { |
72 | const oldState = clone(this.state, false, 3) |
73 | let result |
74 | try { |
75 | result = await this.vm.run(message) || {} |
76 | } catch (e) { |
77 | // revert the state |
78 | clearObject(this.state) |
79 | Object.assign(this.state, oldState) |
80 | |
81 | result = { |
82 | exception: true, |
83 | exceptionError: e |
84 | } |
85 | } |
86 | |
87 | this.emit('result', result) |
88 | return result |
89 | } |
90 | |
91 | // returns a promise that resolves once the kernel hits the threshould tick |
92 | // count |
93 | wait (threshold, fromPort) { |
94 | if (threshold <= this.ticks) { |
95 | return this.ticks |
96 | } else if (this.vmState === 'idle') { |
97 | return this.ports.wait(threshold, fromPort) |
98 | } else { |
99 | return new Promise((resolve, reject) => { |
100 | this._waitingMap.set(fromPort, { |
101 | threshold: threshold, |
102 | resolve: resolve, |
103 | from: fromPort |
104 | }) |
105 | }) |
106 | } |
107 | } |
108 | |
109 | incrementTicks (count) { |
110 | this.ticks += count |
111 | for (const [fromPort, waiter] in this._waitingMap) { |
112 | if (waiter.threshold < this.ticks) { |
113 | this._waitingMap.delete(fromPort) |
114 | waiter.resolve(this.ticks) |
115 | } |
116 | } |
117 | } |
118 | |
119 | createPort (type, name) { |
120 | const VM = this.hypervisor._VMs[type] |
121 | const parentId = this.entryPort ? this.entryPort.id : null |
122 | let nonce = this.state['/'].nonce |
123 | |
124 | const portRef = { |
125 | 'messages': [], |
126 | 'id': { |
127 | '/': { |
128 | nonce: nonce, |
129 | parent: parentId |
130 | } |
131 | }, |
132 | 'type': type, |
133 | 'link': { |
134 | '/': VM.createState() |
135 | } |
136 | } |
137 | |
138 | // create the port instance |
139 | this.ports.set(name, portRef) |
140 | // incerment the nonce |
141 | nonce = new BN(nonce) |
142 | nonce.iaddn(1) |
143 | this.state['/'].nonce = nonce.toArray() |
144 | return portRef |
145 | } |
146 | |
147 | async send (portRef, message) { |
148 | message._fromPort = this.entryPort |
149 | message._ticks = this.ticks |
150 | |
151 | const receiverEntryPort = portRef === this.entryPort ? this.parentPort : portRef |
152 | const vm = await this.hypervisor.getInstance(receiverEntryPort) |
153 | vm.queue(message) |
154 | |
155 | const waiter = this._waitingMap.get(portRef) |
156 | if (waiter) { |
157 | waiter.resolve(this.ticks) |
158 | this._waitingMap.delete(portRef) |
159 | } |
160 | } |
161 | } |
162 |
Built with git-ssb-web