Files: 46248da0b2e047c143b387676430c8d2d27cf4a3 / index.js
6917 bytesRaw
1 | const Graph = require('ipld-graph-builder') |
2 | const Message = require('primea-message') |
3 | const ExoInterface = require('./exoInterface.js') |
4 | const Scheduler = require('./scheduler.js') |
5 | |
// ID of the root container in the state graph. `deleteInstance` refuses to
// delete this ID and `createStateRoot` uses it as the root node of the
// connectivity check that decides which containers can be garbage collected.
6 | const ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr' |
7 | |
8 | module.exports = class Hypervisor { |
9 | /** |
10 | * The Hypervisor manages the container instances by instantiating them and |
11 | * destorying them when possible. It also facilitates localating Containers |
12 | * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api) |
13 | */ |
14 | constructor (dag, state = {}) { |
15 | this.graph = new Graph(dag) |
16 | this.scheduler = new Scheduler() |
17 | this.state = state |
18 | this._containerTypes = {} |
19 | this._nodesToCheck = new Set() |
20 | } |
21 | |
22 | /** |
23 | * add a potaintail node in the state graph to check for garbage collection |
24 | * @param {string} id |
25 | */ |
26 | addNodeToCheck (id) { |
27 | this._nodesToCheck.add(id) |
28 | } |
29 | |
30 | /** |
31 | * removes a potaintail node in the state graph to check for garbage collection |
32 | * @param {string} id |
33 | */ |
34 | removeNodeToCheck (id) { |
35 | this._nodesToCheck.delete(id) |
36 | } |
37 | |
38 | /** |
39 | * given a port, this finds the corridsponeding endpoint port of the channel |
40 | * @param {object} port |
41 | * @returns {Promise} |
42 | */ |
43 | getDestPort (port) { |
44 | if (port.destPort) { |
45 | return port.destPort |
46 | } else { |
47 | return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`) |
48 | } |
49 | } |
50 | |
51 | // loads an instance of a container from the state |
52 | async _loadInstance (id, lock) { |
53 | const state = await this.graph.get(this.state, id) |
54 | const container = this._containerTypes[state.type] |
55 | |
56 | // create a new kernel instance |
57 | const exoInterface = new ExoInterface({ |
58 | hypervisor: this, |
59 | state: state, |
60 | container: container, |
61 | id: id |
62 | }) |
63 | |
64 | // save the newly created instance |
65 | this.scheduler.update(exoInterface) |
66 | return exoInterface |
67 | } |
68 | |
69 | /** |
70 | * gets an existsing container instances |
71 | * @param {string} id - the containers ID |
72 | * @returns {Promise} |
73 | */ |
74 | getInstance (id) { |
75 | let instance = this.scheduler.getInstance(id) |
76 | if (instance) { |
77 | return instance |
78 | } else { |
79 | const promise = this._loadInstance(id, id) |
80 | promise.then(() => { |
81 | this.scheduler._loadingInstances.delete(id) |
82 | this.scheduler.releaseLock(id) |
83 | }) |
84 | this.scheduler.getLock(id, promise) |
85 | this.scheduler._loadingInstances.set(id, promise) |
86 | return promise |
87 | } |
88 | } |
89 | |
90 | /** |
91 | * creates an new container instances and save it in the state |
92 | * @param {string} type - the type of container to create |
93 | * @param {*} code |
94 | * @param {array} entryPorts |
95 | * @param {object} id |
96 | * @param {object} id.nonce |
97 | * @param {object} id.parent |
98 | * @returns {Promise} |
99 | */ |
100 | async createInstance (type, code, entryPorts = [], id = {nonce: 0, parent: null}) { |
101 | // create a lock to prevent the scheduler from reloving waits before the |
102 | // new container is loaded |
103 | this.scheduler.getLock(id) |
104 | const idHash = await this.getHashFromObj(id) |
105 | const state = { |
106 | nonce: [0], |
107 | ports: {}, |
108 | type: type, |
109 | code: code |
110 | } |
111 | |
112 | // save the container in the state |
113 | await this.graph.set(this.state, idHash, state) |
114 | // create the container instance |
115 | const exoInterface = await this._loadInstance(idHash, id) |
116 | |
117 | this.scheduler.releaseLock(id) |
118 | // send the intialization message |
119 | exoInterface.queue(null, new Message({ |
120 | ports: entryPorts |
121 | })) |
122 | |
123 | return exoInterface |
124 | } |
125 | |
126 | /** |
127 | * deletes container from the state |
128 | * @param {string} id |
129 | */ |
130 | deleteInstance (id) { |
131 | if (id !== ROOT_ID) { |
132 | this._nodesToCheck.delete(id) |
133 | delete this.state[id] |
134 | } |
135 | } |
136 | |
137 | /** |
138 | * creates a state root starting from a given container and a given number of |
139 | * ticks |
140 | * @param {Number} ticks the number of ticks at which to create the state root |
141 | * @returns {Promise} |
142 | */ |
143 | async createStateRoot (ticks) { |
144 | await this.scheduler.wait(ticks) |
145 | const unlinked = await DFSchecker(this.graph, this.state, ROOT_ID, this._nodesToCheck) |
146 | unlinked.forEach(id => { |
147 | delete this.state[id] |
148 | }) |
149 | return this.graph.flush(this.state) |
150 | } |
151 | |
152 | /** |
153 | * regirsters a container with the hypervisor |
154 | * @param {String} type - the name of the type |
155 | * @param {Class} Constructor - a Class for instantiating the container |
156 | * @param {*} args - any args that the contructor takes |
157 | */ |
158 | registerContainer (type, Constructor, args) { |
159 | this._containerTypes[type] = { |
160 | Constructor: Constructor, |
161 | args: args |
162 | } |
163 | } |
164 | |
165 | /** |
166 | * get a hash from a POJO |
167 | * @param {object} obj |
168 | * @return {Promise} |
169 | */ |
170 | async getHashFromObj (obj) { |
171 | return (await this.graph.flush(obj))['/'] |
172 | } |
173 | } |
174 | |
// Implements a parallelizable DFS check for graph connectivity given a set of
// nodes and a root node. Starting from each node in the set, this does a DFS
// along the `destId` of every port and resolves to an array of the IDs of all
// visited nodes that are not connected to the root node (empty if none).
//
// Parameters:
//   graph - accepted for the callers' sake but not used; nodes are read
//           directly from `state[id]['/']`
//   state - object mapping node IDs to entries of the form {'/': node}
//   root  - the ID of the root node
//   nodes - iterable of node IDs to start the searches from
//
// IDs that have no entry in `state` (e.g. a port that still points at a
// deleted node) are treated as dead ends and are never reported.
async function DFSchecker (graph, state, root, nodes) {
  // the visited-sets of all searches that are still considered separate
  const checkedNodesSet = new Set()
  // the visited-set of the search that reached the root; it starts out as a
  // placeholder that is not a member of checkedNodesSet
  let hasRootSet = new Set()
  const promises = []

  // does the DFS starting with a single node ID
  async function check (id, checkedNodes) {
    if (!checkedNodesSet.has(checkedNodes) || // check if this DFS is still searching
      checkedNodes.has(id) || // check if this DFS has already seen the node
      hasRootSet === checkedNodes) { // check if this DFS has already found the root node
      return
    }

    // check if any of the other DFSs have seen this node and if so merge
    // the sets and stop searching
    for (const set of checkedNodesSet) {
      if (set.has(id)) {
        checkedNodes.forEach(seen => set.add(seen))
        checkedNodesSet.delete(checkedNodes)
        return
      }
    }

    // check to see if we are at the root; the root's own ports are not followed
    if (id === root) {
      checkedNodes.add(id)
      hasRootSet = checkedNodes
      return
    }

    // a dangling ID has nothing to traverse and nothing to collect
    const entry = state[id]
    if (!entry) {
      return
    }

    // mark the node 'checked'
    checkedNodes.add(id)

    const node = entry['/']
    if (!node) {
      return
    }

    // iterate through the node's ports and recursively check them
    const pending = []
    for (const name in node.ports) {
      pending.push(check(node.ports[name].destId, checkedNodes))
    }
    return Promise.all(pending)
  }

  for (const id of nodes) {
    // create a set for each of the starting nodes to track the nodes the DFS
    // has traversed
    const checkedNodes = new Set()
    checkedNodesSet.add(checkedNodes)
    promises.push(check(id, checkedNodes))
  }

  // wait for all the searches to complete
  await Promise.all(promises)
  // remove the set of nodes that are connected to the root
  checkedNodesSet.delete(hasRootSet)

  // combine the unconnected sets into a single array
  const unlinkedNodes = []
  for (const set of checkedNodesSet) {
    for (const id of set) {
      unlinkedNodes.push(id)
    }
  }
  return unlinkedNodes
}
240 |
Built with git-ssb-web