git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: f7af9769dc163a3efc6196f8cc06ab9115567a9f

Files: f7af9769dc163a3efc6196f8cc06ab9115567a9f / actor.js

4751 bytes · Raw
1const Buffer = require('safe-buffer').Buffer
2const Message = require('primea-message')
3const leb128 = require('leb128').unsigned
4const LockMap = require('lockmap')
5const Inbox = require('./inbox.js')
6
7module.exports = class Actor {
8 /**
9 * the Actor manages the varous message passing functions and provides
10 * an interface for the containers to use
11 * @param {Object} opts
12 * @param {Object} opts.id - the UUID of the Actor
13 * @param {Object} opts.state - the state of the container
14 * @param {Object} opts.hypervisor - the instance of the hypervisor
15 * @param {Object} opts.container - the container constuctor and argments
16 */
17 constructor (opts) {
18 this.state = opts.state
19 this.nonce = leb128.decode(opts.state.root['/'][3].subarray(2))
20 this.hypervisor = opts.hypervisor
21 this.id = opts.id
22 this.container = new opts.container.Constructor(this, opts.container.args)
23 this.inbox = new Inbox({
24 actor: this,
25 hypervisor: opts.hypervisor
26 })
27
28 this.ticks = 0
29 this.running = false
30 this._sending = new LockMap()
31 }
32
33 /**
34 * Mints a new capabilitly with a given tag
35 * @param {*} tag - a tag which can be used to identify caps
36 * @return {Object}
37 */
38 mintCap (tag = 0) {
39 return {
40 destId: this.id,
41 tag: tag
42 }
43 }
44
45 /**
46 * adds a message to this actor's message queue
47 * @param {string} portName
48 * @param {object} message
49 */
50 queue (message) {
51 this.inbox.queue(message)
52 this._startMessageLoop()
53 }
54
55 /**
56 * runs the creation routine for the actor
57 * @param {object} message
58 * @returns {Promise}
59 */
60 create (message) {
61 // start loop before running intializtion message so the the container state
62 // will be "running" incase the actor recievse a message will running
63 // creation code
64 this._startMessageLoop()
65 return this.runMessage(message, 'onCreation')
66 }
67
68 // waits for the next message
69 async _startMessageLoop () {
70 // this ensure we only every have one loop running at a time
71 if (!this.running) {
72 this.running = true
73 while (1) {
74 const message = await this.inbox.nextMessage()
75 if (!message) break
76
77 // if the message we recived had more ticks then we currently have then
78 // update it
79 if (message._fromTicks > this.ticks) {
80 this.ticks = message._fromTicks
81 this.hypervisor.scheduler.update(this)
82 }
83 // run the next message
84 await this.runMessage(message)
85 // wait for state ops to finish
86 await this.state.done()
87 }
88
89 this.running = false
90 this.container.onIdle()
91 }
92 }
93
94 /**
95 * Runs the shutdown routine for the actor
96 */
97 shutdown () {
98 // save the nonce
99 let state = this.state.root['/'][3].subarray(0, 2)
100 this.state.root['/'][3] = Buffer.concat([state, leb128.encode(this.nonce)])
101 this.hypervisor.scheduler.done(this.id)
102 }
103
104 /**
105 * Runs the startup routine for the actor
106 */
107 startup () {
108 return this.container.onStartup()
109 }
110
111 /**
112 * run the Actor with a given message
113 * @param {object} message - the message to run
114 * @param {String} method - which method to run
115 * @returns {Promise}
116 */
117 async runMessage (message, method = 'onMessage') {
118 const responseCap = message.responseCap
119 delete message.responseCap
120
121 let result
122 try {
123 result = await this.container[method](message)
124 } catch (e) {
125 message.emit('execution:error', e)
126 result = {
127 exception: true,
128 exceptionError: e
129 }
130 }
131
132 if (responseCap) {
133 this.send(responseCap, new Message({
134 data: result
135 }))
136 }
137 }
138
139 /**
140 * updates the number of ticks that the actor has run
141 * @param {Number} count - the number of ticks to add
142 */
143 incrementTicks (count) {
144 this.ticks += count
145 this.hypervisor.scheduler.update(this)
146 }
147
148 /**
149 * creates an actor
150 * @param {Integer} type - the type id for the container
151 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
152 */
153 createActor (type, message) {
154 const id = this._generateNextId()
155 return this.hypervisor.createActor(type, message, id)
156 }
157
158 _generateNextId () {
159 const id = {
160 nonce: this.nonce,
161 parent: this.id
162 }
163
164 this.nonce++
165 return id
166 }
167
168 /**
169 * sends a message to a given port
170 * @param {Object} portRef - the port
171 * @param {Message} message - the message
172 */
173 send (cap, message) {
174 const resolve = this._sending.lock(cap)
175 message._fromTicks = this.ticks
176 message._fromId = this.id
177 message.tag = cap.tag
178
179 return this.hypervisor.send(cap, message).then(() => resolve(cap))
180 }
181}
182

Built with git-ssb-web