git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 7dc277be0c3a95ca7e090f1768ddf2c8990e785f

Files: 7dc277be0c3a95ca7e090f1768ddf2c8990e785f / kernel.js

4901 bytesRaw
1const Message = require('primea-message')
2const BN = require('bn.js')
3const PortManager = require('./portManager.js')
4const DeleteMessage = require('./deleteMessage')
5
6module.exports = class Kernel {
7 /**
8 * the Kernel manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.id - the UUID of the Kernel
12 * @param {Object} opts.state - the state of the container
13 * @param {Object} opts.hypervisor
14 * @param {Object} opts.container - the container constuctor and argments
15 */
16 constructor (opts) {
17 this.state = opts.state
18 this.hypervisor = opts.hypervisor
19 this.id = opts.id
20 this.container = new opts.container.Constructor(this, opts.container.args)
21
22 this.ticks = 0
23 this.containerState = 'idle'
24
25 // create the port manager
26 this.ports = new PortManager(Object.assign({
27 kernel: this
28 }, opts))
29 }
30
31 /**
32 * adds a message to this containers message queue
33 * @param {string} portName
34 * @param {object} message
35 */
36 queue (portName, message) {
37 this.ports.queue(portName, message)
38 if (this.containerState !== 'running') {
39 this.containerState = 'running'
40 this._runNextMessage()
41 }
42 }
43
44 initialize (message) {
45 this.containerState = 'running'
46 this.run(message, 'initialize')
47 }
48
49 // waits for the next message
50 async _runNextMessage () {
51 // check if the ports are saturated, if so we don't have to wait on the
52 // scheduler
53 const message = await this.ports.getNextMessage()
54
55 if (!message) {
56 // if no more messages then shut down
57 this.hypervisor.scheduler.done(this.id)
58 } else {
59 message.fromPort.messages.shift()
60 // if the message we recived had more ticks then we currently have the
61 // update it
62 if (message._fromTicks > this.ticks) {
63 this.ticks = message._fromTicks
64 }
65 this.hypervisor.scheduler.update(this)
66 // run the next message
67 this.run(message)
68 }
69 }
70
71 /**
72 * run the kernels code with a given enviroment
73 * @param {object} message - the message to run
74 * @param {boolean} init - whether or not to run the intialization routine
75 * @returns {Promise}
76 */
77 async run (message, method = 'run') {
78 let result
79
80 const responsePort = message.responsePort
81 delete message.responsePort
82
83 this.ports.addReceivedPorts(message)
84 message._hops++
85
86 if (message.constructor === DeleteMessage) {
87 this.ports._delete(message.fromName)
88 } else {
89 try {
90 result = await this.container[method](message) || {}
91 } catch (e) {
92 result = {
93 exception: true,
94 exceptionError: e
95 }
96 }
97 }
98
99 if (responsePort) {
100 this.send(responsePort, new Message({
101 data: result
102 }))
103 this.ports._unboundPorts.add(responsePort)
104 }
105
106 this.ports.clearUnboundedPorts()
107 this._runNextMessage()
108 return result
109 }
110
111 getResponsePort (message) {
112 if (message.responsePort) {
113 return message.responsePort.destPort
114 } else {
115 const [portRef1, portRef2] = this.ports.createChannel()
116 message.responsePort = portRef2
117 this.ports._unboundPorts.delete(portRef2)
118 return portRef1
119 }
120 }
121
122 /**
123 * updates the number of ticks that the container has run
124 * @param {Number} count - the number of ticks to add
125 */
126 incrementTicks (count) {
127 this.ticks += count
128 this.hypervisor.scheduler.update(this)
129 }
130
131 /**
132 * creates a new message
133 * @param {*} data
134 */
135 createMessage (opts) {
136 const message = new Message(opts)
137 this.ports.checkSendingPorts(message)
138 return message
139 }
140
141 /**
142 * creates a new container. Returning a port to it.
143 * @param {String} type
144 * @param {*} data - the data to populate the initail state with
145 * @returns {Object}
146 */
147 createInstance (type, message) {
148 let nonce = this.state.nonce
149
150 const id = {
151 nonce: nonce,
152 parent: this.id
153 }
154
155 // incerment the nonce
156 nonce = new BN(nonce)
157 nonce.iaddn(1)
158 this.state.nonce = nonce.toArray()
159 this.ports.removeSentPorts(message)
160
161 this.hypervisor.createInstance(type, message, id)
162 }
163
164 /**
165 * sends a message to a given port
166 * @param {Object} portRef - the port
167 * @param {Message} message - the message
168 */
169 async send (port, message) {
170 // set the port that the message came from
171 message._fromTicks = this.ticks
172 this.ports.removeSentPorts(message)
173
174 // if (this.currentMessage !== message && !message.responsePort) {
175 // this.currentMessage._addSubMessage(message)
176 // }
177
178 if (port.destId) {
179 const id = port.destId
180 const instance = await this.hypervisor.getInstance(id)
181 instance.queue(port.destName, message)
182 } else {
183 // port is unbound
184 port.destPort.messages.push(message)
185 }
186 }
187}
188

Built with git-ssb-web