git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: de08fdeeb9077d790c5ebdec0cfba0dee14b2e0b

Files: de08fdeeb9077d790c5ebdec0cfba0dee14b2e0b / index.js

2899 bytesRaw
1const EventEmitter = require('events')
2const PortManager = require('./portManager.js')
3const codeHandler = require('./codeHandler.js')
4const AtomicMessage = require('primea-message/atomic')
5
6module.exports = class Kernel extends EventEmitter {
7 constructor (opts = {}) {
8 super()
9 // set up the state
10 this.graph = opts.graph
11 this.path = opts.path || ''
12 this.imports = opts.imports
13 const state = this.state = opts.state || {}
14
15 // set up the vm
16 this._vm = (opts.codeHandler || codeHandler).init(opts.code || state)
17 this._vmstate = 'idle'
18
19 // set up ports
20 this.ports = new PortManager({
21 state: state,
22 graph: this.graph,
23 parentPort: opts.parentPort,
24 Kernel: Kernel,
25 imports: this.imports,
26 path: this.path
27 })
28
29 this.ports.on('message', index => {
30 this.runNextMessage(index)
31 })
32 }
33
34 runNextMessage (index = 0) {
35 // load the next message from port space
36 return this.ports.peek(index).then(message => {
37 if (message &&
38 (this._vmstate === 'idle' ||
39 (AtomicMessage.isAtomic(message) && message.isCyclic(this)))) {
40 this._currentMessage = message
41 this.ports.remove(index)
42 return this.run(message)
43 } else {
44 this._vmstate = 'idle'
45 this.emit('idle')
46 }
47 })
48 }
49
50 /**
51 * run the kernels code with a given enviroment
52 * The Kernel Stores all of its state in the Environment. The Interface is used
53 * to by the VM to retrive infromation from the Environment.
54 */
55 async run (message, imports = this.imports) {
56 const self = this
57 function revert (oldState) {
58 // revert the state
59 clearObject(self.state)
60 Object.assign(self.state, oldState)
61 }
62
63 // shallow copy
64 const oldState = Object.assign({}, this.state)
65 let result
66 this._vmstate = 'running'
67 try {
68 result = await this._vm.run(message, this, imports) || {}
69 } catch (e) {
70 result = {
71 exception: true,
72 exceptionError: e
73 }
74 }
75
76 // if we trapped revert all the sent messages
77 if (result.exception) {
78 // revert to the old state
79 revert(oldState)
80 message._reject(result)
81 } else if (AtomicMessage.isAtomic(message) && !message.hasResponded) {
82 message.respond(result)
83 }
84
85 message._committed().then(() => {
86 this.runNextMessage(0)
87 }).catch((e) => {
88 revert(oldState)
89 })
90 return result
91 }
92
93 async send (portName, message) {
94 if (AtomicMessage.isAtomic(message)) {
95 // record that this message has traveled thourgh this kernel. This is used
96 // to detect re-entry
97 message._visited(this, this._currentMessage)
98 }
99 return this.ports.send(portName, message)
100 }
101
102 shutdown () {
103 this.ports.close()
104 }
105}
106
107function clearObject (myObject) {
108 for (var member in myObject) {
109 delete myObject[member]
110 }
111}
112

Built with git-ssb-web