git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: c77c7618fb3abd31a3d1b797af55b9f21a318173

Files: c77c7618fb3abd31a3d1b797af55b9f21a318173 / actor.js

4136 bytesRaw
1const Pipe = require('buffer-pipe')
2const Cap = require('primea-capability')
3const leb128 = require('leb128').unsigned
4const Inbox = require('./inbox.js')
5
6module.exports = class Actor {
7 /**
8 * the Actor manages the varous message passing functions and provides
9 * an interface for the containers to use
10 * @param {Object} opts
11 * @param {Object} opts.id - the UUID of the Actor
12 * @param {Object} opts.state - the state of the container
13 * @param {Object} opts.hypervisor - the instance of the hypervisor
14 * @param {Object} opts.container - the container constuctor and argments
15 */
16 constructor (opts) {
17 Object.assign(this, opts)
18
19 this.container = new opts.container.Constructor(this, opts.container.args)
20 this.inbox = new Inbox({
21 actor: this,
22 hypervisor: opts.hypervisor
23 })
24
25 this.ticks = 0
26 this.running = false
27 }
28
29 /**
30 * Mints a new capabilitly with a given tag
31 * @param {*} tag - a tag which can be used to identify caps
32 * @return {Object}
33 */
34 mintCap (tag = 0, funcIndex = 0) {
35 return new Cap(this.id, tag, funcIndex)
36 }
37
38 /**
39 * adds a message to this actor's message queue
40 * @param {string} portName
41 * @param {object} message
42 */
43 queue (message) {
44 this.inbox.queue(message)
45
46 if (!this.running) {
47 this.running = true
48 this._startMessageLoop()
49 }
50 }
51
52 /**
53 * runs the creation routine for the actor
54 * @param {object} message
55 * @returns {Promise}
56 */
57 create (message) {
58 this.running = true
59 return this.runMessage(message, 'onCreation').then(() => {
60 this._startMessageLoop()
61 })
62 }
63
64 // waits for the next message
65 async _startMessageLoop () {
66 // this ensure we only every have one loop running at a time
67 while (!this.inbox.isEmpty) {
68 const message = await this.inbox.nextMessage(0)
69 await this.runMessage(message)
70 }
71 this.running = false
72 // wait for state ops to finish
73 await this.state.done()
74 setTimeout(() => {
75 if (!this.running) {
76 this.container.onIdle()
77 }
78 }, 0)
79 }
80
81 serializeMetaData () {
82 return Actor.serializeMetaData(this.type, this.nonce)
83 }
84
85 static serializeMetaData (type, nonce = 0) {
86 const p = new Pipe()
87 leb128.write(type, p)
88 leb128.write(nonce, p)
89 return p.buffer
90 }
91
92 static deserializeMetaData (buffer) {
93 const pipe = new Pipe(buffer)
94 const type = leb128.read(pipe)
95 const nonce = leb128.read(pipe)
96 return {
97 nonce,
98 type
99 }
100 }
101
102 /**
103 * Runs the shutdown routine for the actor
104 */
105 shutdown () {
106 this.state.root['/'][3] = this.serializeMetaData()
107 this.hypervisor.scheduler.done(this.id)
108 }
109
110 /**
111 * Runs the startup routine for the actor
112 */
113 startup () {
114 return this.container.onStartup()
115 }
116
117 /**
118 * run the Actor with a given message
119 * @param {object} message - the message to run
120 * @param {String} method - which method to run
121 * @returns {Promise}
122 */
123 async runMessage (message, method = 'onMessage') {
124 try {
125 await this.container[method](message)
126 } catch (e) {
127 message.emit('execution:error', e)
128 }
129 }
130
131 /**
132 * updates the number of ticks that the actor has run
133 * @param {Number} count - the number of ticks to add
134 */
135 incrementTicks (count) {
136 this.ticks += count
137 this.hypervisor.scheduler.update(this)
138 }
139
140 /**
141 * creates an actor
142 * @param {Integer} type - the type id for the container
143 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
144 */
145 createActor (type, message) {
146 const id = this._generateNextId()
147 return this.hypervisor.createActor(type, message, id)
148 }
149
150 _generateNextId () {
151 const id = {
152 nonce: this.nonce,
153 parent: this.id
154 }
155
156 this.nonce++
157 return id
158 }
159
160 /**
161 * sends a message to a given port
162 * @param {Object} portRef - the port
163 * @param {Message} message - the message
164 */
165 send (cap, message) {
166 message._fromTicks = this.ticks
167 message._fromId = this.id
168
169 return this.hypervisor.send(cap, message)
170 }
171}
172

Built with git-ssb-web