git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 1aa22ba2b0c933edd5df2a76889d2cd6751baca0

Files: 1aa22ba2b0c933edd5df2a76889d2cd6751baca0 / kernel.js

5044 bytesRaw
// module dependencies
const Message = require('primea-message')
const BN = require('bn.js')
const PortManager = require('./portManager.js')
const DeleteMessage = require('./deleteMessage')
5
6module.exports = class Kernel {
7 /**
8 * the Kernel manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.id - the UUID of the Kernel
12 * @param {Object} opts.state - the state of the container
13 * @param {Object} opts.hypervisor
14 * @param {Object} opts.container - the container constuctor and argments
15 */
16 constructor (opts) {
17 this.state = opts.state
18 this.hypervisor = opts.hypervisor
19 this.id = opts.id
20 this.container = new opts.container.Constructor(this, opts.container.args)
21
22 this.ticks = 0
23 this.containerState = 'idle'
24
25 // create the port manager
26 this.ports = new PortManager(Object.assign({
27 kernel: this
28 }, opts))
29 }
30
31 /**
32 * adds a message to this containers message queue
33 * @param {string} portName
34 * @param {object} message
35 */
36 queue (portName, message) {
37 this.ports.queue(portName, message)
38 if (this.containerState !== 'running') {
39 this.containerState = 'running'
40 this._runNextMessage()
41 }
42 }
43
44 initialize (message) {
45 this.containerState = 'running'
46 this.run(message, 'initialize')
47 }
48
49 // waits for the next message
50 async _runNextMessage () {
51 // check if the ports are saturated, if so we don't have to wait on the
52 // scheduler
53 const message = await this.ports.getNextMessage()
54
55 if (!message) {
56 // if no more messages then shut down
57 this.hypervisor.scheduler.done(this.id)
58 } else {
59 message.fromPort.messages.shift()
60 // if the message we recived had more ticks then we currently have the
61 // update it
62 if (message._fromTicks > this.ticks) {
63 this.ticks = message._fromTicks
64 }
65 this.hypervisor.scheduler.update(this)
66 // run the next message
67 this.run(message)
68 }
69 }
70
71 /**
72 * run the kernels code with a given enviroment
73 * @param {object} message - the message to run
74 * @param {boolean} init - whether or not to run the intialization routine
75 * @returns {Promise}
76 */
77 async run (message, method = 'run') {
78 let result
79
80 const responsePort = message.responsePort
81 delete message.responsePort
82
83 message.ports.forEach(port => this.ports._unboundPorts.add(port))
84 message._hops++
85
86 if (message.constructor === DeleteMessage) {
87 this.ports._delete(message.fromName)
88 } else {
89 try {
90 result = await this.container[method](message) || {}
91 } catch (e) {
92 result = {
93 exception: true,
94 exceptionError: e
95 }
96 }
97 }
98
99 if (responsePort) {
100 this.send(responsePort, new Message({
101 data: result
102 }))
103 this.ports._unboundPorts.add(responsePort)
104 }
105
106 this.ports.clearUnboundedPorts()
107 this._runNextMessage()
108 return result
109 }
110
111 getResponsePort (message) {
112 if (message.responsePort) {
113 return message.responsePort.destPort
114 } else {
115 const [portRef1, portRef2] = this.ports.createChannel()
116 message.responsePort = portRef2
117 this.ports._unboundPorts.delete(portRef2)
118 return portRef1
119 }
120 }
121
122 /**
123 * updates the number of ticks that the container has run
124 * @param {Number} count - the number of ticks to add
125 */
126 incrementTicks (count) {
127 this.ticks += count
128 this.hypervisor.scheduler.update(this)
129 }
130
131 /**
132 * creates a new message
133 * @param {*} data
134 */
135 createMessage (opts) {
136 const message = new Message(opts)
137 for (const port of message.ports) {
138 if (this.ports.isBound(port)) {
139 throw new Error('message must not contain bound ports')
140 }
141 }
142 return message
143 }
144
145 /**
146 * creates a new container. Returning a port to it.
147 * @param {String} type
148 * @param {*} data - the data to populate the initail state with
149 * @returns {Object}
150 */
151 createInstance (type, message) {
152 let nonce = this.state.nonce
153
154 const id = {
155 nonce: nonce,
156 parent: this.id
157 }
158
159 // incerment the nonce
160 nonce = new BN(nonce)
161 nonce.iaddn(1)
162 this.state.nonce = nonce.toArray()
163 this.ports.removeSentPorts(message)
164
165 this.hypervisor.createInstance(type, message, id)
166 }
167
168 /**
169 * sends a message to a given port
170 * @param {Object} portRef - the port
171 * @param {Message} message - the message
172 */
173 async send (port, message) {
174 // set the port that the message came from
175 message._fromTicks = this.ticks
176 this.ports.removeSentPorts(message)
177
178 // if (this.currentMessage !== message && !message.responsePort) {
179 // this.currentMessage._addSubMessage(message)
180 // }
181
182 if (port.destId) {
183 const id = port.destId
184 const instance = await this.hypervisor.getInstance(id)
185 instance.queue(port.destName, message)
186 } else {
187 // port is unbound
188 port.destPort.messages.push(message)
189 }
190 }
191}
192

Built with git-ssb-web