git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 5a33099559b3af6b0c5c0e444a8d537471853b77

Files: 5a33099559b3af6b0c5c0e444a8d537471853b77 / actor.js

3623 bytesRaw
1const Pipe = require('buffer-pipe')
2const leb128 = require('leb128').unsigned
3
4module.exports = class Actor {
5 /**
6 * the Actor manages the varous message passing functions and provides
7 * an interface for the containers to use
8 * @param {Object} opts
9 * @param {Object} opts.id - the UUID of the Actor
10 * @param {Object} opts.state - the state of the container
11 * @param {Object} opts.hypervisor - the instance of the hypervisor
12 * @param {Object} opts.container - the container constuctor and argments
13 */
14 constructor (opts) {
15 Object.assign(this, opts)
16
17 this.inbox = []
18
19 this.ticks = 0
20 this.running = false
21 }
22
23 /**
24 * adds a message to this actor's message queue
25 * @param {string} portName
26 * @param {object} message
27 */
28 queue (message) {
29 this.inbox.push(message)
30
31 if (!this.running) {
32 this.running = true
33 return this._startMessageLoop()
34 }
35 }
36
37 // waits for the next message
38 async _startMessageLoop () {
39 // this ensure we only every have one loop running at a time
40 while (this.inbox.length) {
41 const message = this.inbox.shift()
42 if (message._fromTicks > this.ticks) {
43 this.hypervisor.scheduler.update(this.ticks, message._fromTicks)
44 this.ticks = message._fromTicks
45 }
46 await this.runMessage(message)
47 }
48 this.running = false
49 }
50
51 serializeMetaData () {
52 return Actor.serializeMetaData(this.type, this.nonce)
53 }
54
55 getFuncRef (name) {
56 return {
57 name,
58 destId: this.id
59 }
60 }
61
62 static serializeMetaData (type, nonce = 0) {
63 const p = new Pipe()
64 leb128.write(type, p)
65 leb128.write(nonce, p)
66 return p.buffer
67 }
68
69 static deserializeMetaData (buffer) {
70 const pipe = new Pipe(buffer)
71 const type = leb128.read(pipe)
72 const nonce = leb128.read(pipe)
73 return {
74 nonce,
75 type
76 }
77 }
78
79 /**
80 * Runs the shutdown routine for the actor
81 */
82 async shutdown () {
83 await this.state.done()
84 this.state.root['/'][3] = this.serializeMetaData()
85 }
86
87 /**
88 * Runs the startup routine for the actor
89 */
90 async startup () {
91 this.instance = await this.container.instance(this)
92 }
93
94 /**
95 * run the Actor with a given message
96 * @param {object} message - the message to run
97 * @param {String} method - which method to run
98 * @returns {Promise}
99 */
100 async runMessage (message) {
101 try {
102 this.currentMessage = message
103 await this.instance.exports[message.funcRef.name](...message.funcArguments)
104 } catch (e) {
105 message.emit('execution:error', e)
106 }
107 message.emit('done')
108 }
109
110 /**
111 * updates the number of ticks that the actor has run
112 * @param {Number} count - the number of ticks to add
113 */
114 incrementTicks (count) {
115 const oldValue = this.ticks
116 this.ticks += count
117 this.hypervisor.scheduler.update(oldValue, this.ticks)
118 }
119
120 /**
121 * creates an actor
122 * @param {Integer} type - the type id for the container
123 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
124 */
125 createActor (type, code) {
126 const id = this._generateNextId()
127 return this.hypervisor.createActor(type, code, id)
128 }
129
130 _generateNextId () {
131 const id = {
132 nonce: this.nonce,
133 parent: this.id
134 }
135
136 this.nonce++
137 return id
138 }
139
140 /**
141 * sends a message to a given port
142 * @param {Object} portRef - the port
143 * @param {Message} message - the message
144 */
145 send (message) {
146 message._fromTicks = this.ticks
147 message._fromId = this.id
148
149 this.hypervisor.scheduler.queue([message])
150 }
151}
152

Built with git-ssb-web