Files: cf3e042658b75b328309f0f7f47d952ea5d5fa72 / index.js
13828 bytesRaw
1 | var pull = require('pull-stream') |
2 | var cat = require('pull-cat') |
3 | var cache = require('pull-cache') |
4 | var buffered = require('pull-buffered') |
5 | var pack = require('pull-git-pack') |
6 | var pktLine = require('./lib/pkt-line') |
7 | var indexPack = require('./lib/index-pack') |
8 | var util = require('./lib/util') |
9 | var multicb = require('multicb') |
10 | var ProgressBar = require('progress') |
11 | |
/**
 * Apply a single remote-helper option to the shared options object.
 *
 * @param {object} options - mutable settings object; `verbosity` or
 *   `progress` is overwritten depending on `name`
 * @param {string} name - option name
 * @param {string} value - raw option value
 * @returns {boolean} true when the option was recognised and stored,
 *   false (after logging to stderr) when it is unknown
 */
function handleOption(options, name, value) {
  if (name === 'verbosity') {
    // non-numeric input coerces to NaN and therefore falls back to 0
    var level = +value
    options.verbosity = level ? level : 0
    return true
  }

  if (name === 'progress') {
    // any non-empty value other than the literal string 'false' enables it
    options.progress = Boolean(value) && value !== 'false'
    return true
  }

  console.error('unknown option', name + ': ' + value)
  return false
}
25 | |
/**
 * Build a one-shot source that emits the capability list: one capability
 * per line, followed by a blank line.
 *
 * @returns {Function} pull-stream source created by `pull.once`
 */
function capabilitiesSource() {
  var supported = ['option', 'connect']
  var listing = supported.join('\n')
  return pull.once(listing + '\n\n')
}
32 | |
/**
 * Handle an `option <name> <value>` command and produce the reply line.
 *
 * @param {string} cmd - the text after `option`; split into name and value
 *   by `util.split2`
 * @param {object} options - settings object passed on to `handleOption`
 * @returns {Function} one-shot pull-stream source emitting 'ok\n',
 *   'unsupported\n' or 'error <msg>\n'
 */
function optionSource(cmd, options) {
  var parts = util.split2(cmd)
  var outcome = handleOption(options, parts[0], parts[1])

  var reply
  if (outcome === true) {
    reply = 'ok'
  } else if (outcome === false) {
    reply = 'unsupported'
  } else {
    // anything other than a strict boolean is reported as an error message
    reply = 'error ' + outcome
  }

  return pull.once(reply + '\n')
}
41 | |
42 | // transform ref objects into lines |
/**
 * Through stream turning ref objects into text lines.
 *
 * Each ref becomes `<value> <name>[ <attrs...>]\n`. When the upstream ends
 * normally, a single blank line ('\n') is emitted as the terminator and the
 * following read reports the end.
 *
 * @param {Function} read - pull-stream source of ref objects; the fields
 *   read here are `value`, `name` and the optional array `attrs`
 * @returns {Function} pull-stream source of strings
 */
function listRefs(read) {
  var ended
  return function (abort, cb) {
    if (ended) return cb(ended)
    read(abort, function (end, ref) {
      ended = end
      // Normal end of the ref list: emit the terminating blank line exactly
      // once. The end itself is delivered by the next read through `ended`.
      // When the reader asked to abort, do not hand it any more data.
      if (end === true && !abort) return cb(null, '\n')
      if (end) return cb(end)
      cb(null,
        [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
    })
  }
}
56 | |
57 | // upload-pack: fetch to client |
/**
 * Serve a fetch (upload-pack) to the client.
 *
 * The returned source is the concatenation of:
 *   1. a pkt-line encoded section: the ref advertisement, an empty string
 *      item, then a single 'ACK <hash>' or 'NAK' produced after reading the
 *      client's wants and haves;
 *   2. the pack stream built by `pack.encode` from the objects selected by
 *      `getObjects`.
 *
 * @param {Function} read - pull-stream source of the client's data
 * @param {object} repo - repository; uses `refs()`, `symrefs()`,
 *   `hasObject()` and (through getObjects) `getObject()`
 * @param {object} options - shared options (passed to pktLine.decode,
 *   progressObjects and pack.encode)
 * @returns {Function} pull-stream source of the response
 */
function uploadPack(read, repo, options) {
  /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
   * include-tag multi_ack_detailed
   * agent=git/2.7.0 */
  var sendRefs = receivePackHeader([
    'thin-pack',
  ], repo.refs(), repo.symrefs(), false)

  var lines = pktLine.decode(read, options)
  var readHave = lines.haves()
  var acked
  var commonHash
  var sendPack
  var wants = {}
  var shallows = {}

  // Packfile negotiation
  return cat([
    pktLine.encode(cat([
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        // NOTE(review): on abort the callback is never invoked; this looks
        // deliberate in this file (same pattern in receivePack) - confirm.
        if (abort) return
        // the single ACK/NAK has already been sent: end this section
        if (acked) return cb(true)

        // read upload request (wants list) from client
        var readWant = lines.wants()
        readWant(null, function (end, want) {
          if (end === true) return cb(true) // early client disconnect
          else if (end) cb(end)
          else nextWant(null, want)
        })
        function nextWant(end, want) {
          if (end) return wantsDone(end === true ? null : end)
          if (want.type == 'want') {
            wants[want.hash] = true
          } else if (want.type == 'shallow') {
            shallows[want.hash] = true
          } else {
            // Error only takes the message as its first argument, so the
            // details have to be part of that string.
            var err = new Error('Unknown thing ' + want.type + ' ' + want.hash)
            return readWant(err, function (e) { cb(e || err) })
          }
          readWant(null, nextWant)
        }

        function wantsDone(err) {
          if (err) return cb(err)
          // Read upload haves (haves list).
          // On first obj-id that we have, ACK
          // If we have none, NAK.
          // TODO: implement multi_ack_detailed
          readHave(null, function next(end, have) {
            if (end === true) {
              // found no common object
              acked = true
              cb(null, 'NAK')
            } else if (end)
              cb(end)
            else if (have.type != 'have')
              cb(new Error('Unknown have' + JSON.stringify(have)))
            else
              repo.hasObject(have.hash, function (err, haveIt) {
                if (err) return cb(err)
                if (!haveIt)
                  return readHave(null, next)
                // NOTE(review): this stores hasObject's result rather than
                // have.hash, while getObjects compares commonHash against
                // object hashes - confirm what hasObject yields.
                commonHash = haveIt
                acked = true
                cb(null, 'ACK ' + have.hash)
              })
          })
        }
      },
    ])),

    function havesDone(abort, cb) {
      if (abort) return cb(abort)
      // send pack file to client
      if (sendPack)
        return sendPack(abort, cb)
      // first read: collect the objects, build the pack source, then retry
      getObjects(repo, commonHash, wants, shallows,
        function (err, numObjects, readObjects) {
          if (err) return cb(err)
          var progress = progressObjects(options)
          progress.setNumObjects(numObjects)
          sendPack = pack.encode(options, numObjects,
            progress(readObjects))
          havesDone(abort, cb)
        }
      )
    }
  ])
}
150 | |
151 | // through stream to show a progress bar for objects being read |
/**
 * Create a through stream that shows a progress bar on stderr while objects
 * flow through it.
 *
 * The returned function also carries `setNumObjects(n)`, which must be
 * called with the expected object count so each object advances the bar by
 * the right amount.
 *
 * @param {object} options - reads `options.progress` and `options.verbosity`
 * @returns {Function} through stream `(read) => read` with a
 *   `setNumObjects` method
 */
function progressObjects(options) {
  // Only show progress bar if it is requested and if it won't interfere with
  // the debug output
  if (!options.progress || options.verbosity > 1) {
    var dummyProgress = function (readObject) { return readObject }
    dummyProgress.setNumObjects = function () {}
    return dummyProgress
  }

  var numObjects
  // `columns` is only defined when stderr is a TTY; fall back to a
  // conventional width so the bar still gets a numeric total.
  var size = process.stderr.columns || 80
  var bar = new ProgressBar(':percent :bar', {
    total: size,
    clear: true
  })

  var progress = function (readObject) {
    return function (abort, cb) {
      readObject(abort, function next(end, object) {
        if (end === true) {
          bar.terminate()
        } else if (!end) {
          // each object advances the bar by an equal share of its width
          bar.tick(size / numObjects)
        }

        cb(end, object)
      })
    }
  }
  // TODO: put the num objects in the objects stream as a header object
  progress.setNumObjects = function (n) {
    numObjects = n
  }
  return progress
}
188 | |
/**
 * Collect the objects reachable from the given heads.
 *
 * Starting from each key of `heads`, objects are fetched with
 * `repo.getObject` and their links (from getObjectLinks) are followed
 * recursively. A hash equal to `commonHash`, or one already visited, is not
 * fetched again. Non-blob objects are buffered because their content is
 * needed both for link extraction and for the output stream.
 *
 * @param {object} repo - provides `getObject(hash, cb)`
 * @param {*} commonHash - hash at which the walk stops; falsy to walk
 *   everything
 * @param {object} heads - object whose keys are the starting hashes
 * @param {object} shallows - currently unused
 * @param {Function} cb - called as `cb(err)` or
 *   `cb(null, numObjects, readObjects)` where readObjects is a pull-stream
 *   source of the collected objects
 */
function getObjects(repo, commonHash, heads, shallows, cb) {
  // get objects from commonHash to each head, inclusive.
  // if commonHash is falsy, use root
  var objects = []
  var objectsAdded = {}
  var done = multicb({pluck: 1})
  var ended

  // walk back from heads until get to commonHash
  for (var hash in heads)
    addObject(hash, done())

  // TODO: only add new objects

  function addObject(hash, cb) {
    // once any fetch has failed, stop issuing further reads
    if (ended) return cb(ended)
    if (hash in objectsAdded || hash == commonHash) return cb()
    objectsAdded[hash] = true
    repo.getObject(hash, function (err, object) {
      if (err) return cb(ended = err)
      if (object.type == 'blob') {
        objects.push(object)
        cb()
      } else {
        // object must be read twice, so buffer it
        bufferObject(object, function (err, object) {
          if (err) return cb(ended = err)
          objects.push(object)
          var hashes = getObjectLinks(object)
          // register the children before completing this object so the
          // overall callback cannot fire early
          for (var sha1 in hashes)
            addObject(sha1, done())
          cb()
        })
      }
    })
  }

  done(function (err) {
    if (err) return cb(err)
    cb(null, objects.length, pull.values(objects))
  })
}
231 | |
/**
 * Read an object's whole content into memory and return an equivalent
 * object whose content can be accessed again.
 *
 * @param {object} object - has `type`, `length` and a pull-stream source
 *   `read` yielding buffers
 * @param {Function} cb - called as `cb(err)` or `cb(null, copy)` where
 *   `copy` has the same `type` and `length`, the full content in `data`,
 *   and a fresh one-shot `read` source over that buffer
 */
function bufferObject(object, cb) {
  var onCollected = function (err, chunks) {
    if (err) return cb(err)
    var content = Buffer.concat(chunks, object.length)
    var copy = {
      type: object.type,
      length: object.length,
      data: content,
      read: pull.once(content)
    }
    cb(null, copy)
  }

  pull(object.read, pull.collect(onCollected))
}
247 | |
248 | // get hashes of git objects linked to from other git objects |
/**
 * Get the hashes of the git objects that a buffered object links to.
 *
 * @param {object} object - has `type` and, for non-blobs, the content in
 *   `data`
 * @param {Function} [cb] - unused; kept so the signature stays compatible
 * @returns {object} map of hash -> true; empty for blobs and for types
 *   that are not recognised
 */
function getObjectLinks(object, cb) {
  switch (object.type) {
    case 'tree':
      return getTreeLinks(object.data)
    case 'tag':
    case 'commit':
      return getCommitOrTagLinks(object.data)
    case 'blob':
    default:
      // blobs link to nothing; unknown types are treated the same way so
      // callers always receive an object
      return {}
  }
}
260 | |
/**
 * Extract the hashes referenced by a tree object's content.
 *
 * The buffer is scanned for NUL bytes; the 20 bytes after each NUL are
 * taken as a hash (hex encoded) and scanning resumes after those bytes.
 *
 * @param {Buffer} buf - tree object content
 * @returns {object} map of hex hash -> true
 */
function getTreeLinks(buf) {
  var HASH_BYTES = 20
  var links = {}
  var offset = 0

  while (true) {
    var nul = buf.indexOf(0, offset, 'ascii')
    if (nul === -1) break
    var start = nul + 1
    var end = start + HASH_BYTES
    var hex = buf.slice(start, end).toString('hex')
    if (!(hex in links))
      links[hex] = true
    // skip over the hash bytes, which may themselves contain NULs
    offset = end
  }

  return links
}
270 | |
/**
 * Extract the hashes referenced by a commit or tag object's header.
 *
 * Header lines are read until the first empty line (the start of the
 * message body). The second space-separated field of every `tree`,
 * `parent` and `object` line is recorded.
 *
 * @param {Buffer} buf - commit or tag object content
 * @returns {object} map of hash -> true
 */
function getCommitOrTagLinks(buf) {
  var LINK_FIELDS = ['tree', 'parent', 'object']
  var links = {}
  var headerLines = buf.toString('utf8').split('\n')

  // an empty (or missing) line terminates the header
  for (var n = 0; n < headerLines.length && headerLines[n]; n++) {
    var fields = headerLines[n].split(' ')
    if (LINK_FIELDS.indexOf(fields[0]) === -1) continue
    var target = fields[1]
    if (!(target in links))
      links[target] = true
  }

  return links
}
288 | |
289 | /* |
290 | TODO: investigate capabilities |
291 | report-status delete-refs side-band-64k quiet atomic ofs-delta |
292 | */ |
293 | |
294 | // Get a line for each ref that we have. The first line also has capabilities. |
295 | // Wrap with pktLine.encode. |
/**
 * Build the ref advertisement: one `<hash> <name>` string per ref.
 *
 * If `symrefs` is given it is drained first; every symref contributes a
 * `symref=<name>:<ref>` capability and is later emitted as an extra entry
 * right after the ref it points to (with that ref's hash). When
 * `usePlaceholder` is truthy, the first emitted line gets a NUL byte and
 * the space-joined capabilities appended to its name.
 *
 * NOTE(review): capabilities are only appended when `usePlaceholder` is
 * truthy, so a caller passing false never advertises them - confirm this
 * is intended. Sending a `capabilities^{}` placeholder line when there are
 * no refs is not implemented.
 *
 * @param {string[]} capabilities - base capability names
 * @param {Function} refSource - pull-stream source of `{name, hash}` refs
 * @param {Function|null} symrefs - pull-stream source of `{name, ref}`
 *   objects, or a falsy value for none
 * @param {boolean} usePlaceholder - whether to attach the capabilities to
 *   the first line
 * @returns {Function} pull-stream source of strings, meant to be wrapped
 *   with pktLine.encode
 */
function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
  var first = true
  var symrefed = {}
  var symrefsObj = {}

  // Emits nothing; it only drains `symrefs` so the lookup tables and the
  // capability list are complete before any ref line is produced.
  function loadSymrefs(end, cb) {
    if (end || !symrefs) return cb(true)
    pull(
      symrefs,
      pull.map(recordSymref),
      pull.collect(function (err, symrefCaps) {
        if (err) return cb(err)
        capabilities = capabilities.concat(symrefCaps)
        cb(true)
      })
    )
  }

  function recordSymref(sym) {
    symrefed[sym.ref] = true
    symrefsObj[sym.name] = sym.ref
    return 'symref=' + sym.name + ':' + sym.ref
  }

  // insert symrefs next to the refs that they point to
  function withSymrefs(ref) {
    var entries = [ref]
    if (!(ref.name in symrefed)) return entries
    for (var symrefName in symrefsObj) {
      if (symrefsObj[symrefName] === ref.name)
        entries.push({name: symrefName, hash: ref.hash})
    }
    return entries
  }

  function formatRef(ref) {
    var label = ref.name
    if (first && usePlaceholder) {
      first = false
      label += '\0' + capabilities.join(' ')
    }
    return ref.hash + ' ' + label
  }

  var refLines = pull(
    refSource,
    pull.map(withSymrefs),
    pull.flatten(),
    pull.map(formatRef)
  )

  return cat([loadSymrefs, refLines])
}
350 | |
351 | // receive-pack: push from client |
/**
 * Handle a push (receive-pack) from the client.
 *
 * The returned pkt-line encoded source emits the ref advertisement, an
 * empty string item, and finally 'unpack ok'. In between, a stage that
 * emits no data reads the client's ref updates and hands them, together
 * with the incoming pack, to the repo: `repo.uploadPack` when the repo
 * provides it (the pack is cached so it can be indexed and then stored),
 * otherwise `repo.update` with the decoded objects.
 *
 * @param {Function} read - pull-stream source of the client's data
 * @param {object} repo - uses `refs()` and either `uploadPack` or `update`
 * @param {object} options - shared options (verbosity, progress)
 * @returns {Function} pull-stream source of the response
 */
function receivePack(read, repo, options) {
  var sendRefs = receivePackHeader([
    'delete-refs',
  ], repo.refs(), null, true)
  var done = multicb({pluck: 1})

  // Store the raw pack plus an index built from it.
  function storeIndexedPack(lines, updates) {
    var idxCb = done()
    // the pack is consumed twice: once for indexing, once for storing
    var packfile = cache(lines.passthrough)
    indexPack(packfile(), function (err, idx) {
      if (err) return idxCb(err)
      var nonEmptyPack = pull(
        packfile(),
        // for some reason i was getting zero length buffers which
        // were causing muxrpc to fail, so remove them here.
        pull.filter(function (buf) {
          return buf.length
        })
      )
      repo.uploadPack(
        pull.values(updates),
        pull.once({pack: nonEmptyPack, idx: idx}),
        idxCb
      )
    })
  }

  // Decode the pack into objects and pass them to repo.update.
  function applyDecodedPack(lines, updates, progress) {
    var updateSource = pull.values(updates)
    var decodeOptions = {
      verbosity: options.verbosity,
      onHeader: function (numObjects) {
        progress.setNumObjects(numObjects)
      }
    }
    var objectSource = pull(
      lines.passthrough,
      pack.decode(decodeOptions, repo, done()),
      progress
    )
    repo.update(updateSource, objectSource, done())
  }

  function receiveUpdates(abort, cb) {
    if (abort) return
    // receive their refs
    var lines = pktLine.decode(read, options)
    pull(
      lines.updates,
      pull.collect(function (err, updates) {
        if (err) return cb(err)
        if (updates.length === 0) return cb(true)
        var progress = progressObjects(options)

        if (repo.uploadPack) {
          storeIndexedPack(lines, updates)
        } else {
          applyDecodedPack(lines, updates, progress)
        }

        // end this stage (without data) once all pending work is finished
        done(function (err) {
          cb(err || true)
        })
      })
    )
  }

  return pktLine.encode(
    cat([
      // send our refs
      sendRefs,
      pull.once(''),
      receiveUpdates,
      pull.once('unpack ok')
    ])
  )
}
414 | |
/**
 * Wrap a pull-stream source so that `data` is emitted before anything the
 * source itself produces.
 *
 * @param {*} data - item to emit first
 * @param {Function} read - pull-stream source to continue with
 * @returns {Function} pull-stream source
 */
function prepend(data, read) {
  var done
  return function (end, cb) {
    // An abort/error request is always forwarded to the wrapped source,
    // even before the first item: answering it with data would break the
    // pull-stream contract and leave the source un-aborted.
    if (done || end) {
      done = true
      read(end, cb)
    } else {
      done = true
      cb(null, data)
    }
  }
}
426 | |
427 | module.exports = function (repo) { |
428 | var ended |
429 | var options = { |
430 | verbosity: 1, |
431 | progress: false |
432 | } |
433 | |
434 | function handleConnect(cmd, read) { |
435 | var args = util.split2(cmd) |
436 | switch (args[0]) { |
437 | case 'git-upload-pack': |
438 | return prepend('\n', uploadPack(read, repo, options)) |
439 | case 'git-receive-pack': |
440 | return prepend('\n', receivePack(read, repo, options)) |
441 | default: |
442 | return pull.error(new Error('Unknown service ' + args[0])) |
443 | } |
444 | } |
445 | |
446 | function handleCommand(line, read) { |
447 | var args = util.split2(line) |
448 | switch (args[0]) { |
449 | case 'capabilities': |
450 | return capabilitiesSource() |
451 | case 'list': |
452 | return listRefs(refSource) |
453 | case 'connect': |
454 | return handleConnect(args[1], read) |
455 | case 'option': |
456 | return optionSource(args[1], options) |
457 | default: |
458 | return pull.error(new Error('Unknown command ' + line)) |
459 | } |
460 | } |
461 | |
462 | return function (read) { |
463 | var b = buffered() |
464 | b(read) |
465 | var command |
466 | |
467 | function getCommand(cb) { |
468 | b.lines(null, function next(end, line) { |
469 | if (ended = end) |
470 | return cb(end) |
471 | |
472 | if (line == '') |
473 | return b.lines(null, next) |
474 | |
475 | if (options.verbosity > 1) |
476 | console.error('command:', line) |
477 | |
478 | var cmdSource = handleCommand(line, b.passthrough) |
479 | cb(null, cmdSource) |
480 | }) |
481 | } |
482 | |
483 | return function next(abort, cb) { |
484 | if (ended) return cb(ended) |
485 | |
486 | if (!command) { |
487 | if (abort) return |
488 | getCommand(function (end, cmd) { |
489 | command = cmd |
490 | next(end, cb) |
491 | }) |
492 | return |
493 | } |
494 | |
495 | command(abort, function (err, data) { |
496 | if (err) { |
497 | command = null |
498 | if (err !== true) |
499 | cb(err, data) |
500 | else |
501 | next(abort, cb) |
502 | } else { |
503 | // HACK: silence error when writing to closed stream |
504 | try { |
505 | cb(null, data) |
506 | } catch(e) { |
507 | if (e.message == 'process.stdout cannot be closed.' |
508 | || e.message == 'This socket is closed.') |
509 | process.exit(1) |
510 | throw e |
511 | } |
512 | } |
513 | }) |
514 | } |
515 | } |
516 | } |
517 |
Built with git-ssb-web