git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 679caf4c89ce01c38751eb18177edee3a845d23e

Files: 679caf4c89ce01c38751eb18177edee3a845d23e / kernel.js

4178 bytesRaw
1const Message = require('primea-message')
2const PortManager = require('./portManager.js')
3const DeleteMessage = require('./deleteMessage')
4
5module.exports = class Kernel {
6 /**
7 * the Kernel manages the varous message passing functions and provides
8 * an interface for the containers to use
9 * @param {Object} opts
10 * @param {Object} opts.id - the UUID of the Kernel
11 * @param {Object} opts.state - the state of the container
12 * @param {Object} opts.hypervisor
13 * @param {Object} opts.container - the container constuctor and argments
14 */
15 constructor (opts) {
16 this.state = opts.state
17 this.hypervisor = opts.hypervisor
18 this.id = opts.id
19 this.container = new opts.container.Constructor(this, opts.container.args)
20
21 this.ticks = 0
22 this.containerState = 'idle'
23
24 // create the port manager
25 this.ports = new PortManager(Object.assign({
26 kernel: this
27 }, opts))
28 }
29
30 /**
31 * adds a message to this containers message queue
32 * @param {string} portName
33 * @param {object} message
34 */
35 queue (portName, message) {
36 this.ports.queue(portName, message)
37 if (this.containerState !== 'running') {
38 this.containerState = 'running'
39 this._runNextMessage()
40 }
41 }
42
43 initialize (message) {
44 this.containerState = 'running'
45 this.run(message, true)
46 }
47
48 // waits for the next message
49 async _runNextMessage () {
50 // check if the ports are saturated, if so we don't have to wait on the
51 // scheduler
52 const message = await this.ports.getNextMessage()
53
54 if (!message) {
55 // if no more messages then shut down
56 this.hypervisor.scheduler.done(this.id)
57 } else {
58 message.fromPort.messages.shift()
59 // if the message we recived had more ticks then we currently have the
60 // update it
61 if (message._fromTicks > this.ticks) {
62 this.ticks = message._fromTicks
63 }
64 this.hypervisor.scheduler.update(this)
65 // run the next message
66 this.run(message)
67 }
68 }
69
70 /**
71 * run the kernels code with a given enviroment
72 * @param {object} message - the message to run
73 * @param {boolean} init - whether or not to run the intialization routine
74 * @returns {Promise}
75 */
76 async run (message, init = false) {
77 let result
78
79 message.ports.forEach(port => this.ports._unboundPorts.add(port))
80 message._hops++
81
82 if (message.constructor === DeleteMessage) {
83 this.ports._delete(message.fromName)
84 } else {
85 const method = init ? 'initialize' : 'run'
86 try {
87 result = await this.container[method](message) || {}
88 } catch (e) {
89 result = {
90 exception: true,
91 exceptionError: e
92 }
93 }
94 }
95 this.ports.clearUnboundedPorts()
96 // const responsePort = this.message.responsePort
97 // if (responsePort) {
98 // this.send(responsePort, new Message({data: result}))
99 // }
100 this._runNextMessage()
101 return result
102 }
103
104 /**
105 * updates the number of ticks that the container has run
106 * @param {Number} count - the number of ticks to add
107 */
108 incrementTicks (count) {
109 this.ticks += count
110 this.hypervisor.scheduler.update(this)
111 }
112
113 /**
114 * creates a new message
115 * @param {*} data
116 */
117 createMessage (opts) {
118 const message = new Message(opts)
119 for (const port of message.ports) {
120 if (this.ports.isBound(port)) {
121 throw new Error('message must not contain bound ports')
122 }
123 }
124 return message
125 }
126
127 /**
128 * sends a message to a given port
129 * @param {Object} portRef - the port
130 * @param {Message} message - the message
131 */
132 async send (port, message) {
133 // set the port that the message came from
134 message._fromTicks = this.ticks
135 message.ports.forEach(port => this.ports._unboundPorts.delete(port))
136
137 // if (this.currentMessage !== message && !message.responsePort) {
138 // this.currentMessage._addSubMessage(message)
139 // }
140
141 if (port.destId) {
142 const id = port.destId
143 const instance = await this.hypervisor.getInstance(id)
144 instance.queue(port.destName, message)
145 } else {
146 // port is unbound
147 port.destPort.messages.push(message)
148 }
149 }
150}
151

Built with git-ssb-web