Files: 3679fc587de81f2895a4acc725caff62e8e644f0 / index.js
2889 bytesRaw
1 | const EventEmitter = require('events') |
2 | const Vertex = require('merkle-trie') |
3 | const PortManager = require('./portManager.js') |
4 | const codeHandler = require('./codeHandler.js') |
5 | |
6 | module.exports = class Kernel extends EventEmitter { |
7 | constructor (opts = {}) { |
8 | super() |
9 | // set up the state |
10 | const state = this.state = opts.state || new Vertex() |
11 | this.path = state.path |
12 | |
13 | // set up the vm |
14 | this.imports = opts.imports |
15 | this._vm = (opts.codeHandler || codeHandler).init(opts.code || state.value) |
16 | this._vmstate = 'idle' |
17 | |
18 | // set up ports |
19 | this.ports = new PortManager(state, opts.parentPort, Kernel, this.imports) |
20 | this.ports.on('message', index => { |
21 | this.runNextMessage(index) |
22 | }) |
23 | this._sentAtomicMessages = [] |
24 | } |
25 | |
26 | runNextMessage (index = 0) { |
27 | // load the next message from port space |
28 | return this.ports.peek(index).then(message => { |
29 | if (message && (message._isCyclic(this) || this._vmstate === 'idle')) { |
30 | this._currentMessage = message |
31 | this.ports.remove(index) |
32 | return this.run(message) |
33 | } else { |
34 | this._vmstate = 'idle' |
35 | this.emit('idle') |
36 | } |
37 | }) |
38 | } |
39 | |
40 | /** |
41 | * run the kernels code with a given enviroment |
42 | * The Kernel Stores all of its state in the Environment. The Interface is used |
43 | * to by the VM to retrive infromation from the Environment. |
44 | */ |
45 | async run (message, imports = this.imports) { |
46 | function revert (oldState) { |
47 | // revert the state |
48 | this.state.set([], oldState) |
49 | // revert all the sent messages |
50 | for (let msg in this._sentAtomicMessages) { |
51 | msg.revert() |
52 | } |
53 | } |
54 | |
55 | const oldState = this.state.copy() |
56 | let result |
57 | this._vmstate = 'running' |
58 | try { |
59 | result = await this._vm.run(message, this, imports) || {} |
60 | } catch (e) { |
61 | result = { |
62 | exception: true, |
63 | exceptionError: e |
64 | } |
65 | } |
66 | |
67 | if (message.atomic) { |
68 | // if we trapped revert all the sent messages |
69 | if (result.execption) { |
70 | // revert to the old state |
71 | revert(oldState) |
72 | } |
73 | message._finish() |
74 | message.result().then(result => { |
75 | if (result.execption) { |
76 | revert() |
77 | } else { |
78 | this.runNextMessage(0) |
79 | } |
80 | }) |
81 | |
82 | if (message.hops === message.to.length || result.exception) { |
83 | message._respond(result) |
84 | } |
85 | } else { |
86 | // non-atomic messages |
87 | this.runNextMessage(0) |
88 | } |
89 | return result |
90 | } |
91 | |
92 | async send (message) { |
93 | if (message.atomic) { |
94 | // record that this message has traveled thourgh this kernel. This is used |
95 | // to detect re-entry |
96 | message._visited(this, this._currentMessage) |
97 | // recoded that this message was sent, so that we can revert it if needed |
98 | this._sentAtomicMessages.push(message) |
99 | } |
100 | return this.ports.send(message) |
101 | } |
102 | |
103 | shutdown () { |
104 | this.ports.close() |
105 | } |
106 | } |
107 |
Built with git-ssb-web