git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit fb80c5e51c634a7cd505d71b34160b19f451cd16

added init implemention of ports

wanderer committed on 2/21/2017, 10:19:12 PM
Parent: 7cdddef81c118db3b188949697b28350cbe2e343

Files changed

codeHandler.jschanged
defaultAgent.jschanged
index.jschanged
message.jschanged
messageQueue.jschanged
port.jschanged
runTx.jschanged
tests/apiTests.jsadded
hypervisor.jsadded
portManager.jsadded
codeHandler.jsView
@@ -18,17 +18,29 @@
1818 return new Wasm(code)
1919 }
2020 }
2121
22+const javascript = {
23+ test: (code) => {
24+ return typeof code === 'object'
25+ },
26+ init: (code) => {
27+ return code
28+ }
29+}
30+
2231 let codeHandlers = exports.handlers = {
2332 default: defaultHandler,
24- wasm: wasm
33+ wasm: wasm,
34+ javascript: javascript
2535 }
2636
2737 exports.init = (code) => {
2838 for (let name in codeHandlers) {
29- const handler = codeHandlers[name]
30- if (handler.test(code)) {
31- return handler.init(code)
32- }
39+ try {
40+ const handler = codeHandlers[name]
41+ if (handler.test(code)) {
42+ return handler.init(code)
43+ }
44+ } catch (e) {}
3345 }
3446 }
defaultAgent.jsView
@@ -1,8 +1,8 @@
11 exports.run = async (message, kernel) => {
22 const to = message.to[message.hops - 1]
33 if (to) {
4- return kernel.send(to, message)
4+ return kernel.send(message)
55 } else if (message.data.getValue) {
66 return (await kernel.state.get(message.data.getValue)).value
77 }
88 }
index.jsView
@@ -1,56 +1,83 @@
1+const EventEmitter = require('events')
12 const Vertex = require('merkle-trie')
2-const Port = require('./port.js')
3+const PortManager = require('./portManager.js')
34 const imports = require('./EVMinterface.js')
45 const codeHandler = require('./codeHandler.js')
5-const MessageQueue = require('./messageQueue')
66 const common = require('./common.js')
77
8-module.exports = class Kernel {
8+module.exports = class Kernel extends EventEmitter {
99 constructor (opts = {}) {
10+ super()
1011 const state = this.state = opts.state || new Vertex()
1112 this.code = opts.code || state.value
1213 this.path = state.path
1314 this.imports = opts.imports || [imports]
14- // RENAME agent
15+ this.ports = new PortManager(state, opts.parent, Kernel)
16+ // rename sandbox?
1517 this._vm = (opts.codeHandler || codeHandler).init(this.code)
16- this._messageQueue = new MessageQueue(this)
17- this.ports = new Port(state, Kernel)
18+ this._state = 'idle'
19+ this.ports.on('message', message => {
20+ // was this kernel already visted?
21+ if (message.isCyclic(this) || this._state === 'idle') {
22+ this.run(message)
23+ }
24+ })
1825 }
1926
27+ runNextMessage () {
28+ this.ports.dequeue().then(message => {
29+ if (message) {
30+ this.run(message)
31+ } else {
32+ this._state = 'idle'
33+ this.emit('idle')
34+ }
35+ })
36+ }
37+
2038 /**
2139 * run the kernels code with a given enviroment
2240 * The Kernel Stores all of its state in the Environment. The Interface is used
2341 * to by the VM to retrive infromation from the Environment.
2442 */
2543 async run (message, imports = this.imports) {
26- // const state = this.state.copy()
27- const result = await this._vm.run(message, this, imports)
28- // if (!result.execption) {
29- // // update the state
30- // this.state.set([], state)
31- // }
32- message.finished()
33- return result
34- }
44+ this._state = 'running'
45+ const oldState = this.state.copy()
46+ const result = await this._vm.run(message, this, imports) || {}
3547
36- async recieve (message) {
37- if (message.isCyclic(this)) {
38- const result = await this.run(message)
39- return result
48+ function revert () {
49+ // revert the state
50+ this.state.set([], oldState)
51+ // revert all the sent messages
52+ this.ports.outbox.revert()
53+ this.runNextMessage()
54+ }
55+
56+ if (result.execption) {
57+ // failed messages
58+ revert()
59+ } else if (message.atomic) {
60+ // messages
61+ message.finished().then(this.runNextMessage).catch(revert)
4062 } else {
41- return this._messageQueue.add(message)
63+ // non-atomic messages
64+ this.runNextMessage()
4265 }
66+ return result
4367 }
4468
45- async send (port, message) {
46- message.addVistedKernel(this)
69+ async send (message) {
70+ let portName = message.nextPort()
71+ message.addVistedKernel(message)
72+ this.lastMessage = message
4773 // replace root with parent path to root
48- if (port === common.ROOT) {
49- port = common.PARENT
50- message.to = new Array(this.state.path.length).fill(common.PARENT).concat(message.to)
74+ if (portName === common.ROOT) {
75+ portName = common.PARENT
76+ message.to = new Array(this.path.length).fill(common.PARENT).concat(message.to)
5177 }
52- return this.ports.send(port, message)
78+ const port = await this.ports.get(portName)
79+ return port.send(message)
5380 }
5481
5582 setValue (name, value) {
5683 this.state.set(name, value)
@@ -62,5 +89,8 @@
6289
6390 deleteValue (name) {
6491 return this.state.del(name)
6592 }
93+
94+ // remove from cache
95+ shutdown () {}
6696 }
message.jsView
@@ -6,33 +6,37 @@
66 // call infromation
77 to: [],
88 from: [],
99 data: new Uint8Array(),
10- sync: true,
10+ atomic: true,
1111 // resource info
1212 gas: new U256(0),
1313 gasPrices: new U256(0)
1414 }
1515 Object.assign(this, defaults, opts)
1616 this.hops = 0
17- this._vistedAgents = []
17+ this._vistedKernels = []
1818 }
1919
2020 finished () {
21- if (this.sync) {
22- this._vistedAgents.pop()
21+ if (this.atomic) {
22+ this._vistedKernels.pop()
2323 }
24+ return new Promise((resolve, reject) => {
25+
26+ })
2427 }
2528
29+ nextPort () {
30+ return this.to[this.hops++]
31+ }
32+
2633 addVistedKernel (kernel) {
27- const parentMessage = kernel._messageQueue.currentMessage
28- this.hops++
29- if (this.sync && parentMessage) {
30- this._vistedAgents = parentMessage._vistedAgents
31- this._vistedAgents.push(kernel)
34+ if (this.atomic) {
35+ this._vistedKernels.push(kernel)
3236 }
3337 }
3438
3539 isCyclic (kernel) {
36- return this.sync && this._vistedAgents.some(process => process === kernel)
40+ return this.sync && this._vistedKernels.some(process => process === kernel)
3741 }
3842 }
messageQueue.jsView
@@ -1,10 +1,11 @@
11 module.exports = class MessageQueue {
22 constructor (kernel) {
3- this.kernel = kernel
3+ this._queue = []
44 }
55
66 add (message) {
77 this.currentMessage = message
88 return this.kernel.run(message)
99 }
10+
1011 }
port.jsView
@@ -1,47 +1,26 @@
1-const Cache = require('imperative-trie')
2-const common = require('./common')
1+const EventEmitter = require('events')
32
4-module.exports = class Port {
5- constructor (state, constructor) {
6- this.state = state
7- this.Kernel = constructor
8- this.cache = new Cache()
3+module.exports = class Port extends EventEmitter {
4+ constructor () {
5+ super()
6+ this.queue = []
97 }
108
11- async send (name, message) {
12- if (name === common.PARENT) {
13- message.from.push(this.state.name)
14- } else {
15- message.from.push(common.PARENT)
16- }
9+ connect (destPort) {
10+ this.destPort = destPort
11+ destPort.destPort = this
12+ }
1713
18- const dest = await this.get(name)
19- return dest.recieve(message)
14+ async send (message) {
15+ return this.destPort.recieve(message)
2016 }
2117
22- async get (name) {
23- const vertex = name === common.PARENT ? this.cache.parent : this.cache.get(name)
18+ async recieve (message) {
19+ this.emit('message', message)
20+ this.queue.push(message)
21+ }
2422
25- if (vertex) {
26- return vertex.value
27- } else {
28- const destState = await (
29- name === common.PARENT
30- ? this.state.getParent()
31- : this.state.get([name]))
32-
33- const kernel = new this.Kernel({
34- state: destState
35- })
36-
37- const cache = new Cache(kernel)
38- kernel.ports.cache = cache
39- if (name === common.PARENT) {
40- cache.set(this.state.name, this.cache)
41- } else {
42- this.cache.set(name, cache)
43- }
44- return kernel
45- }
23+ dequeue () {
24+ return this.queue.unshift()
4625 }
4726 }
runTx.jsView
@@ -1,5 +1,12 @@
1+class runtx extends Kernel {
2+ run (tx) {
3+
14
5+ }
6+}
7+
8+
29 // run tx; the tx message handler
310 // runTx (tx, environment = new Environment()) {
411 // this.environment = environment
512
tests/apiTests.jsView
@@ -1,0 +1,24 @@
1+const tape = require('tape')
2+const Hypervisor = require('../hypervisor.js')
3+const Message = require('../message.js')
4+
5+tape('send and reciving messages', async t => {
6+ try {
7+ const hypervisor = new Hypervisor()
8+ const path = ['one', 'two', 'three']
9+ hypervisor.set(path, {
10+ run: message => {
11+ t.pass('got message')
12+ t.end()
13+ return {}
14+ }
15+ })
16+ hypervisor.send(new Message({
17+ to: path
18+ })).catch(e => {
19+ console.log(e)
20+ })
21+ } catch (e) {
22+ console.log(e)
23+ }
24+})
hypervisor.jsView
@@ -1,0 +1,51 @@
1+const Kernel = require('./index.js')
2+const Vertex = require('merkle-trie')
3+const Block = require('./deps/block.js')
4+const blockchain = require('./fakeBlockChain.js')
5+const codeHandlers = require('./codeHandler.js')
6+
7+module.exports = class Hypervisor {
8+ constructor (state = new Vertex()) {
9+ this.state = state
10+ if (state.isEmpty) {
11+ state.set('block', new Vertex({
12+ value: new Block()
13+ }))
14+ state.set('blockchain', new Vertex({
15+ value: blockchain
16+ }))
17+ }
18+ this.root = new Kernel({
19+ state: state
20+ })
21+ }
22+
23+ set (path, kernel) {
24+ this.state.set(path, new Vertex({
25+ value: kernel
26+ }))
27+ }
28+
29+ send (message) {
30+ return this.root.send(message)
31+ }
32+
33+ async get (path) {
34+ let lastKernel = this.root
35+ let state = this.state
36+ while (path.length) {
37+ const name = path.unshift()
38+ state = await state.get(name)
39+ const kernel = new Kernel({
40+ state: state,
41+ parent: lastKernel
42+ })
43+ lastKernel = kernel
44+ }
45+ return lastKernel
46+ }
47+
48+ addVM (type, handler) {
49+ codeHandlers.handlers.type = handler
50+ }
51+}
portManager.jsView
@@ -1,0 +1,67 @@
1+const EventEmitter = require('events')
2+const Port = require('./port.js')
3+const common = require('./common.js')
4+
5+module.exports = class PortManager extends EventEmitter {
6+ constructor (state, destParentPort, KernelContructor) {
7+ super()
8+ this.state = state
9+ this.sentMessage = []
10+ this.Kernel = KernelContructor
11+ // set up the parent port
12+ const parentPort = new Port()
13+ parentPort.on('message', message => {
14+ this.emit('message', message)
15+ })
16+ // create the cache
17+ this.cache = new Map()
18+ this.cache.set(common.PARENT, parentPort)
19+ }
20+
21+ async get (name) {
22+ let port = this.cache.get(name)
23+ if (!port) {
24+ port = new Port()
25+ port.on('message', message => {
26+ this.emit('message', message)
27+ })
28+ // create destination kernel
29+ const state = await this.state.get(name)
30+ const destKernel = new this.Kernel({
31+ state: state,
32+ parent: port
33+ })
34+
35+ // shutdown the kernel when it is done doing it work
36+ destKernel.on('idle', () => {
37+ destKernel.shutdown()
38+ this.cache.delete(name)
39+ })
40+
41+ // find and connect the destination port
42+ const destPort = await destKernel.ports.get(common.PARENT)
43+ port.connect(destPort)
44+ this.cache.set(name, port)
45+ }
46+ return port
47+ }
48+
49+ // dequeues the first message that is waiting on a port
50+ async dequeue () {
51+ // clear the outbox
52+ this.sentMessage = []
53+ for (let port in this.cache) {
54+ const message = port.dequeue()
55+ if (message) {
56+ return message
57+ }
58+ }
59+ }
60+
61+ close () {
62+ for (let port in this.cache) {
63+ port.emit('close')
64+ }
65+ this.cache.clear()
66+ }
67+}

Built with git-ssb-web