git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: e7eb20431b18e511b25b50b4c4d332767e25bc3d

Files: e7eb20431b18e511b25b50b4c4d332767e25bc3d / index.js

5213 bytesRaw
1const Graph = require('ipld-graph-builder')
2const Message = require('primea-message')
3const Kernel = require('./kernel.js')
4const Scheduler = require('./scheduler.js')
5const DFSchecker = require('./dfsChecker.js')
6const chunk = require('chunk')
7
8module.exports = class Hypervisor {
9 /**
10 * The Hypervisor manages the container instances by instantiating them and
11 * destorying them when possible. It also facilitates localating Containers
12 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
13 * @param {object} state - the starting state
14 */
15 constructor (dag, state = {}) {
16 this.graph = new Graph(dag)
17 this.scheduler = new Scheduler()
18 this.state = state
19 this._containerTypes = {}
20 this._nodesToCheck = new Set()
21
22 this.ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
23 this.MAX_DATA_BYTES = 65533
24 }
25
26 /**
27 * add a potaintail node in the state graph to check for garbage collection
28 * @param {string} id
29 */
30 addNodeToCheck (id) {
31 this._nodesToCheck.add(id)
32 }
33
34 /**
35 * given a port, this finds the corridsponeding endpoint port of the channel
36 * @param {object} port
37 * @returns {Promise}
38 */
39 getDestPort (port) {
40 if (port.destPort) {
41 return port.destPort
42 } else {
43 return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
44 }
45 }
46
47 async send (port, message) {
48 if (port.destId) {
49 const id = port.destId
50 const instance = await this.getInstance(id)
51 return instance.queue(port.destName, message)
52 } else {
53 // port is unbound
54 port.destPort.messages.push(message)
55 }
56 }
57
58 // loads an instance of a container from the state
59 async _loadInstance (id) {
60 const state = await this.graph.get(this.state, id)
61 const container = this._containerTypes[state.type]
62 let code
63
64 // checks if the code stored in the state is an array and that the elements
65 // are merkle link
66 if (state.code && state.code[0]['/']) {
67 await this.graph.tree(state.code, 1)
68 code = state.code.map(a => a['/']).reduce((a, b) => a + b)
69 } else {
70 code = state.code
71 }
72
73 // create a new kernel instance
74 const kernel = new Kernel({
75 hypervisor: this,
76 state: state,
77 code: code,
78 container: container,
79 id: id
80 })
81
82 // save the newly created instance
83 this.scheduler.update(kernel)
84 return kernel
85 }
86
87 // get a hash from a POJO
88 _getHashFromObj (obj) {
89 return this.graph.flush(obj).then(obj => obj['/'])
90 }
91
92 /**
93 * gets an existsing container instances
94 * @param {string} id - the containers ID
95 * @returns {Promise}
96 */
97 async getInstance (id) {
98 let instance = this.scheduler.getInstance(id)
99 if (instance) {
100 return instance
101 } else {
102 const resolve = this.scheduler.getLock(id)
103 const instance = await this._loadInstance(id)
104 resolve(instance)
105 return instance
106 }
107 }
108
109 /**
110 * creates an new container instances and save it in the state
111 * @param {string} type - the type of container to create
112 * @param {*} code
113 * @param {array} entryPorts
114 * @param {object} id
115 * @param {object} id.nonce
116 * @param {object} id.parent
117 * @returns {Promise}
118 */
119 async createInstance (type, message = new Message(), id = {nonce: 0, parent: null}) {
120 // create a lock to prevent the scheduler from reloving waits before the
121 // new container is loaded
122 const resolve = this.scheduler.getLock(id)
123 const idHash = await this._getHashFromObj(id)
124 // const code = message.data.byteLength ? message.data : undefined
125 const state = {
126 nonce: [0],
127 ports: {},
128 type: type
129 }
130
131 if (message.data.length) {
132 state.code = message.data
133 }
134
135 // save the container in the state
136 await this.graph.set(this.state, idHash, state)
137 // create the container instance
138 const instance = await this._loadInstance(idHash)
139 resolve(instance)
140 // send the intialization message
141 await instance.initialize(message)
142
143 if (state.code && state.code.length > this.MAX_DATA_BYTES) {
144 state.code = chunk(state.code, this.MAX_DATA_BYTES).map(chk => {
145 return {
146 '/': chk
147 }
148 })
149 }
150
151 return instance
152 }
153
154 /**
155 * creates a state root starting from a given container and a given number of
156 * ticks
157 * @param {Number} ticks the number of ticks at which to create the state root
158 * @returns {Promise}
159 */
160 async createStateRoot (ticks) {
161 await this.scheduler.wait(ticks)
162 const unlinked = await DFSchecker(this.graph, this.state, this.ROOT_ID, this._nodesToCheck)
163 unlinked.forEach(id => {
164 delete this.state[id]
165 })
166 return this.graph.flush(this.state)
167 }
168
169 /**
170 * regirsters a container with the hypervisor
171 * @param {String} type - the name of the type
172 * @param {Class} Constructor - a Class for instantiating the container
173 * @param {*} args - any args that the contructor takes
174 */
175 registerContainer (type, Constructor, args) {
176 Constructor.type = type
177 this._containerTypes[type] = {
178 Constructor: Constructor,
179 args: args
180 }
181 }
182}
183

Built with git-ssb-web