git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit 864bc27090fe788a4bf5a86c00246ca6e4391dd0

docs

wanderer committed on 6/21/2017, 10:29:16 PM
Parent: 4fdd4668b21eec221f7e9be2df9b212b29feffdc

Files changed

exoInterface.jschanged
index.jschanged
portManager.jschanged
scheduler.jschanged
tests/index.jschanged
exoInterface.jsView
@@ -18,9 +18,8 @@
1818 this.container = new opts.container.Constructor(this, opts.container.args)
1919
2020 this.ticks = 0
2121 this.containerState = 'idle'
22- this._waitingMap = new Map()
2322
2423 // create the port manager
2524 this.ports = new PortManager(Object.assign({
2625 exInterface: this
@@ -28,9 +27,10 @@
2827 }
2928
3029 /**
3130 * adds a message to this containers message queue
32- * @param {Message} message
31+ * @param {string} portName
32+ * @param {object} message
3333 */
3434 queue (portName, message) {
3535 message._hops++
3636 this.ports.queue(portName, message)
@@ -43,24 +43,28 @@
4343 }
4444 }
4545 }
4646
47+ // waits for the next message
4748 async _runNextMessage () {
49+ // check if the ports are saturated, if so we don't have to wait on the
50+ // scheduler
4851 if (!this.ports.isSaturated()) {
4952 await this.hypervisor.scheduler.wait(this.ticks, this.id)
5053 }
5154
52- if (this.ports.hasMessages()) {
53- let message = this.ports.peekNextMessage()
55+ let message = this.ports.peekNextMessage()
56+ if (message) {
5457 if (this.ticks < message._fromTicks) {
5558 this.ticks = message._fromTicks
5659 // check for tie messages
5760 this.hypervisor.scheduler.update(this)
58- await this.hypervisor.scheduler.wait(this.ticks, this.id)
61+ if (!this.ports.isSaturated()) {
62+ await this.hypervisor.scheduler.wait(this.ticks, this.id)
63+ message = this.ports.peekNextMessage()
64+ }
5965 }
60- message = this.ports.nextMessage()
61- this.currentMessage = message
62-
66+ message.fromPort.messages.shift()
6367 // run the next message
6468 this.run(message)
6569 } else {
6670 // if no more messages then shut down
@@ -75,11 +79,9 @@
7579 * @returns {Promise}
7680 */
7781 async run (message, init = false) {
7882 let result
79- message.ports.forEach(port => {
80- this.ports._unboundPorts.add(port)
81- })
83+ message.ports.forEach(port => this.ports._unboundPorts.add(port))
8284 if (message.data === 'delete') {
8385 this.ports._delete(message.fromName)
8486 } else {
8587 const method = init ? 'initailize' : 'run'
@@ -134,8 +136,9 @@
134136
135137 // if (this.currentMessage !== message && !message.responsePort) {
136138 // this.currentMessage._addSubMessage(message)
137139 // }
140+
138141 if (port.destId) {
139142 const id = port.destId
140143 const instance = await this.hypervisor.getInstance(id)
141144 instance.queue(port.destName, message)
index.jsView
@@ -13,36 +13,45 @@
1313 */
1414 constructor (dag, state = {}) {
1515 this.graph = new Graph(dag)
1616 this.scheduler = new Scheduler()
17- this._state = state
17+ this.state = state
1818 this._containerTypes = {}
1919 this._nodesToCheck = new Set()
2020 }
2121
22- getDestPort (port) {
23- if (port.destPort) {
24- return port.destPort
25- } else {
26- return this.graph.get(this._state, `${port.destId}/ports/${port.destName}`)
27- }
22+ /**
23+ * add a potaintail node in the state graph to check for garbage collection
24+ * @param {string} id
25+ */
26+ addNodeToCheck (id) {
27+ this._nodesToCheck.add(id)
2828 }
2929
3030 /**
31+ * removes a potaintail node in the state graph to check for garbage collection
32+ * @param {string} id
3133 */
32- async getInstance (id) {
33- let instance = this.scheduler.getInstance(id)
34- if (instance) {
35- return instance
34+ removeNodeToCheck (id) {
35+ this._nodesToCheck.delete(id)
36+ }
37+
38+ /**
39+ * given a port, this finds the corridsponeding endpoint port of the channel
40+ * @param {object} port
41+ * @returns {Promise}
42+ */
43+ getDestPort (port) {
44+ if (port.destPort) {
45+ return port.destPort
3646 } else {
37- const lock = this.scheduler.getLock()
38- instance = await this._loadInstance(id, lock)
39- return instance
47+ return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
4048 }
4149 }
4250
51+ // loads an instance of a container from the state
4352 async _loadInstance (id, lock) {
44- const state = await this.graph.get(this._state, id)
53+ const state = await this.graph.get(this.state, id)
4554 const container = this._containerTypes[state.type]
4655
4756 // create a new kernel instance
4857 const exoInterface = new ExoInterface({
@@ -57,9 +66,37 @@
5766 this.scheduler.releaseLock(lock)
5867 return exoInterface
5968 }
6069
70+ /**
71+ * gets an existsing container instances
72+ * @param {string} id - the containers ID
73+ * @returns {Promise}
74+ */
75+ async getInstance (id) {
76+ let instance = this.scheduler.getInstance(id)
77+ if (instance) {
78+ return instance
79+ } else {
80+ const lock = this.scheduler.getLock()
81+ instance = await this._loadInstance(id, lock)
82+ return instance
83+ }
84+ }
85+
86+ /**
87+ * creates an new container instances and save it in the state
88+ * @param {string} type - the type of container to create
89+ * @param {*} code
90+ * @param {array} entryPorts
91+ * @param {object} id
92+ * @param {object} id.nonce
93+ * @param {object} id.parent
94+ * @returns {Promise}
95+ */
6196 async createInstance (type, code, entryPorts = [], id = {nonce: 0, parent: null}) {
97+ // create a lock to prevent the scheduler from reloving waits before the
98+ // new container is loaded
6299 const lock = this.scheduler.getLock()
63100 id = await this.getHashFromObj(id)
64101 const state = {
65102 nonce: [0],
@@ -67,21 +104,28 @@
67104 type: type,
68105 code: code
69106 }
70107
71- await this.graph.set(this._state, id, state)
108+ // save the container in the state
109+ await this.graph.set(this.state, id, state)
110+ // create the container instance
72111 const exoInterface = await this._loadInstance(id, lock)
112+ // send the intialization message
73113 exoInterface.queue(null, new Message({
74114 ports: entryPorts
75115 }))
76116
77117 return exoInterface
78118 }
79119
120+ /**
121+ * deletes container from the state
122+ * @param {string} id
123+ */
80124 deleteInstance (id) {
81125 if (id !== ROOT_ID) {
82126 this._nodesToCheck.delete(id)
83- delete this._state[id]
127+ delete this.state[id]
84128 }
85129 }
86130
87131 /**
@@ -89,15 +133,15 @@
89133 * ticks
90134 * @param {Number} ticks the number of ticks at which to create the state root
91135 * @returns {Promise}
92136 */
93- async createStateRoot (ticks = Infinity) {
137+ async createStateRoot (ticks) {
94138 await this.scheduler.wait(ticks)
95- const unlinked = await DFSchecker(this.graph, this._state, ROOT_ID, this._nodesToCheck)
139+ const unlinked = await DFSchecker(this.graph, this.state, ROOT_ID, this._nodesToCheck)
96140 unlinked.forEach(id => {
97- delete this._state[id]
141+ delete this.state[id]
98142 })
99- return this.graph.flush(this._state)
143+ return this.graph.flush(this.state)
100144 }
101145
102146 /**
103147 * regirsters a container with the hypervisor
@@ -111,55 +155,76 @@
111155 args: args
112156 }
113157 }
114158
159+ /**
160+ * get a hash from a POJO
161+ * @param {object} obj
162+ * @return {Promise}
163+ */
115164 async getHashFromObj (obj) {
116165 return (await this.graph.flush(obj))['/']
117166 }
118167 }
119168
169+// Implements a parrilizable DFS check for graph connictivity given a set of nodes
170+// and a root node. Stating for the set of node to check this does a DFS and
171+// will return a set a nodes if any that is not connected to the root node.
120172 async function DFSchecker (graph, state, root, nodes) {
121173 const checkedNodesSet = new Set()
122174 let hasRootSet = new Set()
123175 const promises = []
124176
125177 for (const id of nodes) {
178+ // create a set for each of the starting nodes to track the nodes the DFS has
179+ // has traversed
126180 const checkedNodes = new Set()
127181 checkedNodesSet.add(checkedNodes)
128182 promises.push(check(id, checkedNodes))
129183 }
130184
185+ // wait for all the search to complete
131186 await Promise.all(promises)
187+ // remove the set of nodes that are connected to the root
132188 checkedNodesSet.delete(hasRootSet)
133189 let unLinkedNodesArray = []
134190
191+ // combine the unconnected sets into a single array
135192 for (const set of checkedNodesSet) {
136193 unLinkedNodesArray = unLinkedNodesArray.concat([...set])
137194 }
138195 return unLinkedNodesArray
139196
197+ // does the DFS starting with a single node ID
140198 async function check (id, checkedNodes) {
141- if (!checkedNodesSet.has(checkedNodes) || checkedNodes.has(id) || hasRootSet === checkedNodes) {
199+ if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
200+ checkedNodes.has(id) || // check if this DFS has alread seen the node
201+ hasRootSet === checkedNodes) { // check that this DFS has alread found the root node
142202 return
143203 }
144204
205+ // check if any of the the other DFSs have seen this node and if so merge
206+ // the sets and stop searching
145207 for (const set of checkedNodesSet) {
146208 if (set.has(id)) {
147209 checkedNodes.forEach(id => set.add(id))
148210 checkedNodesSet.delete(checkedNodes)
149211 return
150212 }
151213 }
152214
215+ // mark the node 'checked'
153216 checkedNodes.add(id)
154217
218+ // check to see if we are at the root
155219 if (id === root) {
156220 hasRootSet = checkedNodes
157221 return
158222 }
159223
160- const node = await graph.get(state, id)
224+ const node = state[id]['/']
161225 const promises = []
226+ // iterate through the nodes ports and recursivly check them
162227 for (const name in node.ports) {
163228 const port = node.ports[name]
164229 promises.push(check(port.destId, checkedNodes))
165230 }
portManager.jsView
@@ -50,8 +50,9 @@
5050 } else if (this.ports[name]) {
5151 throw new Error('cannot bind port to a name that is alread bound')
5252 } else {
5353 this._unboundPorts.delete(port)
54+ this.hypervisor.removeNodeToCheck(this.id)
5455
5556 // save the port instance
5657 this.ports[name] = port
5758
@@ -64,38 +65,45 @@
6465 }
6566
6667 /**
6768 * unbinds a port given its name
68- * @param {String} name
69- * @returns {boolean} whether or not the port was deleted
69+ * @param {string} name
70+ * @returns {Promise}
7071 */
7172 async unbind (name) {
7273 const port = this.ports[name]
7374 delete this.ports[name]
7475 this._unboundPorts.add(port)
7576
76- let destPort = await this.hypervisor.getDestPort(port)
77-
77+ // update the destination port
78+ const destPort = await this.hypervisor.getDestPort(port)
7879 delete destPort.destName
7980 delete destPort.destId
8081 destPort.destPort = port
81- this.hypervisor._nodesToCheck.add(this.id)
82+ this.hypervisor.addNodeToCheck(this.id)
8283 return port
8384 }
8485
86+ /**
87+ * delete an port given the name it is bound to
88+ * @param {string} name
89+ */
8590 delete (name) {
8691 const port = this.ports[name]
87- this._delete(name)
8892 this.exInterface.send(port, new Message({
8993 data: 'delete'
9094 }))
95+ this._delete(name)
9196 }
9297
9398 _delete (name) {
94- this.hypervisor._nodesToCheck.add(this.id)
99+ this.hypervisor.addNodeToCheck(this.id)
95100 delete this.ports[name]
96101 }
97102
103+ /**
104+ * clears any unbounded ports referances
105+ */
98106 clearUnboundedPorts () {
99107 this._unboundPorts.forEach(port => {
100108 this.exInterface.send(port, new Message({
101109 data: 'delete'
@@ -135,33 +143,38 @@
135143 return this.ports[name]
136144 }
137145
138146 /**
139- * creates a new Port given the container type
147+ * creates a new container. Returning a port to it.
140148 * @param {String} type
141149 * @param {*} data - the data to populate the initail state with
142- * @returns {Promise}
150+ * @returns {Object}
143151 */
144152 create (type, data) {
145- // const container = this.hypervisor._containerTypes[type]
146153 let nonce = this.state.nonce
147154
148155 const id = {
149156 nonce: nonce,
150157 parent: this.id
151158 }
152159
160+ // incerment the nonce
161+ nonce = new BN(nonce)
162+ nonce.iaddn(1)
163+ this.state.nonce = nonce.toArray()
164+
165+ // create a new channel for the container
153166 const ports = this.createChannel()
154167 this._unboundPorts.delete(ports[1])
155168 this.hypervisor.createInstance(type, data, [ports[1]], id)
156169
157- // incerment the nonce
158- nonce = new BN(nonce)
159- nonce.iaddn(1)
160- this.state.nonce = nonce.toArray()
161170 return ports[0]
162171 }
163172
173+ /**
174+ * creates a channel returns the created ports in an Array
175+ * @returns {array}
176+ */
164177 createChannel () {
165178 const port1 = {
166179 messages: []
167180 }
@@ -177,31 +190,29 @@
177190 return [port1, port2]
178191 }
179192
180193 /**
181- * gets the next canonical message given the an array of ports to choose from
182- * @param {Array} ports
183- * @returns {Promise}
194+ * find and returns the next message
195+ * @returns {object}
184196 */
185- nextMessage () {
186- const message = this.peekNextMessage()
187- message._fromPort.messages.shift()
188- return message
189- }
190-
191197 peekNextMessage () {
192- const portName = Object.keys(this.ports).reduce(messageArbiter.bind(this))
193- const port = this.ports[portName]
194- const message = port.messages[0]
195- message._fromPort = port
196- message.fromName = portName
197- return message
198+ const names = Object.keys(this.ports)
199+ if (names.length) {
200+ const portName = names.reduce(messageArbiter.bind(this))
201+ const port = this.ports[portName]
202+ const message = port.messages[0]
203+ if (message) {
204+ message._fromPort = port
205+ message.fromName = portName
206+ return message
207+ }
208+ }
198209 }
199210
200- hasMessages () {
201- return Object.keys(this.ports).some(name => this.ports[name].messages.length)
202- }
203-
211+ /**
212+ * tests wether or not all the ports have a message
213+ * @returns {boolean}
214+ */
204215 isSaturated () {
205216 return Object.keys(this.ports).every(name => this.ports[name].messages.length)
206217 }
207218 }
scheduler.jsView
@@ -43,17 +43,16 @@
4343 this.instances.delete(instance.id)
4444 this._checkWaits()
4545 }
4646
47- wait (ticks, id) {
47+ wait (ticks = Infinity) {
4848 if (!this.locks.size && ticks <= this.smallest()) {
4949 return
5050 } else {
5151 return new Promise((resolve, reject) => {
5252 binarySearchInsert(this._waits, comparator, {
5353 ticks: ticks,
54- resolve: resolve,
55- id: id
54+ resolve: resolve
5655 })
5756 })
5857 }
5958 }
tests/index.jsView
@@ -46,8 +46,45 @@
4646 const stateRoot = await hypervisor.createStateRoot(Infinity)
4747 t.deepEquals(stateRoot, expectedState, 'expected root!')
4848 })
4949
50+ tape('one child contract with saturated ports', async t => {
51+ t.plan(2)
52+ let message
53+ const expectedState = {
54+ '/': 'zdpuAtVcH6MUnvt2RXnLsDXyLB3CBSQ7aydfh2ogSKGCejJCQ'
55+ }
56+
57+ class testVMContainer2 extends BaseContainer {
58+ run (m) {
59+ t.true(m === message, 'should recive a message')
60+ }
61+ }
62+
63+ class testVMContainer extends BaseContainer {
64+ run (m) {
65+ const port = this.exInterface.ports.create('test2')
66+ this.exInterface.ports.bind('child', port)
67+ this.exInterface.incrementTicks(2)
68+ this.exInterface.send(port, m)
69+ }
70+ }
71+
72+ const hypervisor = new Hypervisor(node.dag)
73+ hypervisor.registerContainer('test', testVMContainer)
74+ hypervisor.registerContainer('test2', testVMContainer2)
75+
76+ let root = await hypervisor.createInstance('test')
77+ let port = root.ports.create('test')
78+
79+ root.ports.bind('first', port)
80+ message = root.createMessage()
81+
82+ root.send(port, message)
83+ const stateRoot = await hypervisor.createStateRoot(Infinity)
84+ t.deepEquals(stateRoot, expectedState, 'expected state')
85+ })
86+
5087 tape('one child contract', async t => {
5188 t.plan(4)
5289 let message
5390 const expectedState = {
@@ -150,9 +187,8 @@
150187 this.exInterface.ports.bind('two', two)
151188
152189 this.exInterface.send(one, this.exInterface.createMessage())
153190 this.exInterface.send(two, this.exInterface.createMessage())
154-
155191 } else if (runs === 1) {
156192 runs++
157193 t.equals(m.data, 'second', 'should recived the second message')
158194 } else if (runs === 2) {
@@ -486,8 +522,61 @@
486522
487523 t.end()
488524 })
489525
526+ tape('should not remove connected nodes', async t => {
527+ const expectedSr = {
528+ '/': 'zdpuAwsZTd5mRZBCYA1FJSHrpYDPgSZSiaTQp9xkUeajaoMHM'
529+ }
530+ class Root extends BaseContainer {
531+ run (m) {
532+ if (m.ports.length) {
533+ const port = this.exInterface.ports.get('test1')
534+ this.exInterface.send(port, m)
535+ this.exInterface.ports.unbind('test1')
536+ // this.exInterface.ports.unbind('test2')
537+ } else {
538+ const port1 = this.exInterface.ports.create('sub')
539+ this.exInterface.ports.bind('test1', port1)
540+ const port2 = this.exInterface.ports.create('sub')
541+ this.exInterface.ports.bind('test2', port2)
542+ this.exInterface.send(port2, this.exInterface.createMessage({data: 'getChannel'}))
543+ }
544+ }
545+ }
546+
547+ class Sub extends BaseContainer {
548+ run (message) {
549+ if (message.data === 'getChannel') {
550+ const ports = this.exInterface.ports.createChannel()
551+ this.exInterface.ports.bind('channel', ports[0])
552+ this.exInterface.send(message.fromPort, this.exInterface.createMessage({
553+ data: 'bindPort',
554+ ports: [ports[1]]
555+ }))
556+ } else if (message.data === 'bindPort') {
557+ this.exInterface.ports.bind('channel', message.ports[0])
558+ }
559+ }
560+ }
561+
562+ const hypervisor = new Hypervisor(node.dag)
563+
564+ hypervisor.registerContainer('root', Root)
565+ hypervisor.registerContainer('sub', Sub)
566+
567+ const root = await hypervisor.createInstance('root')
568+ const port = root.ports.create('root')
569+ root.ports.bind('first', port)
570+ root.send(port, root.createMessage())
571+ const sr = await hypervisor.createStateRoot()
572+ t.deepEquals(sr, expectedSr, 'should produce the corret state root')
573+ // await hypervisor.graph.tree(sr, Infinity)
574+ // console.log(JSON.stringify(sr, null, 2))
575+
576+ t.end()
577+ })
578+
490579 tape('should remove multiple subgraphs', async t => {
491580 const expectedSr = {
492581 '/': 'zdpuAmi9tkYTpoVsZvqQgxpQFRhCgYFVv4W3fjjfVhf1j8swv'
493582 }

Built with git-ssb-web