git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 46248da0b2e047c143b387676430c8d2d27cf4a3

Files: 46248da0b2e047c143b387676430c8d2d27cf4a3 / exoInterface.js

4739 bytesRaw
1const PortManager = require('./portManager.js')
2const Message = require('primea-message')
3
4module.exports = class ExoInterface {
5 /**
6 * the ExoInterface manages the varous message passing functions and provides
7 * an interface for the containers to use
8 * @param {Object} opts
9 * @param {Object} opts.id
10 * @param {Object} opts.state
11 * @param {Object} opts.hypervisor
12 * @param {Object} opts.Container
13 */
14 constructor (opts) {
15 this.state = opts.state
16 this.hypervisor = opts.hypervisor
17 this.id = opts.id
18 this.container = new opts.container.Constructor(this, opts.container.args)
19
20 this.ticks = 0
21 this.containerState = 'idle'
22 this._pendingSends = new Map()
23
24 // create the port manager
25 this.ports = new PortManager(Object.assign({
26 exInterface: this
27 }, opts))
28 }
29
30 _addWork (promise) {
31 this._outStandingWork = Promise.all([this._outStandingWork, promise])
32 }
33
34 /**
35 * adds a message to this containers message queue
36 * @param {string} portName
37 * @param {object} message
38 */
39 queue (portName, message) {
40 message._hops++
41 this.ports.queue(portName, message)
42 if (this.containerState !== 'running') {
43 this.containerState = 'running'
44 if (portName) {
45 this._runNextMessage()
46 } else {
47 this.run(message, true)
48 }
49 }
50 }
51
52 // waits for the next message
53 async _runNextMessage () {
54 // check if the ports are saturated, if so we don't have to wait on the
55 // scheduler
56 let message = this.ports.peekNextMessage()
57 let saturated = this.ports.isSaturated()
58 let oldestTime = this.hypervisor.scheduler.smallest()
59
60 while (!saturated &&
61 !(message && oldestTime >= message._fromTicks ||
62 !message && oldestTime === this.ticks)) {
63 const ticksToWait = message ? message._fromTicks : this.ticks
64
65 await Promise.race([
66 this.hypervisor.scheduler.wait(ticksToWait, this.id).then(m => {
67 message = this.ports.peekNextMessage()
68 }),
69 this.ports.olderMessage(message).then(m => {
70 message = m
71 }),
72 this.ports.whenSaturated().then(() => {
73 saturated = true
74 message = this.ports.peekNextMessage()
75 })
76 ])
77
78 oldestTime = this.hypervisor.scheduler.smallest()
79 saturated = this.ports.isSaturated()
80 }
81
82 if (!message) {
83 // if no more messages then shut down
84 this.hypervisor.scheduler.done(this)
85 } else {
86 message.fromPort.messages.shift()
87 if (message._fromTicks > this.ticks) {
88 this.ticks = message._fromTicks
89 }
90 this.hypervisor.scheduler.update(this)
91 // run the next message
92 this.run(message)
93 }
94 }
95
96 /**
97 * run the kernels code with a given enviroment
98 * The Kernel Stores all of its state in the Environment. The Interface is used
99 * to by the VM to retrive infromation from the Environment.
100 * @returns {Promise}
101 */
102 async run (message, init = false) {
103 let result
104 message.ports.forEach(port => this.ports._unboundPorts.add(port))
105 if (message.data === 'delete') {
106 this.ports._delete(message.fromName)
107 } else {
108 const method = init ? 'initailize' : 'run'
109
110 try {
111 result = await this.container[method](message) || {}
112 } catch (e) {
113 result = {
114 exception: true,
115 exceptionError: e
116 }
117 }
118 }
119 this.ports.clearUnboundedPorts()
120 // message.response(result)
121 this._runNextMessage()
122 return result
123 }
124
125 /**
126 * updates the number of ticks that the container has run
127 * @param {Number} count - the number of ticks to add
128 */
129 incrementTicks (count) {
130 this.ticks += count
131 this.hypervisor.scheduler.update(this)
132 }
133
134 /**
135 * creates a new message
136 * @param {*} data
137 */
138 createMessage (opts) {
139 const message = new Message(opts)
140 for (const port of message.ports) {
141 if (this.ports.isBound(port)) {
142 throw new Error('message must not contain bound ports')
143 }
144 }
145 return message
146 }
147
148 /**
149 * sends a message to a given port
150 * @param {Object} portRef - the port
151 * @param {Message} message - the message
152 */
153 async send (port, message) {
154 // set the port that the message came from
155 message._fromTicks = this.ticks
156 message.ports.forEach(port => this.ports._unboundPorts.delete(port))
157
158 // if (this.currentMessage !== message && !message.responsePort) {
159 // this.currentMessage._addSubMessage(message)
160 // }
161
162 if (port.destId) {
163 const id = port.destId
164 const instance = await this.hypervisor.getInstance(id)
165 instance.queue(port.destName, message)
166 } else {
167 port.destPort.messages.push(message)
168 }
169 }
170}
171

Built with git-ssb-web