Files: 3d425eb5e60c74393a2ae33abb20a5df794791b3 / index.js
2899 bytesRaw
1 | const EventEmitter = require('events') |
2 | const PortManager = require('./portManager.js') |
3 | const codeHandler = require('./codeHandler.js') |
4 | const AtomicMessage = require('primea-message/atomic') |
5 | |
6 | module.exports = class Kernel extends EventEmitter { |
7 | constructor (opts = {}) { |
8 | super() |
9 | // set up the state |
10 | this.graph = opts.graph |
11 | this.path = opts.path || '' |
12 | this.imports = opts.imports |
13 | const state = this.state = opts.state || {} |
14 | |
15 | // set up the vm |
16 | this._vm = (opts.codeHandler || codeHandler).init(opts.code || state) |
17 | this._vmstate = 'idle' |
18 | |
19 | // set up ports |
20 | this.ports = new PortManager({ |
21 | state: state, |
22 | graph: this.graph, |
23 | parentPort: opts.parentPort, |
24 | Kernel: Kernel, |
25 | imports: this.imports, |
26 | path: this.path |
27 | }) |
28 | |
29 | this.ports.on('message', index => { |
30 | this.runNextMessage(index) |
31 | }) |
32 | } |
33 | |
34 | runNextMessage (index = 0) { |
35 | // load the next message from port space |
36 | return this.ports.peek(index).then(message => { |
37 | if (message && |
38 | (this._vmstate === 'idle' || |
39 | (AtomicMessage.isAtomic(message) && message.isCyclic(this)))) { |
40 | this._currentMessage = message |
41 | this.ports.remove(index) |
42 | return this.run(message) |
43 | } else { |
44 | this._vmstate = 'idle' |
45 | this.emit('idle') |
46 | } |
47 | }) |
48 | } |
49 | |
50 | /** |
51 | * run the kernels code with a given enviroment |
52 | * The Kernel Stores all of its state in the Environment. The Interface is used |
53 | * to by the VM to retrive infromation from the Environment. |
54 | */ |
55 | async run (message, imports = this.imports) { |
56 | const self = this |
57 | function revert (oldState) { |
58 | // revert the state |
59 | clearObject(self.state) |
60 | Object.assign(self.state, oldState) |
61 | } |
62 | |
63 | // shallow copy |
64 | const oldState = Object.assign({}, this.state) |
65 | let result |
66 | this._vmstate = 'running' |
67 | try { |
68 | result = await this._vm.run(message, this, imports) || {} |
69 | } catch (e) { |
70 | result = { |
71 | exception: true, |
72 | exceptionError: e |
73 | } |
74 | } |
75 | |
76 | // if we trapped revert all the sent messages |
77 | if (result.exception) { |
78 | // revert to the old state |
79 | revert(oldState) |
80 | message._reject(result) |
81 | } else if (AtomicMessage.isAtomic(message) && !message.hasResponded) { |
82 | message.respond(result) |
83 | } |
84 | |
85 | message._committed().then(() => { |
86 | this.runNextMessage(0) |
87 | }).catch((e) => { |
88 | revert(oldState) |
89 | }) |
90 | return result |
91 | } |
92 | |
93 | async send (portName, message) { |
94 | if (AtomicMessage.isAtomic(message)) { |
95 | // record that this message has traveled thourgh this kernel. This is used |
96 | // to detect re-entry |
97 | message._visited(this, this._currentMessage) |
98 | } |
99 | return this.ports.send(portName, message) |
100 | } |
101 | |
102 | shutdown () { |
103 | this.ports.close() |
104 | } |
105 | } |
106 | |
107 | function clearObject (myObject) { |
108 | for (var member in myObject) { |
109 | delete myObject[member] |
110 | } |
111 | } |
112 |
Built with git-ssb-web