git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 0e2eb764d9e711043dde259c1e74c27d2b613c09

Files: 0e2eb764d9e711043dde259c1e74c27d2b613c09 / actor.js

5086 bytesRaw
1const Buffer = require('safe-buffer').Buffer
2const Pipe = require('buffer-pipe')
3const Cap = require('primea-capability')
4const Message = require('primea-message')
5const leb128 = require('leb128').unsigned
6const LockMap = require('lockmap')
7const Inbox = require('./inbox.js')
8
9module.exports = class Actor {
10 /**
11 * the Actor manages the varous message passing functions and provides
12 * an interface for the containers to use
13 * @param {Object} opts
14 * @param {Object} opts.id - the UUID of the Actor
15 * @param {Object} opts.state - the state of the container
16 * @param {Object} opts.hypervisor - the instance of the hypervisor
17 * @param {Object} opts.container - the container constuctor and argments
18 */
19 constructor (opts) {
20 Object.assign(this, opts)
21
22 this.container = new opts.container.Constructor(this, opts.container.args)
23 this.inbox = new Inbox({
24 actor: this,
25 hypervisor: opts.hypervisor
26 })
27
28 this.ticks = 0
29 this.running = false
30 this._sending = new LockMap()
31 }
32
33 /**
34 * Mints a new capabilitly with a given tag
35 * @param {*} tag - a tag which can be used to identify caps
36 * @return {Object}
37 */
38 mintCap (tag = 0) {
39 return new Cap(this.id, tag)
40 }
41
42 /**
43 * adds a message to this actor's message queue
44 * @param {string} portName
45 * @param {object} message
46 */
47 queue (message) {
48 this.inbox.queue(message)
49 this._startMessageLoop()
50 }
51
52 /**
53 * runs the creation routine for the actor
54 * @param {object} message
55 * @returns {Promise}
56 */
57 create (message) {
58 // start loop before running intializtion message so the the container state
59 // will be "running" incase the actor recievse a message will running
60 // creation code
61 this._startMessageLoop()
62 return this.runMessage(message, 'onCreation')
63 }
64
65 // waits for the next message
66 async _startMessageLoop () {
67 // this ensure we only every have one loop running at a time
68 if (!this.running) {
69 this.running = true
70 while (1) {
71 const message = await this.inbox.nextMessage()
72 if (!message) break
73
74 // if the message we recived had more ticks then we currently have then
75 // update it
76 if (message._fromTicks > this.ticks) {
77 this.ticks = message._fromTicks
78 this.hypervisor.scheduler.update(this)
79 }
80 // run the next message
81 await this.runMessage(message)
82 // wait for state ops to finish
83 await this.state.done()
84 }
85
86 this.running = false
87 this.container.onIdle()
88 }
89 }
90
91 serializeMetaData () {
92 return Actor.serializeMetaData(this.type, this.transparent, this.nonce)
93 }
94
95 static serializeMetaData (type, transparent = 0, nonce = 0) {
96 const p = new Pipe()
97 leb128.write(type, p)
98 p.write(Buffer.from([transparent]))
99 leb128.write(nonce, p)
100 return p.buffer
101 }
102
103 static deserializeMetaData (buffer) {
104 const pipe = new Pipe(buffer)
105 return {
106 type: leb128.read(pipe),
107 nonce: leb128.read(pipe),
108 transparent: pipe.read(1)[0]
109 }
110 }
111
112 /**
113 * Runs the shutdown routine for the actor
114 */
115 shutdown () {
116 this.state.root['/'][3] = this.serializeMetaData()
117 this.hypervisor.scheduler.done(this.id)
118 }
119
120 /**
121 * Runs the startup routine for the actor
122 */
123 startup () {
124 return this.container.onStartup()
125 }
126
127 /**
128 * run the Actor with a given message
129 * @param {object} message - the message to run
130 * @param {String} method - which method to run
131 * @returns {Promise}
132 */
133 async runMessage (message, method = 'onMessage') {
134 const responseCap = message.responseCap
135 delete message.responseCap
136
137 let result
138 try {
139 result = await this.container[method](message)
140 } catch (e) {
141 message.emit('execution:error', e)
142 result = {
143 exception: true,
144 exceptionError: e
145 }
146 }
147
148 if (responseCap) {
149 this.send(responseCap, new Message({
150 data: result
151 }))
152 }
153 }
154
155 /**
156 * updates the number of ticks that the actor has run
157 * @param {Number} count - the number of ticks to add
158 */
159 incrementTicks (count) {
160 this.ticks += count
161 this.hypervisor.scheduler.update(this)
162 }
163
164 /**
165 * creates an actor
166 * @param {Integer} type - the type id for the container
167 * @param {Object} message - an intial [message](https://github.com/primea/js-primea-message) to send newly created actor
168 */
169 createActor (type, message) {
170 const id = this._generateNextId()
171 return this.hypervisor.createActor(type, message, id)
172 }
173
174 _generateNextId () {
175 const id = {
176 nonce: this.nonce,
177 parent: this.id
178 }
179
180 this.nonce++
181 return id
182 }
183
184 /**
185 * sends a message to a given port
186 * @param {Object} portRef - the port
187 * @param {Message} message - the message
188 */
189 send (cap, message) {
190 const resolve = this._sending.lock(cap)
191 message._fromTicks = this.ticks
192 message._fromId = this.id
193 message.tag = cap.tag
194
195 return this.hypervisor.send(cap, message).then(() => resolve(cap))
196 }
197}
198

Built with git-ssb-web