Files: add087e94722a8b77695d21eca754a562d7e79f5 / index.js
5787 bytesRaw
1 | const Tree = require('merkle-radix-tree') |
2 | const Graph = require('ipld-graph-builder') |
3 | const chunk = require('chunk') |
4 | const Message = require('primea-message') |
5 | const Kernel = require('./kernel.js') |
6 | const Scheduler = require('./scheduler.js') |
7 | const DFSchecker = require('./dfsChecker.js') |
8 | |
9 | module.exports = class Hypervisor { |
10 | /** |
11 | * The Hypervisor manages the container instances by instantiating them and |
12 | * destorying them when possible. It also facilitates localating Containers |
13 | * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api) |
14 | * @param {object} state - the starting state |
15 | */ |
16 | constructor (dag, state = {'/': Tree.emptyTreeState}) { |
17 | this.graph = new Graph(dag) |
18 | this.tree = new Tree({ |
19 | graph: this.graph, |
20 | root: state |
21 | }) |
22 | this.scheduler = new Scheduler() |
23 | this.state = state |
24 | this._containerTypes = {} |
25 | this._nodesToCheck = new Set() |
26 | |
27 | this.ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr' |
28 | this.MAX_DATA_BYTES = 65533 |
29 | } |
30 | |
31 | /** |
32 | * add a potaintail node in the state graph to check for garbage collection |
33 | * @param {string} id |
34 | */ |
35 | addNodeToCheck (id) { |
36 | this._nodesToCheck.add(id) |
37 | } |
38 | |
39 | /** |
40 | * given a port, this finds the corridsponeding endpoint port of the channel |
41 | * @param {object} port |
42 | * @returns {Promise} |
43 | */ |
44 | async getDestPort (port) { |
45 | if (port.destPort) { |
46 | return port.destPort |
47 | } else { |
48 | const containerState = await this.tree.get(port.destId) |
49 | return this.graph.get(containerState, `ports/${port.destName}`) |
50 | } |
51 | } |
52 | |
53 | async send (port, message) { |
54 | if (port.destId) { |
55 | const id = port.destId |
56 | const instance = await this.getInstance(id) |
57 | instance.queue(port.destName, message) |
58 | } else { |
59 | // port is unbound |
60 | port.destPort.messages.push(message) |
61 | } |
62 | } |
63 | |
64 | // loads an instance of a container from the state |
65 | async _loadInstance (id, state) { |
66 | if (!state) { |
67 | state = await this.tree.get(id) |
68 | } |
69 | const container = this._containerTypes[state.type] |
70 | let code |
71 | |
72 | // checks if the code stored in the state is an array and that the elements |
73 | // are merkle link |
74 | if (state.code && state.code[0]['/']) { |
75 | await this.graph.tree(state.code, 1) |
76 | code = state.code.map(a => a['/']).reduce((a, b) => a + b) |
77 | } else { |
78 | code = state.code |
79 | } |
80 | |
81 | // create a new kernel instance |
82 | const kernel = new Kernel({ |
83 | hypervisor: this, |
84 | state: state, |
85 | code: code, |
86 | container: container, |
87 | id: id |
88 | }) |
89 | |
90 | // save the newly created instance |
91 | this.scheduler.update(kernel) |
92 | return kernel |
93 | } |
94 | |
95 | // get a hash from a POJO |
96 | _getHashFromObj (obj) { |
97 | return this.graph.flush(obj).then(obj => obj['/']) |
98 | } |
99 | |
100 | /** |
101 | * gets an existsing container instances |
102 | * @param {string} id - the containers ID |
103 | * @returns {Promise} |
104 | */ |
105 | async getInstance (id) { |
106 | let instance = this.scheduler.getInstance(id) |
107 | if (instance) { |
108 | return instance |
109 | } else { |
110 | const resolve = this.scheduler.getLock(id) |
111 | const instance = await this._loadInstance(id) |
112 | await instance.startup() |
113 | resolve(instance) |
114 | return instance |
115 | } |
116 | } |
117 | |
118 | /** |
119 | * creates an new container instances and save it in the state |
120 | * @param {string} type - the type of container to create |
121 | * @param {*} code |
122 | * @param {array} entryPorts |
123 | * @param {object} id |
124 | * @param {object} id.nonce |
125 | * @param {object} id.parent |
126 | * @returns {Promise} |
127 | */ |
128 | async createInstance (type, message = new Message(), id = {nonce: 0, parent: null}) { |
129 | // create a lock to prevent the scheduler from reloving waits before the |
130 | // new container is loaded |
131 | // const unlock = this.scheduler.getLock(id) |
132 | const idHash = await this._getHashFromObj(id) |
133 | // const code = message.data.byteLength ? message.data : undefined |
134 | const state = { |
135 | nonce: [0], |
136 | ports: {}, |
137 | type: type |
138 | } |
139 | |
140 | if (message.data.length) { |
141 | state.code = message.data |
142 | } |
143 | |
144 | // create the container instance |
145 | const instance = await this._loadInstance(idHash, state) |
146 | |
147 | // send the intialization message |
148 | await instance.create(message) |
149 | |
150 | if (Object.keys(instance.ports.ports).length || instance.id === this.ROOT_ID) { |
151 | if (state.code && state.code.length > this.MAX_DATA_BYTES) { |
152 | state.code = chunk(state.code, this.MAX_DATA_BYTES).map(chk => { |
153 | return { |
154 | '/': chk |
155 | } |
156 | }) |
157 | } |
158 | // save the container in the state |
159 | await this.tree.set(idHash, state) |
160 | } else { |
161 | this.scheduler.done(idHash) |
162 | } |
163 | |
164 | return instance |
165 | } |
166 | |
167 | createChannel () { |
168 | const port1 = { |
169 | messages: [] |
170 | } |
171 | |
172 | const port2 = { |
173 | messages: [], |
174 | destPort: port1 |
175 | } |
176 | |
177 | port1.destPort = port2 |
178 | return [port1, port2] |
179 | } |
180 | |
181 | /** |
182 | * creates a state root starting from a given container and a given number of |
183 | * ticks |
184 | * @param {Number} ticks the number of ticks at which to create the state root |
185 | * @returns {Promise} |
186 | */ |
187 | async createStateRoot (ticks) { |
188 | await this.scheduler.wait(ticks) |
189 | |
190 | const unlinked = await DFSchecker(this.tree, this.ROOT_ID, this._nodesToCheck) |
191 | for (const id of unlinked) { |
192 | await this.tree.delete(id) |
193 | } |
194 | |
195 | return this.graph.flush(this.state) |
196 | } |
197 | |
198 | /** |
199 | * regirsters a container with the hypervisor |
200 | * @param {Class} Constructor - a Class for instantiating the container |
201 | * @param {*} args - any args that the contructor takes |
202 | * @param {interger} typeId - the container's type identification ID |
203 | */ |
204 | registerContainer (Constructor, args, typeId = Constructor.typeId) { |
205 | this._containerTypes[typeId] = { |
206 | Constructor: Constructor, |
207 | args: args |
208 | } |
209 | } |
210 | } |
211 |
Built with git-ssb-web