Files: 1083d2c0b8a6bbe729c7484eea2f03a5b54da970 / kernel.js
3170 bytesRaw
1 | const clearObject = require('object-clear') |
2 | const clone = require('clone') |
3 | const EventEmitter = require('events') |
4 | const PortManager = require('./portManager.js') |
5 | |
6 | module.exports = class Kernel extends EventEmitter { |
7 | constructor (opts) { |
8 | super() |
9 | this.state = opts.state |
10 | this.entryPort = opts.entryPort |
11 | this.hypervisor = opts.hypervisor |
12 | |
13 | this.vmState = 'idle' |
14 | this.ticks = 0 |
15 | |
16 | // create the port manager |
17 | this.ports = new PortManager({ |
18 | kernel: this, |
19 | hypervisor: opts.hypervisor, |
20 | state: opts.state, |
21 | entryPort: opts.entryPort, |
22 | parentPort: opts.parentPort |
23 | }) |
24 | |
25 | this.vm = new opts.VM(this) |
26 | this._waitingMap = new Map() |
27 | |
28 | this.on('result', this._runNextMessage) |
29 | this.on('idle', () => { |
30 | for (const [, waiter] of this._waitingMap) { |
31 | waiter.resolve(this.ticks) |
32 | } |
33 | }) |
34 | } |
35 | |
36 | start () { |
37 | return this.ports.start() |
38 | } |
39 | |
40 | queue (message) { |
41 | message._hops++ |
42 | this.ports.queue(message) |
43 | if (this.vmState !== 'running') { |
44 | this._updateVmState('running') |
45 | this._runNextMessage() |
46 | } |
47 | } |
48 | |
49 | _updateVmState (vmState, message) { |
50 | this.vmState = vmState |
51 | this.emit(vmState, message) |
52 | } |
53 | |
54 | async _runNextMessage () { |
55 | const message = await this.ports.getNextMessage() |
56 | if (message) { |
57 | // run the next message |
58 | this.run(message) |
59 | } else { |
60 | // if no more messages then shut down |
61 | this._updateVmState('idle') |
62 | } |
63 | } |
64 | |
65 | /** |
66 | * run the kernels code with a given enviroment |
67 | * The Kernel Stores all of its state in the Environment. The Interface is used |
68 | * to by the VM to retrive infromation from the Environment. |
69 | */ |
70 | async run (message) { |
71 | const oldState = clone(this.state, false, 3) |
72 | let result |
73 | try { |
74 | result = await this.vm.run(message) || {} |
75 | } catch (e) { |
76 | // revert the state |
77 | clearObject(this.state) |
78 | Object.assign(this.state, oldState) |
79 | |
80 | result = { |
81 | exception: true, |
82 | exceptionError: e |
83 | } |
84 | } |
85 | |
86 | this.emit('result', result) |
87 | return result |
88 | } |
89 | |
90 | // returns a promise that resolves once the kernel hits the threshould tick |
91 | // count |
92 | wait (threshold, fromPort) { |
93 | if (threshold <= this.ticks) { |
94 | return this.ticks |
95 | } else if (this.vmState === 'idle') { |
96 | return this.ports.wait(threshold, fromPort) |
97 | } else { |
98 | return new Promise((resolve, reject) => { |
99 | this._waitingMap.set(fromPort, { |
100 | threshold: threshold, |
101 | resolve: resolve, |
102 | from: fromPort |
103 | }) |
104 | }) |
105 | } |
106 | } |
107 | |
108 | incrementTicks (count) { |
109 | this.ticks += count |
110 | for (const [fromPort, waiter] of this._waitingMap) { |
111 | if (waiter.threshold < this.ticks) { |
112 | this._waitingMap.delete(fromPort) |
113 | waiter.resolve(this.ticks) |
114 | } |
115 | } |
116 | } |
117 | |
118 | async send (portRef, message) { |
119 | message._fromPort = this.entryPort |
120 | message._fromPortTicks = this.ticks |
121 | |
122 | const vm = await this.hypervisor.getInstance(portRef, true, this.entryPort) |
123 | vm.queue(message) |
124 | |
125 | const waiter = this._waitingMap.get(portRef) |
126 | if (waiter) { |
127 | waiter.resolve(this.ticks) |
128 | this._waitingMap.delete(portRef) |
129 | } |
130 | } |
131 | } |
132 |
Built with git-ssb-web