git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 46248da0b2e047c143b387676430c8d2d27cf4a3

Files: 46248da0b2e047c143b387676430c8d2d27cf4a3 / index.js

6917 bytesRaw
1const Graph = require('ipld-graph-builder')
2const Message = require('primea-message')
3const ExoInterface = require('./exoInterface.js')
4const Scheduler = require('./scheduler.js')
5
6const ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
7
8module.exports = class Hypervisor {
9 /**
10 * The Hypervisor manages the container instances by instantiating them and
11 * destorying them when possible. It also facilitates localating Containers
12 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
13 */
14 constructor (dag, state = {}) {
15 this.graph = new Graph(dag)
16 this.scheduler = new Scheduler()
17 this.state = state
18 this._containerTypes = {}
19 this._nodesToCheck = new Set()
20 }
21
22 /**
23 * add a potaintail node in the state graph to check for garbage collection
24 * @param {string} id
25 */
26 addNodeToCheck (id) {
27 this._nodesToCheck.add(id)
28 }
29
30 /**
31 * removes a potaintail node in the state graph to check for garbage collection
32 * @param {string} id
33 */
34 removeNodeToCheck (id) {
35 this._nodesToCheck.delete(id)
36 }
37
38 /**
39 * given a port, this finds the corridsponeding endpoint port of the channel
40 * @param {object} port
41 * @returns {Promise}
42 */
43 getDestPort (port) {
44 if (port.destPort) {
45 return port.destPort
46 } else {
47 return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
48 }
49 }
50
51 // loads an instance of a container from the state
52 async _loadInstance (id, lock) {
53 const state = await this.graph.get(this.state, id)
54 const container = this._containerTypes[state.type]
55
56 // create a new kernel instance
57 const exoInterface = new ExoInterface({
58 hypervisor: this,
59 state: state,
60 container: container,
61 id: id
62 })
63
64 // save the newly created instance
65 this.scheduler.update(exoInterface)
66 return exoInterface
67 }
68
69 /**
70 * gets an existsing container instances
71 * @param {string} id - the containers ID
72 * @returns {Promise}
73 */
74 getInstance (id) {
75 let instance = this.scheduler.getInstance(id)
76 if (instance) {
77 return instance
78 } else {
79 const promise = this._loadInstance(id, id)
80 promise.then(() => {
81 this.scheduler._loadingInstances.delete(id)
82 this.scheduler.releaseLock(id)
83 })
84 this.scheduler.getLock(id, promise)
85 this.scheduler._loadingInstances.set(id, promise)
86 return promise
87 }
88 }
89
90 /**
91 * creates an new container instances and save it in the state
92 * @param {string} type - the type of container to create
93 * @param {*} code
94 * @param {array} entryPorts
95 * @param {object} id
96 * @param {object} id.nonce
97 * @param {object} id.parent
98 * @returns {Promise}
99 */
100 async createInstance (type, code, entryPorts = [], id = {nonce: 0, parent: null}) {
101 // create a lock to prevent the scheduler from reloving waits before the
102 // new container is loaded
103 this.scheduler.getLock(id)
104 const idHash = await this.getHashFromObj(id)
105 const state = {
106 nonce: [0],
107 ports: {},
108 type: type,
109 code: code
110 }
111
112 // save the container in the state
113 await this.graph.set(this.state, idHash, state)
114 // create the container instance
115 const exoInterface = await this._loadInstance(idHash, id)
116
117 this.scheduler.releaseLock(id)
118 // send the intialization message
119 exoInterface.queue(null, new Message({
120 ports: entryPorts
121 }))
122
123 return exoInterface
124 }
125
126 /**
127 * deletes container from the state
128 * @param {string} id
129 */
130 deleteInstance (id) {
131 if (id !== ROOT_ID) {
132 this._nodesToCheck.delete(id)
133 delete this.state[id]
134 }
135 }
136
137 /**
138 * creates a state root starting from a given container and a given number of
139 * ticks
140 * @param {Number} ticks the number of ticks at which to create the state root
141 * @returns {Promise}
142 */
143 async createStateRoot (ticks) {
144 await this.scheduler.wait(ticks)
145 const unlinked = await DFSchecker(this.graph, this.state, ROOT_ID, this._nodesToCheck)
146 unlinked.forEach(id => {
147 delete this.state[id]
148 })
149 return this.graph.flush(this.state)
150 }
151
152 /**
153 * regirsters a container with the hypervisor
154 * @param {String} type - the name of the type
155 * @param {Class} Constructor - a Class for instantiating the container
156 * @param {*} args - any args that the contructor takes
157 */
158 registerContainer (type, Constructor, args) {
159 this._containerTypes[type] = {
160 Constructor: Constructor,
161 args: args
162 }
163 }
164
165 /**
166 * get a hash from a POJO
167 * @param {object} obj
168 * @return {Promise}
169 */
170 async getHashFromObj (obj) {
171 return (await this.graph.flush(obj))['/']
172 }
173}
174
175// Implements a parrilizable DFS check for graph connictivity given a set of nodes
176// and a root node. Stating for the set of node to check this does a DFS and
177// will return a set a nodes if any that is not connected to the root node.
178async function DFSchecker (graph, state, root, nodes) {
179 const checkedNodesSet = new Set()
180 let hasRootSet = new Set()
181 const promises = []
182
183 for (const id of nodes) {
184 // create a set for each of the starting nodes to track the nodes the DFS has
185 // has traversed
186 const checkedNodes = new Set()
187 checkedNodesSet.add(checkedNodes)
188 promises.push(check(id, checkedNodes))
189 }
190
191 // wait for all the search to complete
192 await Promise.all(promises)
193 // remove the set of nodes that are connected to the root
194 checkedNodesSet.delete(hasRootSet)
195 let unLinkedNodesArray = []
196
197 // combine the unconnected sets into a single array
198 for (const set of checkedNodesSet) {
199 unLinkedNodesArray = unLinkedNodesArray.concat([...set])
200 }
201 return unLinkedNodesArray
202
203 // does the DFS starting with a single node ID
204 async function check (id, checkedNodes) {
205 if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
206 checkedNodes.has(id) || // check if this DFS has alread seen the node
207 hasRootSet === checkedNodes) { // check that this DFS has alread found the root node
208 return
209 }
210
211 // check if any of the the other DFSs have seen this node and if so merge
212 // the sets and stop searching
213 for (const set of checkedNodesSet) {
214 if (set.has(id)) {
215 checkedNodes.forEach(id => set.add(id))
216 checkedNodesSet.delete(checkedNodes)
217 return
218 }
219 }
220
221 // mark the node 'checked'
222 checkedNodes.add(id)
223
224 // check to see if we are at the root
225 if (id === root) {
226 hasRootSet = checkedNodes
227 return
228 }
229
230 const node = state[id]['/']
231 const promises = []
232 // iterate through the nodes ports and recursivly check them
233 for (const name in node.ports) {
234 const port = node.ports[name]
235 promises.push(check(port.destId, checkedNodes))
236 }
237 return Promise.all(promises)
238 }
239}
240

Built with git-ssb-web