git ssb

0+

wanderer🌟 / js-primea-hypervisor



Commit 73b5b187d94ac2f96f31922f4249e72df021ec1c

cleanup

wanderer committed on 6/28/2017, 9:12:34 PM
Parent: 3c9e6f994621504f855b1992f1e25e2bb1bf389b

Files changed

exoInterface.jschanged
index.jschanged
portManager.jschanged
scheduler.jschanged
tests/index.jschanged
deleteMessage.jsadded
dfsChecker.jsadded
exoInterface.jsView
@@ -1,6 +1,7 @@
1+const Message = require('primea-message')
12 const PortManager = require('./portManager.js')
2-const Message = require('primea-message')
3+const DeleteMessage = require('./deleteMessage')
34
45 module.exports = class ExoInterface {
56 /**
67 * the ExoInterface manages the varous message passing functions and provides
@@ -18,73 +19,48 @@
1819 this.container = new opts.container.Constructor(this, opts.container.args)
1920
2021 this.ticks = 0
2122 this.containerState = 'idle'
22- this._pendingSends = new Map()
2323
2424 // create the port manager
2525 this.ports = new PortManager(Object.assign({
2626 exInterface: this
2727 }, opts))
2828 }
2929
30- _addWork (promise) {
31- this._outStandingWork = Promise.all([this._outStandingWork, promise])
32- }
33-
3430 /**
3531 * adds a message to this containers message queue
3632 * @param {string} portName
3733 * @param {object} message
3834 */
3935 queue (portName, message) {
4036 message._hops++
41- this.ports.queue(portName, message)
42- if (this.containerState !== 'running') {
43- this.containerState = 'running'
44- if (portName) {
37+ if (portName) {
38+ this.ports.queue(portName, message)
39+ if (this.containerState !== 'running') {
40+ this.containerState = 'running'
4541 this._runNextMessage()
46- } else {
47- this.run(message, true)
4842 }
43+ } else {
44+ // initailiazation message
45+ this.containerState = 'running'
46+ this.run(message, true)
4947 }
5048 }
5149
5250 // waits for the next message
5351 async _runNextMessage () {
5452 // check if the ports are saturated, if so we don't have to wait on the
5553 // scheduler
56- let message = this.ports.peekNextMessage()
57- let saturated = this.ports.isSaturated()
58- let oldestTime = this.hypervisor.scheduler.smallest()
54+ const message = await this.ports.getNextMessage()
5955
60- while (!saturated &&
61- !(message && oldestTime >= message._fromTicks ||
62- !message && oldestTime === this.ticks)) {
63- const ticksToWait = message ? message._fromTicks : this.ticks
64-
65- await Promise.race([
66- this.hypervisor.scheduler.wait(ticksToWait, this.id).then(m => {
67- message = this.ports.peekNextMessage()
68- }),
69- this.ports.olderMessage(message).then(m => {
70- message = m
71- }),
72- this.ports.whenSaturated().then(() => {
73- saturated = true
74- message = this.ports.peekNextMessage()
75- })
76- ])
77-
78- oldestTime = this.hypervisor.scheduler.smallest()
79- saturated = this.ports.isSaturated()
80- }
81-
8256 if (!message) {
8357 // if no more messages then shut down
84- this.hypervisor.scheduler.done(this)
58+ this.hypervisor.scheduler.done(this.id)
8559 } else {
8660 message.fromPort.messages.shift()
61+ // if the message we recived had more ticks then we currently have the
62+ // update it
8763 if (message._fromTicks > this.ticks) {
8864 this.ticks = message._fromTicks
8965 }
9066 this.hypervisor.scheduler.update(this)
@@ -101,13 +77,12 @@
10177 */
10278 async run (message, init = false) {
10379 let result
10480 message.ports.forEach(port => this.ports._unboundPorts.add(port))
105- if (message.data === 'delete') {
81+ if (message.constructor === DeleteMessage) {
10682 this.ports._delete(message.fromName)
10783 } else {
10884 const method = init ? 'initailize' : 'run'
109-
11085 try {
11186 result = await this.container[method](message) || {}
11287 } catch (e) {
11388 result = {
@@ -163,8 +138,9 @@
163138 const id = port.destId
164139 const instance = await this.hypervisor.getInstance(id)
165140 instance.queue(port.destName, message)
166141 } else {
142+ // port is unbound
167143 port.destPort.messages.push(message)
168144 }
169145 }
170146 }
index.jsView
@@ -1,16 +1,18 @@
11 const Graph = require('ipld-graph-builder')
22 const Message = require('primea-message')
33 const ExoInterface = require('./exoInterface.js')
44 const Scheduler = require('./scheduler.js')
5+const DFSchecker = require('./dfsChecker.js')
56
67 const ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
78
89 module.exports = class Hypervisor {
910 /**
1011 * The Hypervisor manages the container instances by instantiating them and
1112 * destorying them when possible. It also facilitates localating Containers
1213 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
14+ * @param {object} state - the starting state
1315 */
1416 constructor (dag, state = {}) {
1517 this.graph = new Graph(dag)
1618 this.scheduler = new Scheduler()
@@ -27,16 +29,8 @@
2729 this._nodesToCheck.add(id)
2830 }
2931
3032 /**
31- * removes a potaintail node in the state graph to check for garbage collection
32- * @param {string} id
33- */
34- removeNodeToCheck (id) {
35- this._nodesToCheck.delete(id)
36- }
37-
38- /**
3933 * given a port, this finds the corridsponeding endpoint port of the channel
4034 * @param {object} port
4135 * @returns {Promise}
4236 */
@@ -119,19 +113,8 @@
119113 return exoInterface
120114 }
121115
122116 /**
123- * deletes container from the state
124- * @param {string} id
125- */
126- deleteInstance (id) {
127- if (id !== ROOT_ID) {
128- this._nodesToCheck.delete(id)
129- delete this.state[id]
130- }
131- }
132-
133- /**
134117 * creates a state root starting from a given container and a given number of
135118 * ticks
136119 * @param {Number} ticks the number of ticks at which to create the state root
137120 * @returns {Promise}
@@ -166,70 +149,4 @@
166149 async getHashFromObj (obj) {
167150 return (await this.graph.flush(obj))['/']
168151 }
169152 }
170-
171-// Implements a parrilizable DFS check for graph connictivity given a set of nodes
172-// and a root node. Stating for the set of node to check this does a DFS and
173-// will return a set a nodes if any that is not connected to the root node.
174-async function DFSchecker (graph, state, root, nodes) {
175- const checkedNodesSet = new Set()
176- let hasRootSet = new Set()
177- const promises = []
178-
179- for (const id of nodes) {
180- // create a set for each of the starting nodes to track the nodes the DFS has
181- // has traversed
182- const checkedNodes = new Set()
183- checkedNodesSet.add(checkedNodes)
184- promises.push(check(id, checkedNodes))
185- }
186-
187- // wait for all the search to complete
188- await Promise.all(promises)
189- // remove the set of nodes that are connected to the root
190- checkedNodesSet.delete(hasRootSet)
191- let unLinkedNodesArray = []
192-
193- // combine the unconnected sets into a single array
194- for (const set of checkedNodesSet) {
195- unLinkedNodesArray = unLinkedNodesArray.concat([...set])
196- }
197- return unLinkedNodesArray
198-
199- // does the DFS starting with a single node ID
200- async function check (id, checkedNodes) {
201- if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
202- checkedNodes.has(id) || // check if this DFS has alread seen the node
203- hasRootSet === checkedNodes) { // check that this DFS has alread found the root node
204- return
205- }
206-
207- // check if any of the the other DFSs have seen this node and if so merge
208- // the sets and stop searching
209- for (const set of checkedNodesSet) {
210- if (set.has(id)) {
211- checkedNodes.forEach(id => set.add(id))
212- checkedNodesSet.delete(checkedNodes)
213- return
214- }
215- }
216-
217- // mark the node 'checked'
218- checkedNodes.add(id)
219-
220- // check to see if we are at the root
221- if (id === root) {
222- hasRootSet = checkedNodes
223- return
224- }
225-
226- const node = state[id]['/']
227- const promises = []
228- // iterate through the nodes ports and recursivly check them
229- for (const name in node.ports) {
230- const port = node.ports[name]
231- promises.push(check(port.destId, checkedNodes))
232- }
233- return Promise.all(promises)
234- }
235-}
portManager.jsView
@@ -1,6 +1,6 @@
11 const BN = require('bn.js')
2-const Message = require('primea-message')
2+const DeleteMessage = require('./deleteMessage')
33
44 // decides which message to go first
55 function messageArbiter (nameA, nameB) {
66 const a = this.ports[nameA].messages[0]
@@ -26,18 +26,16 @@
2626 * The port manager manages the the ports. This inculdes creation, deletion
2727 * fetching and waiting on ports
2828 * @param {Object} opts
2929 * @param {Object} opts.state
30- * @param {Object} opts.entryPort
31- * @param {Object} opts.parentPort
3230 * @param {Object} opts.hypervisor
3331 * @param {Object} opts.exoInterface
3432 */
3533 constructor (opts) {
3634 Object.assign(this, opts)
3735 this.ports = this.state.ports
36+ // tracks unbounded ports that we have
3837 this._unboundPorts = new Set()
39- this._waitingPorts = {}
4038 this._saturationPromise = new Promise((resolve, reject) => {
4139 this._saturationResolve = resolve
4240 })
4341 this._oldestMessagePromise = new Promise((resolve, reject) => {
@@ -56,10 +54,14 @@
5654 } else if (this.ports[name]) {
5755 throw new Error('cannot bind port to a name that is alread bound')
5856 } else {
5957 this._unboundPorts.delete(port)
60- this.hypervisor.removeNodeToCheck(this.id)
6158
59+ port.messages.forEach(message => {
60+ message._fromPort = port
61+ message.fromName = name
62+ })
63+
6264 // save the port instance
6365 this.ports[name] = port
6466
6567 // update the dest port
@@ -78,15 +80,15 @@
7880 async unbind (name) {
7981 const port = this.ports[name]
8082 delete this.ports[name]
8183 this._unboundPorts.add(port)
84+ this.hypervisor.addNodeToCheck(this.id)
8285
8386 // update the destination port
8487 const destPort = await this.hypervisor.getDestPort(port)
8588 delete destPort.destName
8689 delete destPort.destId
8790 destPort.destPort = port
88- this.hypervisor.addNodeToCheck(this.id)
8991 return port
9092 }
9193
9294 /**
@@ -94,11 +96,9 @@
9496 * @param {string} name
9597 */
9698 delete (name) {
9799 const port = this.ports[name]
98- this.exInterface.send(port, new Message({
99- data: 'delete'
100- }))
100+ this.exInterface.send(port, new DeleteMessage())
101101 this._delete(name)
102102 }
103103
104104 _delete (name) {
@@ -110,15 +110,13 @@
110110 * clears any unbounded ports referances
111111 */
112112 clearUnboundedPorts () {
113113 this._unboundPorts.forEach(port => {
114- this.exInterface.send(port, new Message({
115- data: 'delete'
116- }))
114+ this.exInterface.send(port, new DeleteMessage())
117115 })
118116 this._unboundPorts.clear()
119117 if (Object.keys(this.ports).length === 0) {
120- this.hypervisor.deleteInstance(this.id)
118+ this.hypervisor.addNodeToCheck(this.id)
121119 }
122120 }
123121
124122 /**
@@ -134,26 +132,28 @@
134132 * queues a message on a port
135133 * @param {Message} message
136134 */
137135 queue (name, message) {
138- if (name) {
139- const port = this.ports[name]
140- if (port.messages.push(message) === 1 && message._fromTicks < this._messageTickThreshold) {
141- message._fromPort = port
142- message.fromName = name
136+ const port = this.ports[name]
137+
138+ message._fromPort = port
139+ message.fromName = name
140+
141+ if (port.messages.push(message) === 1) {
142+ if (this._isSaturated()) {
143+ this._saturationResolve()
144+ this._saturationPromise = new Promise((resolve, reject) => {
145+ this._saturationResolve = resolve
146+ })
147+ }
148+
149+ if (message._fromTicks < this._messageTickThreshold) {
143150 this._oldestMessageResolve(message)
144151 this._oldestMessagePromise = new Promise((resolve, reject) => {
145152 this._oldestMessageResolve = resolve
146153 })
147154 this._messageTickThreshold = Infinity
148155 }
149-
150- if (this.isSaturated()) {
151- this._saturationResolve()
152- this._saturationPromise = new Promise((resolve, reject) => {
153- this._saturationResolve = resolve
154- })
155- }
156156 }
157157 }
158158
159159 /**
@@ -186,10 +186,9 @@
186186
187187 // create a new channel for the container
188188 const ports = this.createChannel()
189189 this._unboundPorts.delete(ports[1])
190- const promise = this.hypervisor.createInstance(type, data, [ports[1]], id)
191- this.exInterface._addWork(promise)
190+ this.hypervisor.createInstance(type, data, [ports[1]], id)
192191
193192 return ports[0]
194193 }
195194
@@ -212,41 +211,66 @@
212211 this._unboundPorts.add(port2)
213212 return [port1, port2]
214213 }
215214
216- /**
217- * find and returns the next message
218- * @returns {object}
219- */
220- peekNextMessage () {
215+ // find and returns the next message
216+ _peekNextMessage () {
221217 const names = Object.keys(this.ports)
222218 if (names.length) {
223219 const portName = names.reduce(messageArbiter.bind(this))
224220 const port = this.ports[portName]
225- const message = port.messages[0]
226-
227- if (message) {
228- message._fromPort = port
229- message.fromName = portName
230- return message
231- }
221+ return port.messages[0]
232222 }
233223 }
234224
235225 /**
236- * tests wether or not all the ports have a message
237- * @returns {boolean}
226+ * Waits for the the next message if any
227+ * @returns {Promise}
238228 */
239- isSaturated () {
229+ async getNextMessage () {
230+ let message = this._peekNextMessage()
231+ let saturated = this._isSaturated()
232+ let oldestTime = this.hypervisor.scheduler.oldest()
233+
234+ while (!saturated && // end if there are messages on all the ports
235+ // end if we have a message older then slowest containers
236+ !((message && oldestTime >= message._fromTicks) ||
237+ // end if there are no messages and this container is the oldest contaner
238+ (!message && oldestTime === this.exInterface.ticks))) {
239+ const ticksToWait = message ? message._fromTicks : this.exInterface.ticks
240+
241+ await Promise.race([
242+ this.hypervisor.scheduler.wait(ticksToWait, this.id).then(() => {
243+ message = this._peekNextMessage()
244+ }),
245+ this._olderMessage(message).then(m => {
246+ message = m
247+ }),
248+ this._whenSaturated().then(() => {
249+ saturated = true
250+ message = this._peekNextMessage()
251+ })
252+ ])
253+
254+ oldestTime = this.hypervisor.scheduler.oldest()
255+ }
256+ return message
257+ }
258+
259+ // tests wether or not all the ports have a message
260+ _isSaturated () {
240261 const keys = Object.keys(this.ports)
241262 return keys.length ? keys.every(name => this.ports[name].messages.length) : 0
242263 }
243264
244- whenSaturated () {
265+ // returns a promise that resolve when the ports are saturated
266+ _whenSaturated () {
245267 return this._saturationPromise
246268 }
247269
248- olderMessage (message) {
270+ // returns a promise that resolve when a message older then the given message
271+ // is recived
272+ _olderMessage (message) {
249273 this._messageTickThreshold = message ? message._fromTicks : 0
250274 return this._oldestMessagePromise
251275 }
252276 }
scheduler.jsView
@@ -1,22 +1,23 @@
11 const binarySearchInsert = require('binary-search-insert')
22
3-const comparator = function (a, b) {
4- return a.ticks - b.ticks
5-}
6-
7-const instancesComparator = function (a, b) {
8- return a[1].ticks - b[1].ticks
9-}
10-
113 module.exports = class Scheduler {
4+ /**
5+ * The Sceduler manages the run cycle of the containes and figures out which
6+ * order they should run in
7+ */
128 constructor () {
139 this._waits = []
1410 this._running = new Set()
1511 this._loadingInstances = new Map()
1612 this.instances = new Map()
1713 }
1814
15+ /**
16+ * locks the scheduler from clearing waits untill the lock is resolved
17+ * @param {string} id
18+ * @return {function} the resolve function to call once it to unlock
19+ */
1920 getLock (id) {
2021 let r
2122 const promise = new Promise((resolve, reject) => {
2223 r = resolve
@@ -27,31 +28,56 @@
2728 this._loadingInstances.set(id, promise)
2829 return r
2930 }
3031
32+ /**
33+ * updates an instance with a new tick count
34+ * @param {Object} instance - a container instance
35+ */
3136 update (instance) {
3237 this._update(instance)
3338 this._checkWaits()
3439 }
3540
3641 _update (instance) {
3742 this._running.add(instance.id)
43+ // sorts the container instance map by tick count
3844 this.instances.delete(instance.id)
3945 const instanceArray = [...this.instances]
40- binarySearchInsert(instanceArray, instancesComparator, [instance.id, instance])
46+ binarySearchInsert(instanceArray, comparator, [instance.id, instance])
4147 this.instances = new Map(instanceArray)
48+
49+ function comparator (a, b) {
50+ return a[1].ticks - b[1].ticks
51+ }
4252 }
4353
54+ /**
55+ * returns a container
56+ * @param {string} id
57+ * @return {object}
58+ */
4459 getInstance (id) {
4560 return this.instances.get(id) || this._loadingInstances.get(id)
4661 }
4762
48- done (instance) {
49- this._running.delete(instance.id)
50- this.instances.delete(instance.id)
63+ /**
64+ * deletes an instance from the scheduler
65+ * @param {string} id - the containers id
66+ */
67+ done (id) {
68+ this._running.delete(id)
69+ this.instances.delete(id)
5170 this._checkWaits()
5271 }
5372
73+ /**
74+ * returns a promise that resolves once all containers have reached the given
75+ * number of ticks
76+ * @param {interger} ticks - the number of ticks to wait
77+ * @param {string} id - optional id of the container that is waiting
78+ * @return {Promise}
79+ */
5480 wait (ticks = Infinity, id) {
5581 this._running.delete(id)
5682 return new Promise((resolve, reject) => {
5783 binarySearchInsert(this._waits, comparator, {
@@ -59,38 +85,51 @@
5985 resolve: resolve
6086 })
6187 this._checkWaits()
6288 })
89+
90+ function comparator (a, b) {
91+ return a.ticks - b.ticks
92+ }
6393 }
6494
65- smallest () {
66- return this.instances.size ? [...this.instances][0][1].ticks : 0
95+ /**
96+ * returns the oldest container's ticks
97+ * @return {integer}
98+ */
99+ oldest () {
100+ const nextValue = this.instances.values().next().value
101+ return nextValue ? nextValue.ticks : 0
67102 }
68103
104+ // checks outstanding waits to see if they can be resolved
69105 _checkWaits () {
70106 if (!this._loadingInstances.size) {
71107 // if there are no running containers
72- if (!this.isRunning()) {
108+ if (!this.instances.size) {
73109 // clear any remanding waits
74110 this._waits.forEach(wait => wait.resolve())
75111 this._waits = []
76112 } else if (!this._running.size) {
77- const smallest = this._waits[0].ticks
113+ // if there are no containers running find the oldest wait and update
114+ // the oldest containers to it ticks
115+ const oldest = this._waits[0].ticks
78116 for (let instance of this.instances) {
79117 instance = instance[1]
80- if (instance.ticks > smallest) {
118+ if (instance.ticks > oldest) {
81119 break
82120 } else {
83- instance.ticks = smallest
121+ instance.ticks = oldest
84122 this._update(instance)
85123 }
86124 }
87125 return this._checkWaits()
88126 } else {
89- const smallest = this.smallest()
127+ // find the old container and see if to can resolve any of the waits
128+ const oldest = this.oldest()
90129 for (const index in this._waits) {
91130 const wait = this._waits[index]
92- if (wait.ticks <= smallest) {
131+ if (wait.ticks <= oldest) {
93132 wait.resolve()
94133 } else {
95134 this._waits.splice(0, index)
96135 break
@@ -98,9 +137,5 @@
98137 }
99138 }
100139 }
101140 }
102-
103- isRunning () {
104- return this.instances.size
105- }
106141 }
tests/index.jsView
@@ -46,8 +46,29 @@
4646 const stateRoot = await hypervisor.createStateRoot(Infinity)
4747 t.deepEquals(stateRoot, expectedState, 'expected root!')
4848 })
4949
50+ tape('basic - do not store containers with no ports bound', async t => {
51+ t.plan(1)
52+ const expectedState = {
53+ '/': 'zdpuAx5LRRwTgzPipKEPgh7MHUKu4Pd1BYjDqBcf9whgzvrqf'
54+ }
55+
56+ class testVMContainer extends BaseContainer {
57+ initailize () {}
58+ }
59+
60+ const hypervisor = new Hypervisor(node.dag)
61+ hypervisor.registerContainer('test', testVMContainer)
62+
63+ const root = await hypervisor.createInstance('test')
64+ const port = root.ports.create('test')
65+ root.ports.bind('one', port)
66+
67+ const stateRoot = await hypervisor.createStateRoot(Infinity)
68+ t.deepEquals(stateRoot, expectedState, 'expected root!')
69+ })
70+
5071 tape('one child contract with saturated ports', async t => {
5172 t.plan(2)
5273 let message
5374 const expectedState = {
deleteMessage.jsView
@@ -1,0 +1,3 @@
1+const Message = require('primea-message')
2+
3+module.exports = class DeleteMessage extends Message {}
dfsChecker.jsView
@@ -1,0 +1,71 @@
1+/**
2+ * Implements a parrilizable DFS check for graph connictivity given a set of nodes
3+ * and a root node. Stating for the set of node to check this does a DFS and
4+ * will return a set a nodes if any that is not connected to the root node.
5+ * @param {object} graph - an instance of ipld-graph-builder
6+ * @param {object} state - the state containing all of the containers to search
7+ * @param {string} root - the root id
8+ * @param {Set} nodes - a set of nodes to start searching from
9+ */
10+module.exports = async function DFSchecker (graph, state, root, nodes) {
11+ const checkedNodesSet = new Set()
12+ let hasRootSet = new Set()
13+ const promises = []
14+
15+ for (const id of nodes) {
16+ // create a set for each of the starting nodes to track the nodes the DFS has
17+ // has traversed
18+ const checkedNodes = new Set()
19+ checkedNodesSet.add(checkedNodes)
20+ promises.push(check(id, checkedNodes))
21+ }
22+
23+ // wait for all the search to complete
24+ await Promise.all(promises)
25+ // remove the set of nodes that are connected to the root
26+ checkedNodesSet.delete(hasRootSet)
27+ let unLinkedNodesArray = []
28+
29+ // combine the unconnected sets into a single array
30+ for (const set of checkedNodesSet) {
31+ unLinkedNodesArray = unLinkedNodesArray.concat([...set])
32+ }
33+ return unLinkedNodesArray
34+
35+ // does the DFS starting with a single node ID
36+ async function check (id, checkedNodes) {
37+ if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
38+ checkedNodes.has(id) || // check if this DFS has alread seen the node
39+ hasRootSet === checkedNodes) { // check that this DFS has alread found the root node
40+ return
41+ }
42+
43+ // check if any of the the other DFSs have seen this node and if so merge
44+ // the sets and stop searching
45+ for (const set of checkedNodesSet) {
46+ if (set.has(id)) {
47+ checkedNodes.forEach(id => set.add(id))
48+ checkedNodesSet.delete(checkedNodes)
49+ return
50+ }
51+ }
52+
53+ // mark the node 'checked'
54+ checkedNodes.add(id)
55+
56+ // check to see if we are at the root
57+ if (id === root) {
58+ hasRootSet = checkedNodes
59+ return
60+ }
61+
62+ const node = state[id]['/']
63+ const promises = []
64+ // iterate through the nodes ports and recursivly check them
65+ for (const name in node.ports) {
66+ const port = node.ports[name]
67+ promises.push(check(port.destId, checkedNodes))
68+ }
69+ return Promise.all(promises)
70+ }
71+}

Built with git-ssb-web