git ssb

0+

cel / pull-git-remote-helper



Tree: cf3e042658b75b328309f0f7f47d952ea5d5fa72

Files: cf3e042658b75b328309f0f7f47d952ea5d5fa72 / index.js

13828 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var cache = require('pull-cache')
4var buffered = require('pull-buffered')
5var pack = require('pull-git-pack')
6var pktLine = require('./lib/pkt-line')
7var indexPack = require('./lib/index-pack')
8var util = require('./lib/util')
9var multicb = require('multicb')
10var ProgressBar = require('progress')
11
12function handleOption(options, name, value) {
13 switch (name) {
14 case 'verbosity':
15 options.verbosity = +value || 0
16 return true
17 case 'progress':
18 options.progress = !!value && value !== 'false'
19 return true
20 default:
21 console.error('unknown option', name + ': ' + value)
22 return false
23 }
24}
25
26function capabilitiesSource() {
27 return pull.once([
28 'option',
29 'connect',
30 ].join('\n') + '\n\n')
31}
32
33function optionSource(cmd, options) {
34 var args = util.split2(cmd)
35 var msg = handleOption(options, args[0], args[1])
36 msg = (msg === true) ? 'ok'
37 : (msg === false) ? 'unsupported'
38 : 'error ' + msg
39 return pull.once(msg + '\n')
40}
41
42// transform ref objects into lines
43function listRefs(read) {
44 var ended
45 return function (abort, cb) {
46 if (ended) return cb(ended)
47 read(abort, function (end, ref) {
48 ended = end
49 if (end === true) cb(null, '\n')
50 if (end) cb(end)
51 else cb(null,
52 [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
53 })
54 }
55}
56
57// upload-pack: fetch to client
58function uploadPack(read, repo, options) {
59 /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
60 * include-tag multi_ack_detailed
61 * agent=git/2.7.0 */
62 var sendRefs = receivePackHeader([
63 'thin-pack',
64 ], repo.refs(), repo.symrefs(), false)
65
66 var lines = pktLine.decode(read, options)
67 var readHave = lines.haves()
68 var acked
69 var commonHash
70 var sendPack
71 var wants = {}
72 var shallows = {}
73
74 // Packfile negotiation
75 return cat([
76 pktLine.encode(cat([
77 sendRefs,
78 pull.once(''),
79 function (abort, cb) {
80 if (abort) return
81 if (acked) return cb(true)
82
83 // read upload request (wants list) from client
84 var readWant = lines.wants()
85 readWant(null, function (end, want) {
86 if (end === true) return cb(true) // early client disconnect
87 else if (end) cb(end)
88 else nextWant(null, want)
89 })
90 function nextWant(end, want) {
91 if (end) return wantsDone(end === true ? null : end)
92 if (want.type == 'want') {
93 wants[want.hash] = true
94 } else if (want.type == 'shallow') {
95 shallows[want.hash] = true
96 } else {
97 var err = new Error("Unknown thing", want.type, want.hash)
98 return readWant(err, function (e) { cb(e || err) })
99 }
100 readWant(null, nextWant)
101 }
102
103 function wantsDone(err) {
104 if (err) return cb(err)
105 // Read upload haves (haves list).
106 // On first obj-id that we have, ACK
107 // If we have none, NAK.
108 // TODO: implement multi_ack_detailed
109 readHave(null, function next(end, have) {
110 if (end === true) {
111 // found no common object
112 acked = true
113 cb(null, 'NAK')
114 } else if (end)
115 cb(end)
116 else if (have.type != 'have')
117 cb(new Error('Unknown have' + JSON.stringify(have)))
118 else
119 repo.hasObject(have.hash, function (err, haveIt) {
120 if (err) return cb(err)
121 if (!haveIt)
122 return readHave(null, next)
123 commonHash = haveIt
124 acked = true
125 cb(null, 'ACK ' + have.hash)
126 })
127 })
128 }
129 },
130 ])),
131
132 function havesDone(abort, cb) {
133 if (abort) return cb(abort)
134 // send pack file to client
135 if (sendPack)
136 return sendPack(abort, cb)
137 getObjects(repo, commonHash, wants, shallows,
138 function (err, numObjects, readObjects) {
139 if (err) return cb(err)
140 var progress = progressObjects(options)
141 progress.setNumObjects(numObjects)
142 sendPack = pack.encode(options, numObjects,
143 progress(readObjects))
144 havesDone(abort, cb)
145 }
146 )
147 }
148 ])
149}
150
151// through stream to show a progress bar for objects being read
152function progressObjects(options) {
153 // Only show progress bar if it is requested and if it won't interfere with
154 // the debug output
155 if (!options.progress || options.verbosity > 1) {
156 var dummyProgress = function (readObject) { return readObject }
157 dummyProgress.setNumObjects = function () {}
158 return dummyProgress
159 }
160
161 var numObjects
162 var size = process.stderr.columns
163 var bar = new ProgressBar(':percent :bar', {
164 total: size,
165 clear: true
166 })
167
168 var progress = function (readObject) {
169 return function (abort, cb) {
170 readObject(abort, function next(end, object) {
171 if (end === true) {
172 bar.terminate()
173 } else if (!end) {
174 var name = object.type + ' ' + object.length
175 bar.tick(size / numObjects)
176 }
177
178 cb(end, object)
179 })
180 }
181 }
182 // TODO: put the num objects in the objects stream as a header object
183 progress.setNumObjects = function (n) {
184 numObjects = n
185 }
186 return progress
187}
188
189function getObjects(repo, commonHash, heads, shallows, cb) {
190 // get objects from commonHash to each head, inclusive.
191 // if commonHash is falsy, use root
192 var objects = []
193 var objectsAdded = {}
194 var done = multicb({pluck: 1})
195 var ended
196
197 // walk back from heads until get to commonHash
198 for (var hash in heads)
199 addObject(hash, done())
200
201 // TODO: only add new objects
202
203 function addObject(hash, cb) {
204 if (ended) return cb(ended)
205 if (hash in objectsAdded || hash == commonHash) return cb()
206 objectsAdded[hash] = true
207 repo.getObject(hash, function (err, object) {
208 if (err) return cb(err)
209 if (object.type == 'blob') {
210 objects.push(object)
211 cb()
212 } else {
213 // object must be read twice, so buffer it
214 bufferObject(object, function (err, object) {
215 if (err) return cb(err)
216 objects.push(object)
217 var hashes = getObjectLinks(object)
218 for (var sha1 in hashes)
219 addObject(sha1, done())
220 cb()
221 })
222 }
223 })
224 }
225
226 done(function (err) {
227 if (err) return cb(err)
228 cb(null, objects.length, pull.values(objects))
229 })
230}
231
232function bufferObject(object, cb) {
233 pull(
234 object.read,
235 pull.collect(function (err, bufs) {
236 if (err) return cb(err)
237 var buf = Buffer.concat(bufs, object.length)
238 cb(null, {
239 type: object.type,
240 length: object.length,
241 data: buf,
242 read: pull.once(buf)
243 })
244 })
245 )
246}
247
248// get hashes of git objects linked to from other git objects
249function getObjectLinks(object, cb) {
250 switch (object.type) {
251 case 'blob':
252 return {}
253 case 'tree':
254 return getTreeLinks(object.data)
255 case 'tag':
256 case 'commit':
257 return getCommitOrTagLinks(object.data)
258 }
259}
260
261function getTreeLinks(buf) {
262 var links = {}
263 for (var i = 0, j; j = buf.indexOf(0, i, 'ascii') + 1; i = j + 20) {
264 var hash = buf.slice(j, j + 20).toString('hex')
265 if (!(hash in links))
266 links[hash] = true
267 }
268 return links
269}
270
271function getCommitOrTagLinks(buf) {
272 var lines = buf.toString('utf8').split('\n')
273 var links = {}
274 // iterate until reach blank line (indicating start of commit/tag body)
275 for (var i = 0; lines[i]; i++) {
276 var args = lines[i].split(' ')
277 switch (args[0]) {
278 case 'tree':
279 case 'parent':
280 case 'object':
281 var hash = args[1]
282 if (!(hash in links))
283 links[hash] = true
284 }
285 }
286 return links
287}
288
289/*
290TODO: investigate capabilities
291report-status delete-refs side-band-64k quiet atomic ofs-delta
292*/
293
294// Get a line for each ref that we have. The first line also has capabilities.
295// Wrap with pktLine.encode.
296function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
297 var first = true
298 var symrefed = {}
299 var symrefsObj = {}
300
301 return cat([
302 function (end, cb) {
303 if (end) cb(true)
304 else if (!symrefs) cb(true)
305 else pull(
306 symrefs,
307 pull.map(function (sym) {
308 symrefed[sym.ref] = true
309 symrefsObj[sym.name] = sym.ref
310 return 'symref=' + sym.name + ':' + sym.ref
311 }),
312 pull.collect(function (err, symrefCaps) {
313 if (err) return cb(err)
314 capabilities = capabilities.concat(symrefCaps)
315 cb(true)
316 })
317 )
318 },
319 pull(
320 refSource,
321 pull.map(function (ref) {
322 // insert symrefs next to the refs that they point to
323 var out = [ref]
324 if (ref.name in symrefed)
325 for (var symrefName in symrefsObj)
326 if (symrefsObj[symrefName] === ref.name)
327 out.push({name: symrefName, hash: ref.hash})
328 return out
329 }),
330 pull.flatten(),
331 pull.map(function (ref) {
332 var name = ref.name
333 var value = ref.hash
334 if (first && usePlaceholder) {
335 first = false
336 /*
337 if (end) {
338 // use placeholder data if there are no refs
339 value = '0000000000000000000000000000000000000000'
340 name = 'capabilities^{}'
341 }
342 */
343 name += '\0' + capabilities.join(' ')
344 }
345 return value + ' ' + name
346 })
347 )
348 ])
349}
350
351// receive-pack: push from client
352function receivePack(read, repo, options) {
353 var sendRefs = receivePackHeader([
354 'delete-refs',
355 ], repo.refs(), null, true)
356 var done = multicb({pluck: 1})
357
358 return pktLine.encode(
359 cat([
360 // send our refs
361 sendRefs,
362 pull.once(''),
363 function (abort, cb) {
364 if (abort) return
365 // receive their refs
366 var lines = pktLine.decode(read, options)
367 pull(
368 lines.updates,
369 pull.collect(function (err, updates) {
370 if (err) return cb(err)
371 if (updates.length === 0) return cb(true)
372 var progress = progressObjects(options)
373
374 if (repo.uploadPack) {
375 var idxCb = done()
376 var packfile = cache(lines.passthrough)
377 indexPack(packfile(), function (err, idx) {
378 if (err) return idxCb(err)
379 repo.uploadPack(pull.values(updates), pull.once({
380 pack: pull(
381 packfile(),
382 // for some reason i was getting zero length buffers which
383 // were causing muxrpc to fail, so remove them here.
384 pull.filter(function (buf) {
385 return buf.length
386 })
387 ),
388 idx: idx
389 }), idxCb)
390 })
391 } else {
392 repo.update(pull.values(updates), pull(
393 lines.passthrough,
394 pack.decode({
395 verbosity: options.verbosity,
396 onHeader: function (numObjects) {
397 progress.setNumObjects(numObjects)
398 }
399 }, repo, done()),
400 progress
401 ), done())
402 }
403
404 done(function (err) {
405 cb(err || true)
406 })
407 })
408 )
409 },
410 pull.once('unpack ok')
411 ])
412 )
413}
414
415function prepend(data, read) {
416 var done
417 return function (end, cb) {
418 if (done) {
419 read(end, cb)
420 } else {
421 done = true
422 cb(null, data)
423 }
424 }
425}
426
427module.exports = function (repo) {
428 var ended
429 var options = {
430 verbosity: 1,
431 progress: false
432 }
433
434 function handleConnect(cmd, read) {
435 var args = util.split2(cmd)
436 switch (args[0]) {
437 case 'git-upload-pack':
438 return prepend('\n', uploadPack(read, repo, options))
439 case 'git-receive-pack':
440 return prepend('\n', receivePack(read, repo, options))
441 default:
442 return pull.error(new Error('Unknown service ' + args[0]))
443 }
444 }
445
446 function handleCommand(line, read) {
447 var args = util.split2(line)
448 switch (args[0]) {
449 case 'capabilities':
450 return capabilitiesSource()
451 case 'list':
452 return listRefs(refSource)
453 case 'connect':
454 return handleConnect(args[1], read)
455 case 'option':
456 return optionSource(args[1], options)
457 default:
458 return pull.error(new Error('Unknown command ' + line))
459 }
460 }
461
462 return function (read) {
463 var b = buffered()
464 b(read)
465 var command
466
467 function getCommand(cb) {
468 b.lines(null, function next(end, line) {
469 if (ended = end)
470 return cb(end)
471
472 if (line == '')
473 return b.lines(null, next)
474
475 if (options.verbosity > 1)
476 console.error('command:', line)
477
478 var cmdSource = handleCommand(line, b.passthrough)
479 cb(null, cmdSource)
480 })
481 }
482
483 return function next(abort, cb) {
484 if (ended) return cb(ended)
485
486 if (!command) {
487 if (abort) return
488 getCommand(function (end, cmd) {
489 command = cmd
490 next(end, cb)
491 })
492 return
493 }
494
495 command(abort, function (err, data) {
496 if (err) {
497 command = null
498 if (err !== true)
499 cb(err, data)
500 else
501 next(abort, cb)
502 } else {
503 // HACK: silence error when writing to closed stream
504 try {
505 cb(null, data)
506 } catch(e) {
507 if (e.message == 'process.stdout cannot be closed.'
508 || e.message == 'This socket is closed.')
509 process.exit(1)
510 throw e
511 }
512 }
513 })
514 }
515 }
516}
517

Built with git-ssb-web