git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 4c6125763fffeac43ae5302bb5541ac7f124c0a3

Files: 4c6125763fffeac43ae5302bb5541ac7f124c0a3 / actor.js

3657 bytesRaw
1const Pipe = require('buffer-pipe')
2const leb128 = require('leb128').unsigned
3const Inbox = require('./inbox.js')
4
5module.exports = class Actor {
6 /**
7 * the Actor manages the varous message passing functions and provides
8 * an interface for the containers to use
9 * @param {Object} opts
10 * @param {Object} opts.id - the UUID of the Actor
11 * @param {Object} opts.state - the state of the container
12 * @param {Object} opts.hypervisor - the instance of the hypervisor
13 * @param {Object} opts.container - the container constuctor and argments
14 */
15 constructor (opts) {
16 Object.assign(this, opts)
17
18 this.inbox = new Inbox({
19 actor: this,
20 hypervisor: opts.hypervisor
21 })
22
23 this.ticks = 0
24 this.running = false
25 }
26
27 /**
28 * adds a message to this actor's message queue
29 * @param {string} portName
30 * @param {object} message
31 */
32 queue (message) {
33 this.inbox.queue(message)
34
35 if (!this.running) {
36 this.running = true
37 this._startMessageLoop()
38 }
39 }
40
41 // waits for the next message
42 async _startMessageLoop () {
43 // this ensure we only every have one loop running at a time
44 while (!this.inbox.isEmpty) {
45 const message = await this.inbox.nextMessage()
46 await this.runMessage(message)
47 }
48 this.running = false
49 // wait for state ops to finish
50 await this.state.done()
51 setTimeout(() => {
52 if (!this.running) {
53 this.shutdown()
54 }
55 }, 0)
56 }
57
58 serializeMetaData () {
59 return Actor.serializeMetaData(this.type, this.nonce)
60 }
61
62 getFuncRef (name) {
63 return {
64 name,
65 destId: this.id
66 }
67 }
68
69 static serializeMetaData (type, nonce = 0) {
70 const p = new Pipe()
71 leb128.write(type, p)
72 leb128.write(nonce, p)
73 return p.buffer
74 }
75
76 static deserializeMetaData (buffer) {
77 const pipe = new Pipe(buffer)
78 const type = leb128.read(pipe)
79 const nonce = leb128.read(pipe)
80 return {
81 nonce,
82 type
83 }
84 }
85
86 /**
87 * Runs the shutdown routine for the actor
88 */
89 shutdown () {
90 this.state.root['/'][3] = this.serializeMetaData()
91 this.hypervisor.scheduler.done(this.id)
92 }
93
94 /**
95 * Runs the startup routine for the actor
96 */
97 async startup () {
98 this.instance = await this.container.instance(this)
99 }
100
101 /**
102 * run the Actor with a given message
103 * @param {object} message - the message to run
104 * @param {String} method - which method to run
105 * @returns {Promise}
106 */
107 async runMessage (message) {
108 try {
109 this.currentMessage = message
110 await this.instance.exports[message.funcRef.name](...message.funcArguments)
111 } catch (e) {
112 message.emit('execution:error', e)
113 }
114 }
115
116 /**
117 * updates the number of ticks that the actor has run
118 * @param {Number} count - the number of ticks to add
119 */
120 incrementTicks (count) {
121 this.ticks += count
122 this.hypervisor.scheduler.update(this)
123 }
124
125 /**
126 * creates an actor
127 * @param {Integer} type - the type id for the container
128 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
129 */
130 createActor (type, code) {
131 const id = this._generateNextId()
132 return this.hypervisor.createActor(type, code, id)
133 }
134
135 _generateNextId () {
136 const id = {
137 nonce: this.nonce,
138 parent: this.id
139 }
140
141 this.nonce++
142 return id
143 }
144
145 /**
146 * sends a message to a given port
147 * @param {Object} portRef - the port
148 * @param {Message} message - the message
149 */
150 send (message) {
151 message._fromTicks = this.ticks
152 message._fromId = this.id
153
154 return this.hypervisor.send(message)
155 }
156}
157

Built with git-ssb-web