Files: 60b86f48100a3b8a9db14f54689cabe117acdff3 / index.js
2960 bytesRaw
1 | const EventEmitter = require('events') |
2 | const Vertex = require('merkle-trie') |
3 | const PortManager = require('./portManager.js') |
4 | const StateInterface = require('./stateInterface.js') |
5 | const imports = require('./EVMinterface.js') |
6 | const codeHandler = require('./codeHandler.js') |
7 | |
8 | module.exports = class Kernel extends EventEmitter { |
9 | constructor (opts = {}) { |
10 | super() |
11 | // set up the state |
12 | const state = this.state = opts.state || new Vertex() |
13 | this.stateInterface = new StateInterface(state) |
14 | this.path = state.path |
15 | |
16 | // set up the vm |
17 | this.imports = opts.imports || [imports] |
18 | this._vm = (opts.codeHandler || codeHandler).init(opts.code || state.value) |
19 | this._state = 'idle' |
20 | |
21 | // set up ports |
22 | this.ports = new PortManager(state, opts.parentPort, Kernel) |
23 | this._sentAtomicMessages = [] |
24 | this.ports.on('message', index => { |
25 | this.runNextMessage(index) |
26 | }) |
27 | } |
28 | |
29 | runNextMessage (index = 0) { |
30 | return this.ports.peek(index).then(message => { |
31 | if (message && (message._isCyclic(this) || this._state === 'idle')) { |
32 | this._currentMessage = message |
33 | this.ports.remove(index) |
34 | return this.run(message) |
35 | } else { |
36 | this._state = 'idle' |
37 | this.emit('idle') |
38 | } |
39 | }) |
40 | } |
41 | |
42 | /** |
43 | * run the kernels code with a given enviroment |
44 | * The Kernel Stores all of its state in the Environment. The Interface is used |
45 | * to by the VM to retrive infromation from the Environment. |
46 | */ |
47 | async run (message, imports = this.imports) { |
48 | function revert (oldState) { |
49 | // revert the state |
50 | this.state.set([], oldState) |
51 | // revert all the sent messages |
52 | for (let msg in this._sentAtomicMessages) { |
53 | msg.revert() |
54 | } |
55 | } |
56 | |
57 | const oldState = this.state.copy() |
58 | let result |
59 | this._state = 'running' |
60 | try { |
61 | result = await this._vm.run(message, this, imports) || {} |
62 | } catch (e) { |
63 | result = { |
64 | exception: true, |
65 | exceptionError: e |
66 | } |
67 | } |
68 | |
69 | if (message.atomic) { |
70 | // if we trapped revert all the sent messages |
71 | if (result.execption) { |
72 | // revert to the old state |
73 | revert(oldState) |
74 | } |
75 | message._finish() |
76 | message.result().then(result => { |
77 | if (result.execption) { |
78 | revert() |
79 | } else { |
80 | this.runNextMessage(0) |
81 | } |
82 | }) |
83 | |
84 | if (message.hops === message.to.length || result.exception) { |
85 | message._respond(result) |
86 | } |
87 | } else { |
88 | // non-atomic messages |
89 | this.runNextMessage(0) |
90 | } |
91 | return result |
92 | } |
93 | |
94 | async send (message) { |
95 | if (message.atomic) { |
96 | // record that this message has traveled thourgh this kernel. This is used |
97 | // to detect re-entry |
98 | message._visited(this, this._currentMessage) |
99 | // recoded that this message was sent, so that we can revert it if needed |
100 | this._sentAtomicMessages.push(message) |
101 | } |
102 | return this.ports.send(message) |
103 | } |
104 | |
105 | shutdown () {} |
106 | } |
107 |
Built with git-ssb-web