git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 864bc27090fe788a4bf5a86c00246ca6e4391dd0

Files: 864bc27090fe788a4bf5a86c00246ca6e4391dd0 / index.js

6742 bytesRaw
1const Graph = require('ipld-graph-builder')
2const Message = require('primea-message')
3const ExoInterface = require('./exoInterface.js')
4const Scheduler = require('./scheduler.js')
5
6const ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
7
8module.exports = class Hypervisor {
9 /**
10 * The Hypervisor manages the container instances by instantiating them and
11 * destorying them when possible. It also facilitates localating Containers
12 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
13 */
14 constructor (dag, state = {}) {
15 this.graph = new Graph(dag)
16 this.scheduler = new Scheduler()
17 this.state = state
18 this._containerTypes = {}
19 this._nodesToCheck = new Set()
20 }
21
22 /**
23 * add a potaintail node in the state graph to check for garbage collection
24 * @param {string} id
25 */
26 addNodeToCheck (id) {
27 this._nodesToCheck.add(id)
28 }
29
30 /**
31 * removes a potaintail node in the state graph to check for garbage collection
32 * @param {string} id
33 */
34 removeNodeToCheck (id) {
35 this._nodesToCheck.delete(id)
36 }
37
38 /**
39 * given a port, this finds the corridsponeding endpoint port of the channel
40 * @param {object} port
41 * @returns {Promise}
42 */
43 getDestPort (port) {
44 if (port.destPort) {
45 return port.destPort
46 } else {
47 return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
48 }
49 }
50
51 // loads an instance of a container from the state
52 async _loadInstance (id, lock) {
53 const state = await this.graph.get(this.state, id)
54 const container = this._containerTypes[state.type]
55
56 // create a new kernel instance
57 const exoInterface = new ExoInterface({
58 hypervisor: this,
59 state: state,
60 container: container,
61 id: id
62 })
63
64 // save the newly created instance
65 this.scheduler.update(exoInterface)
66 this.scheduler.releaseLock(lock)
67 return exoInterface
68 }
69
70 /**
71 * gets an existsing container instances
72 * @param {string} id - the containers ID
73 * @returns {Promise}
74 */
75 async getInstance (id) {
76 let instance = this.scheduler.getInstance(id)
77 if (instance) {
78 return instance
79 } else {
80 const lock = this.scheduler.getLock()
81 instance = await this._loadInstance(id, lock)
82 return instance
83 }
84 }
85
86 /**
87 * creates an new container instances and save it in the state
88 * @param {string} type - the type of container to create
89 * @param {*} code
90 * @param {array} entryPorts
91 * @param {object} id
92 * @param {object} id.nonce
93 * @param {object} id.parent
94 * @returns {Promise}
95 */
96 async createInstance (type, code, entryPorts = [], id = {nonce: 0, parent: null}) {
97 // create a lock to prevent the scheduler from reloving waits before the
98 // new container is loaded
99 const lock = this.scheduler.getLock()
100 id = await this.getHashFromObj(id)
101 const state = {
102 nonce: [0],
103 ports: {},
104 type: type,
105 code: code
106 }
107
108 // save the container in the state
109 await this.graph.set(this.state, id, state)
110 // create the container instance
111 const exoInterface = await this._loadInstance(id, lock)
112 // send the intialization message
113 exoInterface.queue(null, new Message({
114 ports: entryPorts
115 }))
116
117 return exoInterface
118 }
119
120 /**
121 * deletes container from the state
122 * @param {string} id
123 */
124 deleteInstance (id) {
125 if (id !== ROOT_ID) {
126 this._nodesToCheck.delete(id)
127 delete this.state[id]
128 }
129 }
130
131 /**
132 * creates a state root starting from a given container and a given number of
133 * ticks
134 * @param {Number} ticks the number of ticks at which to create the state root
135 * @returns {Promise}
136 */
137 async createStateRoot (ticks) {
138 await this.scheduler.wait(ticks)
139 const unlinked = await DFSchecker(this.graph, this.state, ROOT_ID, this._nodesToCheck)
140 unlinked.forEach(id => {
141 delete this.state[id]
142 })
143 return this.graph.flush(this.state)
144 }
145
146 /**
147 * regirsters a container with the hypervisor
148 * @param {String} type - the name of the type
149 * @param {Class} Constructor - a Class for instantiating the container
150 * @param {*} args - any args that the contructor takes
151 */
152 registerContainer (type, Constructor, args) {
153 this._containerTypes[type] = {
154 Constructor: Constructor,
155 args: args
156 }
157 }
158
159 /**
160 * get a hash from a POJO
161 * @param {object} obj
162 * @return {Promise}
163 */
164 async getHashFromObj (obj) {
165 return (await this.graph.flush(obj))['/']
166 }
167}
168
// Implements a DFS check for graph connectivity given a set of nodes and a
// root node. Starting from each node in the set this does a DFS and returns
// an array of the visited nodes, if any, that are not connected to the root
// node.
//
// graph - unused here; kept so the call signature stays the same
// state - object mapping a node id to an entry whose '/' property holds the
//         node (the node's `ports` map port names to objects with a `destId`)
// root  - id of the root node
// nodes - iterable of node ids to start the searches from
//
// Returns a Promise resolving to an array of node ids. Ids that have no entry
// in `state` (e.g. a port that has no `destId`, or that points at a node that
// was already deleted) are treated as dead ends and are never reported.
async function DFSchecker (graph, state, root, nodes) {
  // one Set of visited ids for every search that has not been merged away
  const checkedNodesSet = new Set()
  // the visited-set of the search that reached the root; until a search does,
  // this is a placeholder that is not a member of checkedNodesSet
  let hasRootSet = new Set()

  // does the DFS starting with a single node ID
  function check (id, checkedNodes) {
    if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
        checkedNodes.has(id) || // check if this DFS has already seen the node
        hasRootSet === checkedNodes) { // check if this DFS has already found the root node
      return
    }

    // check if any of the other DFSs have seen this node and if so merge
    // the sets and stop searching
    for (const set of checkedNodesSet) {
      if (set.has(id)) {
        checkedNodes.forEach(seenId => set.add(seenId))
        checkedNodesSet.delete(checkedNodes)
        return
      }
    }

    // check to see if we are at the root; the root itself is never expanded
    if (id === root) {
      checkedNodes.add(id)
      hasRootSet = checkedNodes
      return
    }

    // an id without an entry in the state cannot link anywhere; skip it
    // instead of throwing on the property access below
    const entry = state[id]
    if (entry === undefined || entry === null) {
      return
    }

    // mark the node 'checked'
    checkedNodes.add(id)

    const node = entry['/']
    const ports = (node === undefined || node === null) ? undefined : node.ports
    // iterate through the nodes ports and recursively check them
    // (for...in over an undefined `ports` simply does not iterate)
    for (const name in ports) {
      check(ports[name].destId, checkedNodes)
    }
  }

  for (const id of nodes) {
    // create a set for each of the starting nodes to track the nodes the DFS
    // has traversed
    const checkedNodes = new Set()
    checkedNodesSet.add(checkedNodes)
    check(id, checkedNodes)
  }

  // remove the set of nodes that are connected to the root
  checkedNodesSet.delete(hasRootSet)

  // combine the unconnected sets into a single array
  const unLinkedNodesArray = []
  for (const set of checkedNodesSet) {
    for (const id of set) {
      unLinkedNodesArray.push(id)
    }
  }
  return unLinkedNodesArray
}
234

Built with git-ssb-web