git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: 007b993aa86e9b1ebcc62315c01a80140cfb60d0

Files: 007b993aa86e9b1ebcc62315c01a80140cfb60d0 / index.js

5240 bytesRaw
1const Graph = require('ipld-graph-builder')
2const Message = require('primea-message')
3const Kernel = require('./kernel.js')
4const Scheduler = require('./scheduler.js')
5const DFSchecker = require('./dfsChecker.js')
6const chunk = require('chunk')
7
8module.exports = class Hypervisor {
9 /**
10 * The Hypervisor manages the container instances by instantiating them and
11 * destorying them when possible. It also facilitates localating Containers
12 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
13 * @param {object} state - the starting state
14 */
15 constructor (dag, state = {}) {
16 this.graph = new Graph(dag)
17 this.scheduler = new Scheduler()
18 this.state = state
19 this._containerTypes = {}
20 this._nodesToCheck = new Set()
21
22 this.ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
23 this.MAX_DATA_BYTES = 65533
24 }
25
26 /**
27 * add a potaintail node in the state graph to check for garbage collection
28 * @param {string} id
29 */
30 addNodeToCheck (id) {
31 this._nodesToCheck.add(id)
32 }
33
34 /**
35 * given a port, this finds the corridsponeding endpoint port of the channel
36 * @param {object} port
37 * @returns {Promise}
38 */
39 getDestPort (port) {
40 if (port.destPort) {
41 return port.destPort
42 } else {
43 return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
44 }
45 }
46
47 async send (port, message) {
48 if (port.destId) {
49 const id = port.destId
50 const instance = await this.getInstance(id)
51 return instance.queue(port.destName, message)
52 } else {
53 // port is unbound
54 port.destPort.messages.push(message)
55 }
56 }
57
58 // loads an instance of a container from the state
59 async _loadInstance (id) {
60 const state = await this.graph.get(this.state, id)
61 const container = this._containerTypes[state.type]
62 let code
63
64 // checks if the code stored in the state is an array and that the elements
65 // are merkle link
66 if (state.code && state.code[0]['/']) {
67 await this.graph.tree(state.code, 1)
68 code = state.code.map(a => a['/']).reduce((a, b) => a + b)
69 } else {
70 code = state.code
71 }
72
73 // create a new kernel instance
74 const kernel = new Kernel({
75 hypervisor: this,
76 state: state,
77 code: code,
78 container: container,
79 id: id
80 })
81
82 // save the newly created instance
83 this.scheduler.update(kernel)
84 return kernel
85 }
86
87 // get a hash from a POJO
88 _getHashFromObj (obj) {
89 return this.graph.flush(obj).then(obj => obj['/'])
90 }
91
92 /**
93 * gets an existsing container instances
94 * @param {string} id - the containers ID
95 * @returns {Promise}
96 */
97 async getInstance (id) {
98 let instance = this.scheduler.getInstance(id)
99 if (instance) {
100 return instance
101 } else {
102 const resolve = this.scheduler.getLock(id)
103 const instance = await this._loadInstance(id)
104 await instance.startup()
105 resolve(instance)
106 return instance
107 }
108 }
109
110 /**
111 * creates an new container instances and save it in the state
112 * @param {string} type - the type of container to create
113 * @param {*} code
114 * @param {array} entryPorts
115 * @param {object} id
116 * @param {object} id.nonce
117 * @param {object} id.parent
118 * @returns {Promise}
119 */
120 async createInstance (type, message = new Message(), id = {nonce: 0, parent: null}) {
121 // create a lock to prevent the scheduler from reloving waits before the
122 // new container is loaded
123 const resolve = this.scheduler.getLock(id)
124 const idHash = await this._getHashFromObj(id)
125 // const code = message.data.byteLength ? message.data : undefined
126 const state = {
127 nonce: [0],
128 ports: {},
129 type: type
130 }
131
132 if (message.data.length) {
133 state.code = message.data
134 }
135
136 // save the container in the state
137 await this.graph.set(this.state, idHash, state)
138 // create the container instance
139 const instance = await this._loadInstance(idHash)
140 resolve(instance)
141 // send the intialization message
142 await instance.create(message)
143
144 if (state.code && state.code.length > this.MAX_DATA_BYTES) {
145 state.code = chunk(state.code, this.MAX_DATA_BYTES).map(chk => {
146 return {
147 '/': chk
148 }
149 })
150 }
151
152 return instance
153 }
154
155 /**
156 * creates a state root starting from a given container and a given number of
157 * ticks
158 * @param {Number} ticks the number of ticks at which to create the state root
159 * @returns {Promise}
160 */
161 async createStateRoot (ticks) {
162 await this.scheduler.wait(ticks)
163 const unlinked = await DFSchecker(this.graph, this.state, this.ROOT_ID, this._nodesToCheck)
164 unlinked.forEach(id => {
165 delete this.state[id]
166 })
167 return this.graph.flush(this.state)
168 }
169
170 /**
171 * regirsters a container with the hypervisor
172 * @param {String} type - the name of the type
173 * @param {Class} Constructor - a Class for instantiating the container
174 * @param {*} args - any args that the contructor takes
175 */
176 registerContainer (type, Constructor, args) {
177 Constructor.type = type
178 this._containerTypes[type] = {
179 Constructor: Constructor,
180 args: args
181 }
182 }
183}
184

Built with git-ssb-web