git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 86f9d39232c23d80576543898b260fdb486becd1

Files: 86f9d39232c23d80576543898b260fdb486becd1 / actor.js

4782 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const Message = require('primea-message')
3const leb128 = require('leb128').unsigned
4const LockMap = require('lockmap')
5const Inbox = require('./inbox.js')
6
7module.exports = class Actor {
8 /**
9 * the Actor manages the varous message passing functions and provides
10 * an interface for the containers to use
11 * @param {Object} opts
12 * @param {Object} opts.id - the UUID of the Actor
13 * @param {Object} opts.state - the state of the container
14 * @param {Object} opts.hypervisor - the instance of the hypervisor
15 * @param {Object} opts.container - the container constuctor and argments
16 */
17 constructor (opts) {
18 // console.log(opts.state)
19 this.state = opts.state
20 this.nonce = leb128.decode(opts.state.root['/'][3].subarray(2))
21 this.hypervisor = opts.hypervisor
22 this.id = opts.id
23 this.container = new opts.container.Constructor(this, opts.container.args)
24 this.inbox = new Inbox({
25 actor: this,
26 hypervisor: opts.hypervisor
27 })
28
29 this.ticks = 0
30 this.running = false
31 this._sending = new LockMap()
32 }
33
34 /**
35 * Mints a new capabilitly with a given tag
36 * @param {*} tag - a tag which can be used to identify caps
37 * @return {Object}
38 */
39 mintCap (tag = 0) {
40 return {
41 destId: this.id,
42 tag: tag
43 }
44 }
45
46 /**
47 * adds a message to this actor's message queue
48 * @param {string} portName
49 * @param {object} message
50 */
51 queue (message) {
52 this.inbox.queue(message)
53 this._startMessageLoop()
54 }
55
56 /**
57 * runs the creation routine for the actor
58 * @param {object} message
59 * @returns {Promise}
60 */
61 create (message) {
62 // start loop before running intializtion message so the the container state
63 // will be "running" incase the actor recievse a message will running
64 // creation code
65 this._startMessageLoop()
66 return this.runMessage(message, 'onCreation')
67 }
68
69 // waits for the next message
70 async _startMessageLoop () {
71 // this ensure we only every have one loop running at a time
72 if (!this.running) {
73 this.running = true
74 while (1) {
75 const message = await this.inbox.nextMessage()
76 if (!message) break
77
78 // if the message we recived had more ticks then we currently have then
79 // update it
80 if (message._fromTicks > this.ticks) {
81 this.ticks = message._fromTicks
82 this.hypervisor.scheduler.update(this)
83 }
84 // run the next message
85 await this.runMessage(message)
86 // wait for state ops to finish
87 await this.state.done()
88 }
89
90 this.running = false
91 this.container.onIdle()
92 }
93 }
94
95 /**
96 * Runs the shutdown routine for the actor
97 */
98 shutdown () {
99 // save the nonce
100 let state = this.state.root['/'][3].subarray(0, 2)
101 this.state.root['/'][3] = Buffer.concat([state, leb128.encode(this.nonce)])
102 this.hypervisor.scheduler.done(this.id)
103 }
104
105 /**
106 * Runs the startup routine for the actor
107 */
108 startup () {
109 return this.container.onStartup()
110 }
111
112 /**
113 * run the Actor with a given message
114 * @param {object} message - the message to run
115 * @param {String} method - which method to run
116 * @returns {Promise}
117 */
118 async runMessage (message, method = 'onMessage') {
119 const responseCap = message.responseCap
120 delete message.responseCap
121
122 let result
123 try {
124 result = await this.container[method](message)
125 } catch (e) {
126 message.emit('execution:error', e)
127 result = {
128 exception: true,
129 exceptionError: e
130 }
131 }
132
133 if (responseCap) {
134 this.send(responseCap, new Message({
135 data: result
136 }))
137 }
138 }
139
140 /**
141 * updates the number of ticks that the actor has run
142 * @param {Number} count - the number of ticks to add
143 */
144 incrementTicks (count) {
145 this.ticks += count
146 this.hypervisor.scheduler.update(this)
147 }
148
149 /**
150 * creates an actor
151 * @param {Integer} type - the type id for the container
152 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
153 */
154 createActor (type, message) {
155 const id = this._generateNextId()
156 return this.hypervisor.createActor(type, message, id)
157 }
158
159 _generateNextId () {
160 const id = {
161 nonce: this.nonce,
162 parent: this.id
163 }
164
165 this.nonce++
166 return id
167 }
168
169 /**
170 * sends a message to a given port
171 * @param {Object} portRef - the port
172 * @param {Message} message - the message
173 */
174 send (cap, message) {
175 const resolve = this._sending.lock(cap)
176 message._fromTicks = this.ticks
177 message._fromId = this.id
178 message.tag = cap.tag
179
180 return this.hypervisor.send(cap, message).then(() => resolve(cap))
181 }
182}
183

Built with git-ssb-web