git ssb

0+

wanderer🌟 / js-primea-hypervisor



Tree: f0fbf4a220a3041e504eb1b75fc6e6e2e9488ad6

Files: f0fbf4a220a3041e504eb1b75fc6e6e2e9488ad6 / index.js

5141 bytesRaw
1const Graph = require('ipld-graph-builder')
2const Message = require('primea-message')
3const Kernel = require('./kernel.js')
4const Scheduler = require('./scheduler.js')
5const DFSchecker = require('./dfsChecker.js')
6const chunk = require('chunk')
7
8module.exports = class Hypervisor {
9 /**
10 * The Hypervisor manages the container instances by instantiating them and
11 * destorying them when possible. It also facilitates localating Containers
12 * @param {Graph} dag an instance of [ipfs.dag](https://github.com/ipfs/interface-ipfs-core/tree/master/API/dag#dag-api)
13 * @param {object} state - the starting state
14 */
15 constructor (dag, state = {}) {
16 this.graph = new Graph(dag)
17 this.scheduler = new Scheduler()
18 this.state = state
19 this._containerTypes = {}
20 this._nodesToCheck = new Set()
21
22 this.ROOT_ID = 'zdpuAm6aTdLVMUuiZypxkwtA7sKm7BWERy8MPbaCrFsmiyzxr'
23 this.MAX_DATA_BYTES = 65533
24 }
25
26 /**
27 * add a potaintail node in the state graph to check for garbage collection
28 * @param {string} id
29 */
30 addNodeToCheck (id) {
31 this._nodesToCheck.add(id)
32 }
33
34 /**
35 * given a port, this finds the corridsponeding endpoint port of the channel
36 * @param {object} port
37 * @returns {Promise}
38 */
39 getDestPort (port) {
40 if (port.destPort) {
41 return port.destPort
42 } else {
43 return this.graph.get(this.state, `${port.destId}/ports/${port.destName}`)
44 }
45 }
46
47 async send (port, message) {
48 if (port.destId) {
49 const id = port.destId
50 const instance = await this.getInstance(id)
51 return instance.queue(port.destName, message)
52 } else {
53 // port is unbound
54 port.destPort.messages.push(message)
55 }
56 }
57
58 // loads an instance of a container from the state
59 async _loadInstance (id) {
60 const state = await this.graph.get(this.state, id)
61 const container = this._containerTypes[state.type]
62 let code
63
64 if (state.code && state.code[0]['/']) {
65 await this.graph.tree(state.code, 1)
66 code = state.code.map(a => a['/']).reduce((a, b) => a + b)
67 } else {
68 code = state.code
69 }
70
71 // create a new kernel instance
72 const kernel = new Kernel({
73 hypervisor: this,
74 state: state,
75 code: code,
76 container: container,
77 id: id
78 })
79
80 // save the newly created instance
81 this.scheduler.update(kernel)
82 return kernel
83 }
84
85 // get a hash from a POJO
86 _getHashFromObj (obj) {
87 return this.graph.flush(obj).then(obj => obj['/'])
88 }
89
90 /**
91 * gets an existsing container instances
92 * @param {string} id - the containers ID
93 * @returns {Promise}
94 */
95 async getInstance (id) {
96 let instance = this.scheduler.getInstance(id)
97 if (instance) {
98 return instance
99 } else {
100 const resolve = this.scheduler.getLock(id)
101 const instance = await this._loadInstance(id)
102 await instance.startup()
103 resolve(instance)
104 return instance
105 }
106 }
107
108 /**
109 * creates an new container instances and save it in the state
110 * @param {string} type - the type of container to create
111 * @param {*} code
112 * @param {array} entryPorts
113 * @param {object} id
114 * @param {object} id.nonce
115 * @param {object} id.parent
116 * @returns {Promise}
117 */
118 async createInstance (type, message = new Message(), id = {nonce: 0, parent: null}) {
119 // create a lock to prevent the scheduler from reloving waits before the
120 // new container is loaded
121 const resolve = this.scheduler.getLock(id)
122 const idHash = await this._getHashFromObj(id)
123 // const code = message.data.byteLength ? message.data : undefined
124 const state = {
125 nonce: [0],
126 ports: {},
127 type: type
128 }
129
130 if (message.data.length) {
131 state.code = message.data
132 }
133
134 // save the container in the state
135 await this.graph.set(this.state, idHash, state)
136 // create the container instance
137 const instance = await this._loadInstance(idHash)
138 resolve(instance)
139 // send the intialization message
140 await instance.initialize(message)
141
142 if (state.code && state.code.length > this.MAX_DATA_BYTES) {
143 state.code = chunk(state.code, this.MAX_DATA_BYTES).map(chk => {
144 return {
145 '/': chk
146 }
147 })
148 }
149
150 return instance
151 }
152
153 /**
154 * creates a state root starting from a given container and a given number of
155 * ticks
156 * @param {Number} ticks the number of ticks at which to create the state root
157 * @returns {Promise}
158 */
159 async createStateRoot (ticks) {
160 await this.scheduler.wait(ticks)
161 const unlinked = await DFSchecker(this.graph, this.state, this.ROOT_ID, this._nodesToCheck)
162 unlinked.forEach(id => {
163 delete this.state[id]
164 })
165 return this.graph.flush(this.state)
166 }
167
168 /**
169 * regirsters a container with the hypervisor
170 * @param {String} type - the name of the type
171 * @param {Class} Constructor - a Class for instantiating the container
172 * @param {*} args - any args that the contructor takes
173 */
174 registerContainer (type, Constructor, args) {
175 Constructor.type = type
176 this._containerTypes[type] = {
177 Constructor: Constructor,
178 args: args
179 }
180 }
181}
182

Built with git-ssb-web