git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: a9dfa953b57feff50f135a8191e5ee17d3adfc83

Files: a9dfa953b57feff50f135a8191e5ee17d3adfc83 / actor.js

4573 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const Pipe = require('buffer-pipe')
3const Cap = require('primea-capability')
4const Message = require('primea-message')
5const leb128 = require('leb128').unsigned
6const Inbox = require('./inbox.js')
7
8module.exports = class Actor {
9 /**
10 * the Actor manages the varous message passing functions and provides
11 * an interface for the containers to use
12 * @param {Object} opts
13 * @param {Object} opts.id - the UUID of the Actor
14 * @param {Object} opts.state - the state of the container
15 * @param {Object} opts.hypervisor - the instance of the hypervisor
16 * @param {Object} opts.container - the container constuctor and argments
17 */
18 constructor (opts) {
19 Object.assign(this, opts)
20
21 this.container = new opts.container.Constructor(this, opts.container.args)
22 this.inbox = new Inbox({
23 actor: this,
24 hypervisor: opts.hypervisor
25 })
26
27 this.ticks = 0
28 this.running = false
29 }
30
31 /**
32 * Mints a new capabilitly with a given tag
33 * @param {*} tag - a tag which can be used to identify caps
34 * @return {Object}
35 */
36 mintCap (tag = 0, funcIndex = 0) {
37 return new Cap(this.id, tag, funcIndex)
38 }
39
40 /**
41 * adds a message to this actor's message queue
42 * @param {string} portName
43 * @param {object} message
44 */
45 queue (message) {
46 this.inbox.queue(message)
47 this._startMessageLoop()
48 }
49
50 /**
51 * runs the creation routine for the actor
52 * @param {object} message
53 * @returns {Promise}
54 */
55 create (message) {
56 this.running = true
57 return this.runMessage(message, 'onCreation').then(() => {
58 this.running = false
59 this._startMessageLoop()
60 })
61 }
62
63 // waits for the next message
64 async _startMessageLoop () {
65 // this ensure we only every have one loop running at a time
66 if (!this.running) {
67 this.running = true
68 while (1) {
69 const message = await this.inbox.nextMessage(0, true)
70 if (!message) break
71
72 // run the next message
73 await this.runMessage(message)
74 // wait for state ops to finish
75 await this.state.done()
76 }
77
78 this.running = false
79 this.container.onIdle()
80 }
81 }
82
83 serializeMetaData () {
84 return Actor.serializeMetaData(this.type, this.transparent, this.nonce)
85 }
86
87 static serializeMetaData (type, transparent = 0, nonce = 0) {
88 const p = new Pipe()
89 leb128.write(type, p)
90 p.write(Buffer.from([0]))
91 leb128.write(nonce, p)
92 return p.buffer
93 }
94
95 static deserializeMetaData (buffer) {
96 const pipe = new Pipe(buffer)
97 const type = leb128.read(pipe)
98 pipe.read(1)
99 const nonce = leb128.read(pipe)
100 return {
101 nonce,
102 type
103 }
104 }
105
106 /**
107 * Runs the shutdown routine for the actor
108 */
109 shutdown () {
110 this.state.root['/'][3] = this.serializeMetaData()
111 this.hypervisor.scheduler.done(this.id)
112 }
113
114 /**
115 * Runs the startup routine for the actor
116 */
117 startup () {
118 return this.container.onStartup()
119 }
120
121 /**
122 * run the Actor with a given message
123 * @param {object} message - the message to run
124 * @param {String} method - which method to run
125 * @returns {Promise}
126 */
127 async runMessage (message, method = 'onMessage') {
128 let result
129 try {
130 result = await this.container[method](message)
131 } catch (e) {
132 message.emit('execution:error', e)
133 result = {
134 exception: true,
135 exceptionError: e
136 }
137 }
138
139 if (message.responseCap) {
140 this.send(message.responseCap, new Message({
141 data: result
142 }))
143 }
144 }
145
146 /**
147 * updates the number of ticks that the actor has run
148 * @param {Number} count - the number of ticks to add
149 */
150 incrementTicks (count) {
151 this.ticks += count
152 this.hypervisor.scheduler.update(this)
153 }
154
155 /**
156 * creates an actor
157 * @param {Integer} type - the type id for the container
158 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
159 */
160 createActor (type, message) {
161 const id = this._generateNextId()
162 return this.hypervisor.createActor(type, message, id)
163 }
164
165 _generateNextId () {
166 const id = {
167 nonce: this.nonce,
168 parent: this.id
169 }
170
171 this.nonce++
172 return id
173 }
174
175 /**
176 * sends a message to a given port
177 * @param {Object} portRef - the port
178 * @param {Message} message - the message
179 */
180 send (cap, message) {
181 message._fromTicks = this.ticks
182 message._fromId = this.id
183 message.tag = cap.tag
184
185 return this.hypervisor.send(cap, message)
186 }
187}
188

Built with git-ssb-web