git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 016c6df49758e5db4d0f3caaf955ad714e137afc

Files: 016c6df49758e5db4d0f3caaf955ad714e137afc / kernel.js

4926 bytesRaw
1const Message = require('primea-message')
2const BN = require('bn.js')
3const PortManager = require('./portManager.js')
4const DeleteMessage = require('./deleteMessage')
5
6module.exports = class Kernel {
7 /**
8 * the Kernel manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.id - the UUID of the Kernel
12 * @param {Object} opts.state - the state of the container
13 * @param {Object} opts.hypervisor
14 * @param {Object} opts.container - the container constuctor and argments
15 */
16 constructor (opts) {
17 this.state = opts.state
18 this.hypervisor = opts.hypervisor
19 this.id = opts.id
20 this.container = new opts.container.Constructor(this, opts.container.args)
21
22 this.ticks = 0
23 this.containerState = 'idle'
24
25 // create the port manager
26 this.ports = new PortManager(Object.assign({
27 kernel: this
28 }, opts))
29 }
30
31 /**
32 * adds a message to this containers message queue
33 * @param {string} portName
34 * @param {object} message
35 */
36 queue (portName, message) {
37 this.ports.queue(portName, message)
38 if (this.containerState !== 'running') {
39 this.containerState = 'running'
40 return this._runNextMessage()
41 }
42 }
43
44 initialize (message) {
45 this.containerState = 'running'
46 return this.run(message, 'initialize')
47 }
48
49 // waits for the next message
50 async _runNextMessage () {
51 // check if the ports are saturated, if so we don't have to wait on the
52 // scheduler
53 const message = await this.ports.getNextMessage()
54
55 if (message) {
56 message.fromPort.messages.shift()
57 // if the message we recived had more ticks then we currently have the
58 // update it
59 if (message._fromTicks > this.ticks) {
60 this.ticks = message._fromTicks
61 this.hypervisor.scheduler.update(this)
62 }
63 // run the next message
64 return this.run(message)
65 } else {
66 // if no more messages then shut down
67 this.hypervisor.scheduler.done(this.id)
68 }
69 }
70
71 /**
72 * run the kernels code with a given enviroment
73 * @param {object} message - the message to run
74 * @param {boolean} init - whether or not to run the intialization routine
75 * @returns {Promise}
76 */
77 async run (message, method = 'run') {
78 let result
79
80 const responsePort = message.responsePort
81 delete message.responsePort
82
83 this.ports.addReceivedPorts(message)
84 message._hops++
85
86 if (message.constructor === DeleteMessage) {
87 this.ports._delete(message.fromName)
88 } else {
89 try {
90 result = await this.container[method](message) || {}
91 } catch (e) {
92 result = {
93 exception: true,
94 exceptionError: e
95 }
96 }
97 }
98
99 if (responsePort) {
100 this.send(responsePort, new Message({
101 data: result
102 }))
103 this.ports._unboundPorts.add(responsePort)
104 }
105
106 this.ports.clearUnboundedPorts()
107 return this._runNextMessage()
108 }
109
110 getResponsePort (message) {
111 if (message.responsePort) {
112 return message.responsePort.destPort
113 } else {
114 const [portRef1, portRef2] = this.ports.createChannel()
115 message.responsePort = portRef2
116 this.ports._unboundPorts.delete(portRef2)
117 return portRef1
118 }
119 }
120
121 /**
122 * updates the number of ticks that the container has run
123 * @param {Number} count - the number of ticks to add
124 */
125 incrementTicks (count) {
126 this.ticks += count
127 this.hypervisor.scheduler.update(this)
128 }
129
130 /**
131 * creates a new message
132 * @param {*} data
133 */
134 createMessage (opts) {
135 const message = new Message(opts)
136 this.ports.checkSendingPorts(message)
137 return message
138 }
139
140 /**
141 * creates a new container. Returning a port to it.
142 * @param {String} type
143 * @param {*} data - the data to populate the initail state with
144 * @returns {Object}
145 */
146 createInstance (type, message) {
147 let nonce = this.state.nonce
148
149 const id = {
150 nonce: nonce,
151 parent: this.id
152 }
153
154 // incerment the nonce
155 nonce = new BN(nonce)
156 nonce.iaddn(1)
157 this.state.nonce = nonce.toArray()
158 this.ports.removeSentPorts(message)
159
160 return this.hypervisor.createInstance(type, message, id)
161 }
162
163 /**
164 * sends a message to a given port
165 * @param {Object} portRef - the port
166 * @param {Message} message - the message
167 */
168 async send (port, message) {
169 // set the port that the message came from
170 message._fromTicks = this.ticks
171 this.ports.removeSentPorts(message)
172
173 // if (this.currentMessage !== message && !message.responsePort) {
174 // this.currentMessage._addSubMessage(message)
175 // }
176
177 if (port.destId) {
178 const id = port.destId
179 const instance = await this.hypervisor.getInstance(id)
180 return instance.queue(port.destName, message)
181 } else {
182 // port is unbound
183 port.destPort.messages.push(message)
184 }
185 }
186}
187

Built with git-ssb-web