git ssb

0+

cel / pull-git-remote-helper



Tree: bab1815e344b8ddb22072833c58c3c05c07ef255

Files: bab1815e344b8ddb22072833c58c3c05c07ef255 / index.js

13897 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var cache = require('pull-cache')
4var buffered = require('pull-buffered')
5var Repo = require('pull-git-repo')
6var pack = require('pull-git-pack')
7var pktLine = require('./lib/pkt-line')
8var indexPack = require('pull-git-pack/lib/index-pack')
9var util = require('./lib/util')
10var multicb = require('multicb')
11var ProgressBar = require('progress')
12
// Store one remote-helper option on the shared options object.
//
// options: mutable settings object; `verbosity` and `progress` are written
// name:    option name
// value:   raw option value
//
// Returns true if the option was recognized and stored, false otherwise
// (unknown options are also reported on stderr).
function handleOption(options, name, value) {
  if (name === 'verbosity') {
    // anything that does not coerce to a non-zero number becomes 0
    var level = +value
    options.verbosity = level || 0
    return true
  }

  if (name === 'progress') {
    // enabled for any non-empty value except the literal string 'false'
    options.progress = value ? value !== 'false' : false
    return true
  }

  console.error('unknown option', name + ': ' + value)
  return false
}
26
// Source that emits the capability listing once: one capability per line,
// followed by a blank line.
function capabilitiesSource() {
  var capabilities = ['option', 'connect']
  var listing = capabilities.map(function (capability) {
    return capability + '\n'
  }).join('')
  return pull.once(listing + '\n')
}
33
// Handle an `option` command and produce a one-line reply source.
//
// cmd:     the text after `option`, split into name and value by util.split2
// options: shared options object, updated by handleOption
//
// The reply is 'ok' when handleOption returns true, 'unsupported' when it
// returns false, and 'error <result>' for any other result.
function optionSource(cmd, options) {
  var parts = util.split2(cmd)
  var result = handleOption(options, parts[0], parts[1])

  var reply
  if (result === true) {
    reply = 'ok'
  } else if (result === false) {
    reply = 'unsupported'
  } else {
    reply = 'error ' + result
  }

  return pull.once(reply + '\n')
}
42
// transform ref objects into lines
//
// Through stream: each ref object {value, name, attrs} read from `read`
// becomes the line '<value> <name> [<attr> ...]\n'. When the upstream ends
// normally, a single blank line is emitted before the end is passed on.
// Errors and aborts are passed on without the blank line.
function listRefs(read) {
  var ended
  return function (abort, cb) {
    if (ended) return cb(ended)
    read(abort, function (end, ref) {
      ended = end
      // normal end: emit the terminating blank line now; the stored `ended`
      // makes the next read call back with the end. Return so that cb is
      // called exactly once per read.
      if (end === true && !abort) return cb(null, '\n')
      if (end) return cb(end)
      cb(null,
        [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
    })
  }
}
57
// upload-pack: fetch to client
//
// Returns a source stream answering a `git-upload-pack` connection. It
// concatenates:
//   1. a pkt-line encoded section: the ref advertisement, an empty line,
//      and a single 'ACK <hash>' or 'NAK' reply produced after reading the
//      client's wants and haves;
//   2. the pack data for the wanted objects (not pkt-line encoded).
//
// read:    source of the client's data, decoded with pktLine.decode
// repo:    repo object; refs(), symrefs() and hasObjectFromAny() are used
//          here, and it is passed on to getObjects
// options: shared options; passed to pktLine.decode, pack.encode and
//          progressObjects
function uploadPack(read, repo, options) {
  /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
   * include-tag multi_ack_detailed
   * agent=git/2.7.0 */
  var sendRefs = receivePackHeader([
    'thin-pack',
  ], repo.refs(), repo.symrefs(), false)

  var lines = pktLine.decode(read, options)
  var readHave = lines.haves()
  var acked
  var commonHash
  var sendPack
  var wants = {}
  var shallows = {}

  // Packfile negotiation
  return cat([
    pktLine.encode(cat([
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        // NOTE(review): an abort is dropped here without calling cb —
        // confirm that this is intended.
        if (abort) return
        // the single ACK/NAK has been sent; end this section
        if (acked) return cb(true)

        // read upload request (wants list) from client
        var readWant = lines.wants()
        readWant(null, function (end, want) {
          if (end === true) return cb(true) // early client disconnect
          else if (end) cb(end)
          else nextWant(null, want)
        })
        function nextWant(end, want) {
          if (end) return wantsDone(end === true ? null : end)
          if (want.type == 'want') {
            wants[want.hash] = true
          } else if (want.type == 'shallow') {
            shallows[want.hash] = true
          } else {
            // Error only takes a message, so the details go into the string
            var err = new Error('Unknown thing ' + want.type + ' ' + want.hash)
            // abort the wants reader, then report the failure; a plain
            // `true` from the abort must not hide the error
            return readWant(err, function (e) {
              cb(e && e !== true ? e : err)
            })
          }
          readWant(null, nextWant)
        }

        function wantsDone(err) {
          if (err) return cb(err)
          // Read upload haves (haves list).
          // On first obj-id that we have, ACK
          // If we have none, NAK.
          // TODO: implement multi_ack_detailed
          readHave(null, function next(end, have) {
            if (end === true) {
              // found no common object
              acked = true
              cb(null, 'NAK')
            } else if (end)
              cb(end)
            else if (have.type != 'have')
              cb(new Error('Unknown have' + JSON.stringify(have)))
            else
              repo.hasObjectFromAny(have.hash, function (err, haveIt) {
                if (err) return cb(err)
                if (!haveIt)
                  return readHave(null, next)
                // NOTE(review): stores the callback's result, not
                // have.hash — confirm what hasObjectFromAny yields, since
                // getObjects compares object hashes against this value.
                commonHash = haveIt
                acked = true
                cb(null, 'ACK ' + have.hash)
              })
          })
        }
      },
    ])),

    function havesDone(abort, cb) {
      if (abort) return cb(abort)
      // send pack file to client
      if (sendPack)
        return sendPack(abort, cb)
      // first read: collect the objects, build the pack source, then retry
      getObjects(repo, commonHash, wants, shallows,
        function (err, numObjects, readObjects) {
          if (err) return cb(err)
          var progress = progressObjects(options)
          progress.setNumObjects(numObjects)
          sendPack = pack.encode(options, numObjects,
            progress(readObjects))
          havesDone(abort, cb)
        }
      )
    }
  ])
}
151
// through stream to show a progress bar for objects being read
//
// options: `progress` enables the bar; `verbosity` above 1 disables it
//
// Returns a function that wraps an object source. The returned function
// also has a `setNumObjects(n)` method giving the expected object count,
// which sets how far the bar advances per object.
function progressObjects(options) {
  // Only show progress bar if it is requested and if it won't interfere with
  // the debug output
  if (!options.progress || options.verbosity > 1) {
    var dummyProgress = function (readObject) { return readObject }
    dummyProgress.setNumObjects = function () {}
    return dummyProgress
  }

  var numObjects
  // stderr has no `columns` when it is not a TTY; fall back to a fixed
  // width so that the bar always gets a numeric total
  var size = process.stderr.columns || 80
  var bar = new ProgressBar(':percent :bar', {
    total: size,
    clear: true
  })

  var progress = function (readObject) {
    return function (abort, cb) {
      readObject(abort, function (end, object) {
        if (end) {
          // clean up the bar on errors as well as on normal end
          bar.terminate()
        } else if (numObjects > 0) {
          // skip the tick while the count is unknown or zero, which would
          // otherwise advance the bar by NaN or Infinity
          bar.tick(size / numObjects)
        }

        cb(end, object)
      })
    }
  }
  // TODO: put the num objects in the objects stream as a header object
  progress.setNumObjects = function (n) {
    numObjects = n
  }
  return progress
}
189
// Collect the objects reachable from each hash in `heads`, following the
// links returned by getObjectLinks and stopping at `commonHash` and at
// objects that were already visited.
//
// repo:       repo object providing getObjectFromAny(hash, cb)
// commonHash: hash at which the walk stops; falsy means walk to the roots
// heads:      object whose keys are the hashes to start from
// shallows:   currently unused
// cb:         called as cb(err) on failure, or
//             cb(null, numObjects, objectSource) on success
function getObjects(repo, commonHash, heads, shallows, cb) {
  // get objects from commonHash to each head, inclusive.
  // if commonHash is falsy, use root
  var objects = []
  var objectsAdded = {}
  var done = multicb({pluck: 1})
  var ended

  // remember the first failure so that pending walks stop early
  function fail(err, next) {
    ended = ended || err
    next(err)
  }

  // walk back from heads until get to commonHash
  for (var hash in heads)
    addObject(hash, done())

  // TODO: only add new objects

  function addObject(hash, next) {
    if (ended) return next(ended)
    if (hash in objectsAdded || hash === commonHash) return next()
    objectsAdded[hash] = true
    repo.getObjectFromAny(hash, function (err, object) {
      if (err) return fail(err, next)
      if (object.type == 'blob') {
        // blobs have no links, so they are passed along unbuffered
        objects.push(object)
        return next()
      }
      // object must be read twice, so buffer it
      bufferObject(object, function (err, bufferedObject) {
        if (err) return fail(err, next)
        objects.push(bufferedObject)
        var hashes = getObjectLinks(bufferedObject)
        // register the child walks before completing this one, so the
        // multicb cannot finish while children are still pending
        for (var sha1 in hashes)
          addObject(sha1, done())
        next()
      })
    })
  }

  done(function (err) {
    if (err) return cb(err)
    cb(null, objects.length, pull.values(objects))
  })
}
232
// Read an object's stream fully into memory.
//
// object: {type, length, read}; `read` is a source of buffers
// cb:     called as cb(err), or cb(null, buffered), where `buffered` has
//         the same type and length, the whole content as `data`, and a
//         fresh one-shot `read` source over that content
function bufferObject(object, cb) {
  var source = object.read
  var sink = pull.collect(function (err, chunks) {
    if (err) return cb(err)
    var content = Buffer.concat(chunks, object.length)
    var bufferedObject = {
      type: object.type,
      length: object.length,
      data: content,
      read: pull.once(content)
    }
    cb(null, bufferedObject)
  })
  pull(source, sink)
}
248
// get hashes of git objects linked to from other git objects
//
// object: buffered object with `type` and, for non-blobs, `data`
// cb:     unused
//
// Returns an object whose keys are the linked hashes. Blobs give an empty
// object; unrecognized types give undefined.
function getObjectLinks(object, cb) {
  var type = object.type
  if (type === 'blob') return {}
  if (type === 'tree') return getTreeLinks(object.data)
  if (type === 'tag' || type === 'commit')
    return getCommitOrTagLinks(object.data)
}
261
// Extract the hashes referenced by a tree object's content.
//
// Scans for each zero byte, takes the 20 bytes after it as a hash, and
// continues the scan after those bytes. Returns an object keyed by the
// hex-encoded hashes.
function getTreeLinks(buf) {
  var links = {}
  var searchFrom = 0
  while (true) {
    // indexOf gives -1 when no zero byte is left, making hashStart 0
    var hashStart = buf.indexOf(0, searchFrom, 'ascii') + 1
    if (!hashStart) break
    var hashEnd = hashStart + 20
    var hash = buf.slice(hashStart, hashEnd).toString('hex')
    if (!(hash in links))
      links[hash] = true
    searchFrom = hashEnd
  }
  return links
}
271
// Extract the hashes referenced in the header of a commit or tag object.
//
// Reads header lines up to the first blank line and collects the second
// space-separated field of every `tree`, `parent` and `object` line.
// Returns an object keyed by those hashes.
function getCommitOrTagLinks(buf) {
  var headerLines = buf.toString('utf8').split('\n')
  var linkFields = ['tree', 'parent', 'object']
  var links = {}
  // iterate until reach blank line (indicating start of commit/tag body)
  for (var n = 0; n < headerLines.length && headerLines[n]; n++) {
    var fields = headerLines[n].split(' ')
    if (linkFields.indexOf(fields[0]) === -1) continue
    var hash = fields[1]
    if (!(hash in links))
      links[hash] = true
  }
  return links
}
289
290/*
291TODO: investigate capabilities
292report-status delete-refs side-band-64k quiet atomic ofs-delta
293*/
294
// Get a line for each ref that we have. The first line also has capabilities.
// Wrap with pktLine.encode.
//
// capabilities:   array of capability strings to advertise
// refSource:      source of ref objects {name, hash}
// symrefs:        optional source of symref objects {name, ref}; each adds
//                 a 'symref=<name>:<ref>' capability and an extra line next
//                 to the ref it points to
// usePlaceholder: kept for callers; reserved for advertising a placeholder
//                 line when there are no refs (see TODO below)
//
// Returns a source of '<hash> <name>' strings; the first one has
// '\0<capabilities joined by spaces>' appended.
function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
  var first = true
  var symrefed = {}
  var symrefsObj = {}

  return cat([
    // emits nothing: drains the symrefs first so that the capability list
    // is complete before the first ref line is produced
    function (end, cb) {
      if (end) cb(true)
      else if (!symrefs) cb(true)
      else pull(
        symrefs,
        pull.map(function (sym) {
          symrefed[sym.ref] = true
          symrefsObj[sym.name] = sym.ref
          return 'symref=' + sym.name + ':' + sym.ref
        }),
        pull.collect(function (err, symrefCaps) {
          if (err) return cb(err)
          capabilities = capabilities.concat(symrefCaps)
          cb(true)
        })
      )
    },
    pull(
      refSource,
      pull.map(function (ref) {
        // insert symrefs next to the refs that they point to
        var out = [ref]
        if (ref.name in symrefed)
          for (var symrefName in symrefsObj)
            if (symrefsObj[symrefName] === ref.name)
              out.push({name: symrefName, hash: ref.hash})
        return out
      }),
      pull.flatten(),
      pull.map(function (ref) {
        var name = ref.name
        var value = ref.hash
        // Capabilities go on the first advertised line for every caller;
        // gating this on usePlaceholder left the upload-pack advertisement
        // without its capabilities and symrefs.
        if (first) {
          first = false
          // TODO: when there are no refs and usePlaceholder is set, send a
          // placeholder line (hash of forty zeros, name 'capabilities^{}')
          // so that the capabilities are still advertised
          name += '\0' + capabilities.join(' ')
        }
        return value + ' ' + name
      })
    )
  ])
}
351
// receive-pack: push from client
//
// Returns a pkt-line encoded source answering a `git-receive-pack`
// connection: the ref advertisement, an empty line, and, once the client's
// ref updates and pack have been handed to the repo, 'unpack ok'.
//
// read:    source of the client's data, decoded with pktLine.decode
// repo:    repo object; refs() is used for the advertisement, and the pack
//          goes to repo.uploadPack when that exists, otherwise to
//          repo.update
// options: shared options; passed to pktLine.decode and progressObjects
function receivePack(read, repo, options) {
  var advertisedCapabilities = ['delete-refs', 'no-thin']
  var sendRefs = receivePackHeader(advertisedCapabilities, repo.refs(), null,
    true)
  var done = multicb({pluck: 1})

  // for some reason i was getting zero length buffers which
  // were causing muxrpc to fail, so remove them here.
  function isNonEmpty(buf) {
    return buf.length
  }

  // receive their refs, then store the pack that follows them
  function receiveUpdates(abort, cb) {
    if (abort) return
    var lines = pktLine.decode(read, options)
    var collectUpdates = pull.collect(function (err, updates) {
      if (err) return cb(err)
      // nothing to update: end this section
      if (updates.length === 0) return cb(true)
      var progress = progressObjects(options)

      if (repo.uploadPack) {
        // index the incoming pack and hand over pack and index together
        var idxCb = done()
        indexPack(lines.passthrough, function (err, idx, packfileFixed) {
          if (err) return idxCb(err)
          var updateSource = pull.values(updates)
          var packAndIndex = {
            pack: pull(packfileFixed, pull.filter(isNonEmpty)),
            idx: idx
          }
          repo.uploadPack(updateSource, pull.once(packAndIndex), idxCb)
        })
      } else {
        // decode the pack into objects and pass them to the repo
        var objectUpdates = pull.values(updates)
        var decodeOptions = {
          verbosity: options.verbosity,
          onHeader: function (numObjects) {
            progress.setNumObjects(numObjects)
          }
        }
        var objectSource = pull(
          lines.passthrough,
          pack.decode(decodeOptions, repo, done()),
          progress
        )
        repo.update(objectUpdates, objectSource, done())
      }

      // end this section once every pending operation has finished
      done(function (err) {
        cb(err || true)
      })
    })
    pull(lines.updates, collectUpdates)
  }

  return pktLine.encode(
    cat([
      // send our refs
      sendRefs,
      pull.once(''),
      receiveUpdates,
      pull.once('unpack ok')
    ])
  )
}
415
// Source that yields `data` on its first call (whatever the `end` argument
// is) and passes every later call straight to `read`.
function prepend(data, read) {
  var emitted = false
  return function (end, cb) {
    if (emitted) return read(end, cb)
    emitted = true
    cb(null, data)
  }
}
427
428module.exports = function (repo) {
429 var ended
430 var options = {
431 verbosity: 1,
432 progress: false
433 }
434
435 repo = Repo(repo)
436
437 function handleConnect(cmd, read) {
438 var args = util.split2(cmd)
439 switch (args[0]) {
440 case 'git-upload-pack':
441 return prepend('\n', uploadPack(read, repo, options))
442 case 'git-receive-pack':
443 return prepend('\n', receivePack(read, repo, options))
444 default:
445 return pull.error(new Error('Unknown service ' + args[0]))
446 }
447 }
448
449 function handleCommand(line, read) {
450 var args = util.split2(line)
451 switch (args[0]) {
452 case 'capabilities':
453 return capabilitiesSource()
454 case 'list':
455 return listRefs(refSource)
456 case 'connect':
457 return handleConnect(args[1], read)
458 case 'option':
459 return optionSource(args[1], options)
460 default:
461 return pull.error(new Error('Unknown command ' + line))
462 }
463 }
464
465 return function (read) {
466 var b = buffered()
467 b(read)
468 var command
469
470 function getCommand(cb) {
471 b.lines(null, function next(end, line) {
472 if (ended = end)
473 return cb(end)
474
475 if (line == '')
476 return b.lines(null, next)
477
478 if (options.verbosity > 1)
479 console.error('command:', line)
480
481 var cmdSource = handleCommand(line, b.passthrough)
482 cb(null, cmdSource)
483 })
484 }
485
486 return function next(abort, cb) {
487 if (ended) return cb(ended)
488
489 if (!command) {
490 if (abort) return
491 getCommand(function (end, cmd) {
492 command = cmd
493 next(end, cb)
494 })
495 return
496 }
497
498 command(abort, function (err, data) {
499 if (err) {
500 command = null
501 if (err !== true)
502 cb(err, data)
503 else
504 next(abort, cb)
505 } else {
506 // HACK: silence error when writing to closed stream
507 try {
508 cb(null, data)
509 } catch(e) {
510 if (e.message == 'process.stdout cannot be closed.'
511 || e.message == 'This socket is closed.')
512 process.exit(1)
513 throw e
514 }
515 }
516 })
517 }
518 }
519}
520

Built with git-ssb-web