git ssb

0+

cel / pull-git-remote-helper



Tree: c6fc135c4c7adad4f6abd11c17e4b23ae6544228

Files: c6fc135c4c7adad4f6abd11c17e4b23ae6544228 / index.js

14119 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var cache = require('pull-cache')
4var buffered = require('pull-buffered')
5var Repo = require('pull-git-repo')
6var pack = require('pull-git-pack')
7var pktLine = require('./lib/pkt-line')
8var indexPack = require('pull-git-pack/lib/index-pack')
9var util = require('./lib/util')
10var multicb = require('multicb')
11var ProgressBar = require('progress')
12
13function handleOption(options, name, value) {
14 switch (name) {
15 case 'verbosity':
16 options.verbosity = +value || 0
17 return true
18 case 'progress':
19 options.progress = !!value && value !== 'false'
20 return true
21 default:
22 console.error('unknown option', name + ': ' + value)
23 return false
24 }
25}
26
27function capabilitiesSource() {
28 return pull.once([
29 'option',
30 'connect',
31 ].join('\n') + '\n\n')
32}
33
34function optionSource(cmd, options) {
35 var args = util.split2(cmd)
36 var msg = handleOption(options, args[0], args[1])
37 msg = (msg === true) ? 'ok'
38 : (msg === false) ? 'unsupported'
39 : 'error ' + msg
40 return pull.once(msg + '\n')
41}
42
43// transform ref objects into lines
44function listRefs(read) {
45 var ended
46 return function (abort, cb) {
47 if (ended) return cb(ended)
48 read(abort, function (end, ref) {
49 ended = end
50 if (end === true) cb(null, '\n')
51 if (end) cb(end)
52 else cb(null,
53 [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
54 })
55 }
56}
57
58// upload-pack: fetch to client
59function uploadPack(read, repo, options) {
60 /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
61 * include-tag multi_ack_detailed
62 * agent=git/2.7.0 */
63 var sendRefs = receivePackHeader([
64 'thin-pack',
65 ], repo.refs(), repo.symrefs(), false)
66
67 var lines = pktLine.decode(read, options)
68 var readHave = lines.haves()
69 var acked
70 var commonHash
71 var sendPack
72 var wants = {}
73 var shallows = {}
74 var aborted
75
76 // Packfile negotiation
77 return cat([
78 pktLine.encode(cat([
79 sendRefs,
80 pull.once(''),
81 function (abort, cb) {
82 if (abort) return
83 if (acked) return cb(true)
84
85 // read upload request (wants list) from client
86 var readWant = lines.wants()
87 readWant(null, function (end, want) {
88 if (end === true) return cb(aborted = true) // early client disconnect
89 else if (end) cb(end)
90 else nextWant(null, want)
91 })
92 function nextWant(end, want) {
93 if (end) return wantsDone(end === true ? null : end)
94 if (want.type == 'want') {
95 wants[want.hash] = true
96 } else if (want.type == 'shallow') {
97 shallows[want.hash] = true
98 } else {
99 var err = new Error("Unknown thing", want.type, want.hash)
100 return readWant(err, function (e) { cb(e || err) })
101 }
102 readWant(null, nextWant)
103 }
104
105 function wantsDone(err) {
106 if (err) return cb(err)
107 // Read upload haves (haves list).
108 // On first obj-id that we have, ACK
109 // If we have none, NAK.
110 // TODO: implement multi_ack_detailed
111 readHave(null, function next(end, have) {
112 if (end === true) {
113 // found no common object
114 acked = true
115 cb(null, 'NAK')
116 } else if (end)
117 cb(end)
118 else if (have.type != 'have')
119 cb(new Error('Unknown have' + JSON.stringify(have)))
120 else
121 repo.hasObjectFromAny(have.hash, function (err, haveIt) {
122 if (err) return cb(err)
123 if (!haveIt)
124 return readHave(null, next)
125 commonHash = haveIt
126 acked = true
127 cb(null, 'ACK ' + have.hash)
128 })
129 })
130 }
131 },
132 ])),
133
134 function havesDone(abort, cb) {
135 if (abort || aborted) return cb(abort || aborted)
136 // send pack file to client
137 if (sendPack)
138 return sendPack(abort, cb)
139 getObjects(repo, commonHash, wants, shallows,
140 function (err, numObjects, readObjects) {
141 if (err) return cb(err)
142 var progress = progressObjects(options)
143 progress.setNumObjects(numObjects)
144 sendPack = pack.encode(options, numObjects,
145 progress(readObjects))
146 havesDone(abort, cb)
147 }
148 )
149 }
150 ])
151}
152
153// through stream to show a progress bar for objects being read
154function progressObjects(options) {
155 // Only show progress bar if it is requested and if it won't interfere with
156 // the debug output
157 if (!options.progress || options.verbosity > 1) {
158 var dummyProgress = function (readObject) { return readObject }
159 dummyProgress.setNumObjects = function () {}
160 return dummyProgress
161 }
162
163 var numObjects
164 var size = process.stderr.columns
165 var bar = new ProgressBar(':percent :bar', {
166 total: size,
167 clear: true
168 })
169
170 var progress = function (readObject) {
171 return function (abort, cb) {
172 readObject(abort, function next(end, object) {
173 if (end === true) {
174 bar.terminate()
175 } else if (!end) {
176 var name = object.type + ' ' + object.length
177 bar.tick(size / numObjects)
178 }
179
180 cb(end, object)
181 })
182 }
183 }
184 // TODO: put the num objects in the objects stream as a header object
185 progress.setNumObjects = function (n) {
186 numObjects = n
187 }
188 return progress
189}
190
191function getObjects(repo, commonHash, heads, shallows, cb) {
192 // get objects from commonHash to each head, inclusive.
193 // if commonHash is falsy, use root
194 var objects = []
195 var objectsAdded = {}
196 var done = multicb({pluck: 1})
197 var ended
198
199 // walk back from heads until get to commonHash
200 for (var hash in heads)
201 addObject(hash, done())
202
203 // TODO: only add new objects
204
205 function addObject(hash, cb) {
206 if (ended) return cb(ended)
207 if (hash in objectsAdded || hash == commonHash) return cb()
208 objectsAdded[hash] = true
209 repo.getObjectFromAny(hash, function (err, object) {
210 if (err) return cb(err)
211 if (object.type == 'blob') {
212 objects.push(object)
213 cb()
214 } else {
215 // object must be read twice, so buffer it
216 bufferObject(object, function (err, object) {
217 if (err) return cb(err)
218 objects.push(object)
219 var hashes = getObjectLinks(object)
220 for (var sha1 in hashes)
221 addObject(sha1, done())
222 cb()
223 })
224 }
225 })
226 }
227
228 done(function (err) {
229 if (err) return cb(err)
230 cb(null, objects.length, pull.values(objects))
231 })
232}
233
234function bufferObject(object, cb) {
235 pull(
236 object.read,
237 pull.collect(function (err, bufs) {
238 if (err) return cb(err)
239 var buf = Buffer.concat(bufs, object.length)
240 cb(null, {
241 type: object.type,
242 length: object.length,
243 data: buf,
244 read: pull.once(buf)
245 })
246 })
247 )
248}
249
250// get hashes of git objects linked to from other git objects
251function getObjectLinks(object, cb) {
252 switch (object.type) {
253 case 'blob':
254 return {}
255 case 'tree':
256 return getTreeLinks(object.data)
257 case 'tag':
258 case 'commit':
259 return getCommitOrTagLinks(object.data)
260 }
261}
262
263function getTreeLinks(buf) {
264 var links = {}
265 for (var i = 0, j; j = buf.indexOf(0, i, 'ascii') + 1; i = j + 20) {
266 var hash = buf.slice(j, j + 20).toString('hex')
267 var mode = parseInt(buf.slice(i, j).toString('ascii'), 8)
268 if (mode == 0160000) {
269 // skip link to git commit since it may not be in this repo
270 continue
271 }
272 if (!(hash in links))
273 links[hash] = true
274 }
275 return links
276}
277
278function getCommitOrTagLinks(buf) {
279 var lines = buf.toString('utf8').split('\n')
280 var links = {}
281 // iterate until reach blank line (indicating start of commit/tag body)
282 for (var i = 0; lines[i]; i++) {
283 var args = lines[i].split(' ')
284 switch (args[0]) {
285 case 'tree':
286 case 'parent':
287 case 'object':
288 var hash = args[1]
289 if (!(hash in links))
290 links[hash] = true
291 }
292 }
293 return links
294}
295
296/*
297TODO: investigate capabilities
298report-status delete-refs side-band-64k quiet atomic ofs-delta
299*/
300
301// Get a line for each ref that we have. The first line also has capabilities.
302// Wrap with pktLine.encode.
303function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
304 var first = true
305 var symrefed = {}
306 var symrefsObj = {}
307
308 return cat([
309 function (end, cb) {
310 if (end) cb(true)
311 else if (!symrefs) cb(true)
312 else pull(
313 symrefs,
314 pull.map(function (sym) {
315 symrefed[sym.ref] = true
316 symrefsObj[sym.name] = sym.ref
317 return 'symref=' + sym.name + ':' + sym.ref
318 }),
319 pull.collect(function (err, symrefCaps) {
320 if (err) return cb(err)
321 capabilities = capabilities.concat(symrefCaps)
322 cb(true)
323 })
324 )
325 },
326 pull(
327 refSource,
328 pull.map(function (ref) {
329 // insert symrefs next to the refs that they point to
330 var out = [ref]
331 if (ref.name in symrefed)
332 for (var symrefName in symrefsObj)
333 if (symrefsObj[symrefName] === ref.name)
334 out.push({name: symrefName, hash: ref.hash})
335 return out
336 }),
337 pull.flatten(),
338 pull.map(function (ref) {
339 var name = ref.name
340 var value = ref.hash
341 if (first && usePlaceholder) {
342 first = false
343 /*
344 if (end) {
345 // use placeholder data if there are no refs
346 value = '0000000000000000000000000000000000000000'
347 name = 'capabilities^{}'
348 }
349 */
350 name += '\0' + capabilities.join(' ')
351 }
352 return value + ' ' + name
353 })
354 )
355 ])
356}
357
358// receive-pack: push from client
359function receivePack(read, repo, options) {
360 var sendRefs = receivePackHeader([
361 'delete-refs',
362 'no-thin',
363 ], repo.refs(), null, true)
364 var done = multicb({pluck: 1})
365
366 return pktLine.encode(
367 cat([
368 // send our refs
369 sendRefs,
370 pull.once(''),
371 function (abort, cb) {
372 if (abort) return
373 // receive their refs
374 var lines = pktLine.decode(read, options)
375 pull(
376 lines.updates,
377 pull.collect(function (err, updates) {
378 if (err) return cb(err)
379 if (updates.length === 0) return cb(true)
380 var progress = progressObjects(options)
381
382 if (repo.uploadPack) {
383 var idxCb = done()
384 indexPack(lines.passthrough, function (err, idx, packfileFixed) {
385 if (err) return idxCb(err)
386 repo.uploadPack(pull.values(updates), pull.once({
387 pack: pull(
388 packfileFixed,
389 // for some reason i was getting zero length buffers which
390 // were causing muxrpc to fail, so remove them here.
391 pull.filter(function (buf) {
392 return buf.length
393 })
394 ),
395 idx: idx
396 }), idxCb)
397 })
398 } else {
399 repo.update(pull.values(updates), pull(
400 lines.passthrough,
401 pack.decode({
402 verbosity: options.verbosity,
403 onHeader: function (numObjects) {
404 progress.setNumObjects(numObjects)
405 }
406 }, repo, done()),
407 progress
408 ), done())
409 }
410
411 done(function (err) {
412 cb(err || true)
413 })
414 })
415 )
416 },
417 pull.once('unpack ok')
418 ])
419 )
420}
421
422function prepend(data, read) {
423 var done
424 return function (end, cb) {
425 if (done) {
426 read(end, cb)
427 } else {
428 done = true
429 cb(null, data)
430 }
431 }
432}
433
434module.exports = function (repo) {
435 var ended
436 var options = {
437 verbosity: 1,
438 progress: false
439 }
440
441 repo = Repo(repo)
442
443 function handleConnect(cmd, read) {
444 var args = util.split2(cmd)
445 switch (args[0]) {
446 case 'git-upload-pack':
447 return prepend('\n', uploadPack(read, repo, options))
448 case 'git-receive-pack':
449 return prepend('\n', receivePack(read, repo, options))
450 default:
451 return pull.error(new Error('Unknown service ' + args[0]))
452 }
453 }
454
455 function handleCommand(line, read) {
456 var args = util.split2(line)
457 switch (args[0]) {
458 case 'capabilities':
459 return capabilitiesSource()
460 case 'list':
461 return listRefs(refSource)
462 case 'connect':
463 return handleConnect(args[1], read)
464 case 'option':
465 return optionSource(args[1], options)
466 default:
467 return pull.error(new Error('Unknown command ' + line))
468 }
469 }
470
471 return function (read) {
472 var b = buffered()
473 b(read)
474 var command
475
476 function getCommand(cb) {
477 b.lines(null, function next(end, line) {
478 if (ended = end)
479 return cb(end)
480
481 if (line == '')
482 return b.lines(null, next)
483
484 if (options.verbosity > 1)
485 console.error('command:', line)
486
487 var cmdSource = handleCommand(line, b.passthrough)
488 cb(null, cmdSource)
489 })
490 }
491
492 return function next(abort, cb) {
493 if (ended) return cb(ended)
494
495 if (!command) {
496 if (abort) return
497 getCommand(function (end, cmd) {
498 command = cmd
499 next(end, cb)
500 })
501 return
502 }
503
504 command(abort, function (err, data) {
505 if (err) {
506 command = null
507 if (err !== true)
508 cb(err, data)
509 else
510 next(abort, cb)
511 } else {
512 // HACK: silence error when writing to closed stream
513 try {
514 cb(null, data)
515 } catch(e) {
516 if (e.message == 'process.stdout cannot be closed.'
517 || e.message == 'This socket is closed.')
518 process.exit(1)
519 throw e
520 }
521 }
522 })
523 }
524 }
525}
526

Built with git-ssb-web