git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: c779344f6653a07ed5e2695bca74c9cbd0c66a19

Files: c779344f6653a07ed5e2695bca74c9cbd0c66a19 / actor.js

3622 bytesRaw
1const Pipe = require('buffer-pipe')
2const leb128 = require('leb128').unsigned
3
4module.exports = class Actor {
5 /**
6 * the Actor manages the varous message passing functions and provides
7 * an interface for the containers to use
8 * @param {Object} opts
9 * @param {Object} opts.id - the UUID of the Actor
10 * @param {Object} opts.state - the state of the container
11 * @param {Object} opts.hypervisor - the instance of the hypervisor
12 * @param {Object} opts.container - the container constuctor and argments
13 */
14 constructor (opts) {
15 Object.assign(this, opts)
16
17 this.inbox = []
18 this.ticks = 0
19 this.running = false
20 }
21
22 /**
23 * adds a message to this actor's message queue
24 * @param {string} portName
25 * @param {object} message
26 */
27 queue (message) {
28 this.inbox.push(message)
29
30 if (!this.running) {
31 this.running = true
32 return this._startMessageLoop()
33 }
34 }
35
36 // waits for the next message
37 async _startMessageLoop () {
38 // this ensure we only every have one loop running at a time
39 while (this.inbox.length) {
40 const message = this.inbox.shift()
41 if (message._fromTicks > this.ticks) {
42 this.hypervisor.scheduler.update(this.ticks, message._fromTicks)
43 this.ticks = message._fromTicks
44 }
45 await this.runMessage(message)
46 }
47 this.running = false
48 }
49
50 serializeMetaData () {
51 return Actor.serializeMetaData(this.type, this.nonce)
52 }
53
54 getFuncRef (name) {
55 return {
56 name,
57 destId: this.id
58 }
59 }
60
61 static serializeMetaData (type, nonce = 0) {
62 const p = new Pipe()
63 leb128.write(type, p)
64 leb128.write(nonce, p)
65 return p.buffer
66 }
67
68 static deserializeMetaData (buffer) {
69 const pipe = new Pipe(buffer)
70 const type = leb128.read(pipe)
71 const nonce = leb128.read(pipe)
72 return {
73 nonce,
74 type
75 }
76 }
77
78 /**
79 * Runs the shutdown routine for the actor
80 */
81 async shutdown () {
82 await this.state.done()
83 this.state.root['/'][3] = this.serializeMetaData()
84 }
85
86 /**
87 * Runs the startup routine for the actor
88 */
89 async startup () {
90 this.instance = await this.container.instance(this)
91 }
92
93 /**
94 * run the Actor with a given message
95 * @param {object} message - the message to run
96 * @param {String} method - which method to run
97 * @returns {Promise}
98 */
99 async runMessage (message) {
100 try {
101 this.currentMessage = message
102 await this.instance.exports[message.funcRef.name](...message.funcArguments)
103 } catch (e) {
104 message.emit('execution:error', e)
105 }
106 message.emit('done')
107 }
108
109 /**
110 * updates the number of ticks that the actor has run
111 * @param {Number} count - the number of ticks to add
112 */
113 incrementTicks (count) {
114 const oldValue = this.ticks
115 this.ticks += count
116 this.hypervisor.scheduler.update(oldValue, this.ticks)
117 }
118
119 /**
120 * creates an actor
121 * @param {Integer} type - the type id for the container
122 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
123 */
124 createActor (type, code) {
125 const id = this._generateNextId()
126 return this.hypervisor.createActor(type, code, id)
127 }
128
129 _generateNextId () {
130 const id = {
131 nonce: this.nonce,
132 parent: this.id
133 }
134
135 this.nonce++
136 return id
137 }
138
139 /**
140 * sends a message to a given port
141 * @param {Object} portRef - the port
142 * @param {Message} message - the message
143 */
144 send (message) {
145 message._fromTicks = this.ticks
146 message._fromId = this.id
147
148 this.hypervisor.scheduler.queue([message])
149 }
150}
151

Built with git-ssb-web