git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 74ccd6e33a63430ec22a6fcbfd0fd40f10c349c5

Files: 74ccd6e33a63430ec22a6fcbfd0fd40f10c349c5 / kernel.js

4112 bytesRaw
1const Message = require('primea-message')
2const PortManager = require('./portManager.js')
3const DeleteMessage = require('./deleteMessage')
4
5module.exports = class Kernel {
6 /**
7 * the Kernel manages the varous message passing functions and provides
8 * an interface for the containers to use
9 * @param {Object} opts
10 * @param {Object} opts.id - the UUID of the Kernel
11 * @param {Object} opts.state - the state of the container
12 * @param {Object} opts.hypervisor
13 * @param {Object} opts.container - the container constuctor and argments
14 */
15 constructor (opts) {
16 this.state = opts.state
17 this.hypervisor = opts.hypervisor
18 this.id = opts.id
19 this.container = new opts.container.Constructor(this, opts.container.args)
20
21 this.ticks = 0
22 this.containerState = 'idle'
23
24 // create the port manager
25 this.ports = new PortManager(Object.assign({
26 kernel: this
27 }, opts))
28 }
29
30 /**
31 * adds a message to this containers message queue
32 * @param {string} portName
33 * @param {object} message
34 */
35 queue (portName, message) {
36 message._hops++
37 if (portName) {
38 this.ports.queue(portName, message)
39 if (this.containerState !== 'running') {
40 this.containerState = 'running'
41 this._runNextMessage()
42 }
43 } else {
44 // initailiazation message
45 this.containerState = 'running'
46 this.run(message, true)
47 }
48 }
49
50 // waits for the next message
51 async _runNextMessage () {
52 // check if the ports are saturated, if so we don't have to wait on the
53 // scheduler
54 const message = await this.ports.getNextMessage()
55
56 if (!message) {
57 // if no more messages then shut down
58 this.hypervisor.scheduler.done(this.id)
59 } else {
60 message.fromPort.messages.shift()
61 // if the message we recived had more ticks then we currently have the
62 // update it
63 if (message._fromTicks > this.ticks) {
64 this.ticks = message._fromTicks
65 }
66 this.hypervisor.scheduler.update(this)
67 // run the next message
68 this.run(message)
69 }
70 }
71
72 /**
73 * run the kernels code with a given enviroment
74 * @param {object} message - the message to run
75 * @param {boolean} init - whether or not to run the intialization routine
76 * @returns {Promise}
77 */
78 async run (message, init = false) {
79 let result
80 message.ports.forEach(port => this.ports._unboundPorts.add(port))
81 if (message.constructor === DeleteMessage) {
82 this.ports._delete(message.fromName)
83 } else {
84 const method = init ? 'initailize' : 'run'
85 try {
86 result = await this.container[method](message) || {}
87 } catch (e) {
88 result = {
89 exception: true,
90 exceptionError: e
91 }
92 }
93 }
94 this.ports.clearUnboundedPorts()
95 // message.response(result)
96 this._runNextMessage()
97 return result
98 }
99
100 /**
101 * updates the number of ticks that the container has run
102 * @param {Number} count - the number of ticks to add
103 */
104 incrementTicks (count) {
105 this.ticks += count
106 this.hypervisor.scheduler.update(this)
107 }
108
109 /**
110 * creates a new message
111 * @param {*} data
112 */
113 createMessage (opts) {
114 const message = new Message(opts)
115 for (const port of message.ports) {
116 if (this.ports.isBound(port)) {
117 throw new Error('message must not contain bound ports')
118 }
119 }
120 return message
121 }
122
123 /**
124 * sends a message to a given port
125 * @param {Object} portRef - the port
126 * @param {Message} message - the message
127 */
128 async send (port, message) {
129 // set the port that the message came from
130 message._fromTicks = this.ticks
131 message.ports.forEach(port => this.ports._unboundPorts.delete(port))
132
133 // if (this.currentMessage !== message && !message.responsePort) {
134 // this.currentMessage._addSubMessage(message)
135 // }
136
137 if (port.destId) {
138 const id = port.destId
139 const instance = await this.hypervisor.getInstance(id)
140 instance.queue(port.destName, message)
141 } else {
142 // port is unbound
143 port.destPort.messages.push(message)
144 }
145 }
146}
147

Built with git-ssb-web