Files: 5069c93603388ea483de7c8c854b35783f6ec016 / index.js
2712 bytesRaw
1 | const EventEmitter = require('events') |
2 | const PortManager = require('./portManager.js') |
3 | const codeHandler = require('./codeHandler.js') |
4 | |
5 | module.exports = class Kernel extends EventEmitter { |
6 | constructor (opts = {}) { |
7 | super() |
8 | // set up the state |
9 | this.graph = opts.graph |
10 | this.path = opts.path || '' |
11 | this.imports = opts.imports |
12 | const state = this.state = opts.state || {} |
13 | |
14 | // set up the vm |
15 | this._vm = (opts.codeHandler || codeHandler).init(opts.code || state) |
16 | this._vmstate = 'idle' |
17 | |
18 | // set up ports |
19 | this.ports = new PortManager({ |
20 | state: state, |
21 | graph: this.graph, |
22 | parentPort: opts.parentPort, |
23 | Kernel: Kernel, |
24 | imports: this.imports, |
25 | path: this.path |
26 | }) |
27 | |
28 | this.ports.on('message', index => { |
29 | this.runNextMessage(index) |
30 | }) |
31 | } |
32 | |
33 | runNextMessage (index = 0) { |
34 | // load the next message from port space |
35 | return this.ports.peek(index).then(message => { |
36 | if (message && (message.isCyclic(this) || this._vmstate === 'idle')) { |
37 | this._currentMessage = message |
38 | this.ports.remove(index) |
39 | return this.run(message) |
40 | } else { |
41 | this._vmstate = 'idle' |
42 | this.emit('idle') |
43 | } |
44 | }) |
45 | } |
46 | |
47 | /** |
48 | * run the kernels code with a given enviroment |
49 | * The Kernel Stores all of its state in the Environment. The Interface is used |
50 | * to by the VM to retrive infromation from the Environment. |
51 | */ |
52 | async run (message, imports = this.imports) { |
53 | const self = this |
54 | function revert (oldState) { |
55 | // revert the state |
56 | clearObject(self.state) |
57 | Object.assign(self.state, oldState) |
58 | } |
59 | |
60 | // shallow copy |
61 | const oldState = Object.assign({}, this.state) |
62 | let result |
63 | this._vmstate = 'running' |
64 | try { |
65 | result = await this._vm.run(message, this, imports) || {} |
66 | } catch (e) { |
67 | result = { |
68 | exception: true, |
69 | exceptionError: e |
70 | } |
71 | } |
72 | |
73 | // if we trapped revert all the sent messages |
74 | if (result.exception) { |
75 | // revert to the old state |
76 | revert(oldState) |
77 | message.reject(result) |
78 | } else if (!message.hasResponded) { |
79 | message.respond(result) |
80 | } |
81 | |
82 | message.committed().then(() => { |
83 | this.runNextMessage(0) |
84 | }).catch((e) => { |
85 | revert(oldState) |
86 | }) |
87 | return result |
88 | } |
89 | |
90 | async send (message) { |
91 | if (message.atomic) { |
92 | // record that this message has traveled thourgh this kernel. This is used |
93 | // to detect re-entry |
94 | message._visited(this, this._currentMessage) |
95 | } |
96 | return this.ports.send(message) |
97 | } |
98 | |
99 | shutdown () { |
100 | this.ports.close() |
101 | } |
102 | } |
103 | |
/**
 * Empties an object in place by deleting each of its own enumerable
 * string-keyed properties. Inherited properties are left alone, and
 * `null` / `undefined` are accepted and ignored.
 *
 * @param {Object} myObject - the object to empty
 */
function clearObject (myObject) {
  if (myObject === null || myObject === undefined) {
    return
  }
  Object.keys(myObject).forEach(key => {
    delete myObject[key]
  })
}
109 |
Built with git-ssb-web