git ssb

0+

cel / pull-git-remote-helper



Tree: ba1140afd3a13d6dd0f20a8db819e8775cf1bbac

Files: ba1140afd3a13d6dd0f20a8db819e8775cf1bbac / index.js

14073 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var cache = require('pull-cache')
4var buffered = require('pull-buffered')
5var Repo = require('pull-git-repo')
6var pack = require('pull-git-pack')
7var pktLine = require('./lib/pkt-line')
8var indexPack = require('pull-git-pack/lib/index-pack')
9var util = require('./lib/util')
10var multicb = require('multicb')
11var ProgressBar = require('progress')
12
// Apply a single remote-helper option to the shared `options` object.
//
// options: mutable settings object (`verbosity` and `progress` are written)
// name:    option name as received from the caller
// value:   raw option value
//
// Returns true when the option was recognised and stored; otherwise logs the
// unknown option to stderr and returns false.
function handleOption(options, name, value) {
  if (name === 'verbosity') {
    // anything that does not coerce to a non-zero number becomes 0
    var level = +value
    options.verbosity = level || 0
    return true
  }

  if (name === 'progress') {
    // empty/missing values and the literal string 'false' disable progress
    var enabled = !!value
    options.progress = enabled && value !== 'false'
    return true
  }

  console.error('unknown option', name + ': ' + value)
  return false
}
26
// Source that emits the helper's capability list as one chunk: one capability
// per line, followed by a blank line.
function capabilitiesSource() {
  var capabilities = ['option', 'connect']
  var response = capabilities.join('\n') + '\n\n'
  return pull.once(response)
}
33
// Handle an `option <name> <value>` command and return a source emitting the
// one-line reply: 'ok' when handleOption returned true, 'unsupported' when it
// returned false, and 'error <result>' for any other result.
function optionSource(cmd, options) {
  var parts = util.split2(cmd)
  var result = handleOption(options, parts[0], parts[1])

  var reply
  if (result === true) {
    reply = 'ok'
  } else if (result === false) {
    reply = 'unsupported'
  } else {
    reply = 'error ' + result
  }

  return pull.once(reply + '\n')
}
42
// transform ref objects into lines
//
// Through stream: each ref object read from `read` becomes the line
// "<value> <name>[ <attrs...>]\n". When the upstream source ends normally, a
// single blank line ('\n') is emitted first and the stream ends on the
// following read. Errors and aborts are passed straight through.
//
// The callback is invoked exactly once per read.
function listRefs(read) {
  var ended
  return function (abort, cb) {
    if (ended) return cb(ended)
    read(abort, function (end, ref) {
      ended = end
      if (end) {
        // Normal end of refs: emit the terminating blank line now; the stored
        // `ended` flag finishes the stream on the next read. When the reader
        // is aborting, it must get the end signal rather than more data.
        if (end === true && !abort) return cb(null, '\n')
        return cb(end)
      }
      cb(null,
        [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
    })
  }
}
57
// upload-pack: fetch to client
//
// read:    source of raw data coming from the client
// repo:    repo object; refs(), symrefs(), hasObjectFromAny() are used here
//          and the repo is handed on to getObjects()
// options: shared helper options, passed to the pkt-line decoder, the pack
//          encoder and the progress reporter
//
// Returns a source that:
//   1. sends the ref advertisement followed by a flush packet (''),
//   2. reads the client's want/shallow lines, then its have lines, and
//      answers 'ACK <hash>' for the first object the repo has, or 'NAK' if
//      the haves run out,
//   3. streams the pack produced by pack.encode for the wanted objects.
function uploadPack(read, repo, options) {
  /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
   * include-tag multi_ack_detailed
   * agent=git/2.7.0 */
  var sendRefs = receivePackHeader([
    'thin-pack',
  ], repo.refs(), repo.symrefs(), false)

  var lines = pktLine.decode(read, options)
  var readHave = lines.haves()
  var acked
  var commonHash
  var sendPack
  var wants = {}
  var shallows = {}

  // Packfile negotiation
  return cat([
    pktLine.encode(cat([
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        // NOTE(review): on abort this returns without invoking cb
        if (abort) return
        // the ACK/NAK line has been sent; this part of the stream is done
        if (acked) return cb(true)

        // read upload request (wants list) from client
        var readWant = lines.wants()
        readWant(null, function (end, want) {
          if (end === true) return cb(true) // early client disconnect
          else if (end) cb(end)
          else nextWant(null, want)
        })
        function nextWant(end, want) {
          if (end) return wantsDone(end === true ? null : end)
          if (want.type == 'want') {
            wants[want.hash] = true
          } else if (want.type == 'shallow') {
            shallows[want.hash] = true
          } else {
            // Error() only uses its first argument as the message, so the
            // offending type and hash have to be part of that string.
            var err = new Error(
              'Unknown thing ' + want.type + ' ' + want.hash)
            // Abort the wants reader; report our error even if the reader
            // acknowledges the abort with a plain `true`.
            return readWant(err, function (e) {
              cb(e && e !== true ? e : err)
            })
          }
          readWant(null, nextWant)
        }

        function wantsDone(err) {
          if (err) return cb(err)
          // Read upload haves (haves list).
          // On first obj-id that we have, ACK
          // If we have none, NAK.
          // TODO: implement multi_ack_detailed
          readHave(null, function next(end, have) {
            if (end === true) {
              // found no common object
              acked = true
              cb(null, 'NAK')
            } else if (end)
              cb(end)
            else if (have.type != 'have')
              cb(new Error('Unknown have' + JSON.stringify(have)))
            else
              repo.hasObjectFromAny(have.hash, function (err, haveIt) {
                if (err) return cb(err)
                if (!haveIt)
                  return readHave(null, next)
                commonHash = haveIt
                acked = true
                cb(null, 'ACK ' + have.hash)
              })
          })
        }
      },
    ])),

    function havesDone(abort, cb) {
      if (abort) return cb(abort)
      // send pack file to client
      if (sendPack)
        return sendPack(abort, cb)
      // first read: collect the objects, build the pack source, then re-enter
      getObjects(repo, commonHash, wants, shallows,
        function (err, numObjects, readObjects) {
          if (err) return cb(err)
          var progress = progressObjects(options)
          progress.setNumObjects(numObjects)
          sendPack = pack.encode(options, numObjects,
            progress(readObjects))
          havesDone(abort, cb)
        }
      )
    }
  ])
}
151
// through stream to show a progress bar for objects being read
//
// options: helper options; `progress` and `verbosity` are read.
//
// Returns a through function `progress(readObject)` with a
// `setNumObjects(n)` method that sets the expected number of objects. The
// through passes every object along unchanged, ticks the bar once per object
// and terminates the bar when the source ends normally.
function progressObjects(options) {
  // Only show progress bar if it is requested and if it won't interfere with
  // the debug output
  if (!options.progress || options.verbosity > 1) {
    var dummyProgress = function (readObject) { return readObject }
    dummyProgress.setNumObjects = function () {}
    return dummyProgress
  }

  var numObjects
  // stderr.columns is undefined when stderr is not a terminal; the bar needs a
  // numeric total, so fall back to a conventional 80-column width.
  var size = process.stderr.columns || 80
  var bar = new ProgressBar(':percent :bar', {
    total: size,
    clear: true
  })

  var progress = function (readObject) {
    return function (abort, cb) {
      readObject(abort, function next(end, object) {
        if (end === true) {
          bar.terminate()
        } else if (!end && numObjects > 0) {
          // each object advances the bar by an equal share of its width;
          // skipped until a positive object count is known
          bar.tick(size / numObjects)
        }

        cb(end, object)
      })
    }
  }
  // TODO: put the num objects in the objects stream as a header object
  progress.setNumObjects = function (n) {
    numObjects = n
  }
  return progress
}
189
// Collect every object reachable from `heads`, stopping at `commonHash`.
//
// repo:       repo object; getObjectFromAny(hash, cb) is used
// commonHash: hash at which the walk stops (that object is not included);
//             when falsy the walk continues to the roots
// heads:      object whose keys are the hashes to start from
// shallows:   accepted for the caller's signature; not used by this function
// cb:         called as cb(err) on failure, otherwise as
//             cb(null, numObjects, source of the collected objects)
function getObjects(repo, commonHash, heads, shallows, cb) {
  // get objects from commonHash to each head, inclusive.
  // if commonHash is falsy, use root
  var objects = []
  var objectsAdded = {}
  var done = multicb({pluck: 1})
  // first error seen; once set, no further objects are fetched
  var ended

  // walk back from heads until get to commonHash
  for (var hash in heads)
    addObject(hash, done())

  // TODO: only add new objects

  function addObject(hash, cb) {
    if (ended) return cb(ended)
    if (hash in objectsAdded || hash == commonHash) return cb()
    objectsAdded[hash] = true
    repo.getObjectFromAny(hash, function (err, object) {
      if (err) return cb(ended = err)
      if (object.type == 'blob') {
        objects.push(object)
        cb()
      } else {
        // object must be read twice, so buffer it
        bufferObject(object, function (err, object) {
          if (err) return cb(ended = err)
          objects.push(object)
          var hashes = getObjectLinks(object)
          // register the children before completing this object, so the
          // pending count cannot drop to zero while links remain unvisited
          for (var sha1 in hashes)
            addObject(sha1, done())
          cb()
        })
      }
    })
  }

  done(function (err) {
    if (err) return cb(err)
    cb(null, objects.length, pull.values(objects))
  })
}
232
// Read an object's stream fully into memory.
//
// object: { type, length, read } where `read` is a source of buffers
// cb:     called as cb(err) on failure, otherwise as cb(null, copy) where
//         `copy` has the same type and length, the concatenated contents in
//         `data`, and a fresh one-shot `read` source over that data.
function bufferObject(object, cb) {
  var onCollected = function (err, chunks) {
    if (err) return cb(err)
    var contents = Buffer.concat(chunks, object.length)
    var buffered = {
      type: object.type,
      length: object.length,
      data: contents,
      read: pull.once(contents)
    }
    cb(null, buffered)
  }

  pull(object.read, pull.collect(onCollected))
}
248
// get hashes of git objects linked to from other git objects
//
// object: buffered git object with `type` and, for non-blobs, `data`
// cb:     unused; kept so the signature stays unchanged
//
// Returns an object whose keys are the linked hashes. Blobs link to nothing.
// Unrecognised types also yield an empty object, so callers always get
// something they can enumerate.
function getObjectLinks(object, cb) {
  switch (object.type) {
    case 'tree':
      return getTreeLinks(object.data)
    case 'tag':
    case 'commit':
      return getCommitOrTagLinks(object.data)
    case 'blob':
    default:
      return {}
  }
}
261
// Extract the hashes referenced by a raw tree object.
//
// buf: Buffer holding the tree contents. Each entry is parsed as
//      "<octal mode> <name>" terminated by a NUL byte and followed by a
//      20-byte hash.
//
// Returns an object whose keys are the hex hashes of the entries. Entries
// whose mode is 160000 (octal) are skipped.
function getTreeLinks(buf) {
  // Octal 160000, written in hex because legacy octal literals (0160000) are
  // a syntax error in strict mode and ES modules.
  var GITLINK_MODE = 0xE000
  var links = {}
  // i: start of the current entry; j: first byte after the entry's NUL
  // terminator (0 when no further NUL is found, which ends the loop)
  for (var i = 0, j; j = buf.indexOf(0, i) + 1; i = j + 20) {
    var hash = buf.slice(j, j + 20).toString('hex')
    // parseInt stops at the space separating the mode from the name
    var mode = parseInt(buf.slice(i, j).toString('ascii'), 8)
    if (mode == GITLINK_MODE) {
      // skip link to git commit since it may not be in this repo
      continue
    }
    links[hash] = true
  }
  return links
}
276
// Extract the hashes referenced by the header of a raw commit or tag object.
//
// buf: Buffer holding the object contents
//
// Returns an object whose keys are the second field of every `tree`,
// `parent` and `object` header line. Scanning stops at the first empty line,
// which separates the header from the message body.
function getCommitOrTagLinks(buf) {
  var links = {}
  var headerLines = buf.toString('utf8').split('\n')
  var n = 0

  // an empty (or missing) line ends the header section
  while (headerLines[n]) {
    var fields = headerLines[n].split(' ')
    var key = fields[0]
    if (key === 'tree' || key === 'parent' || key === 'object') {
      var target = fields[1]
      if (!(target in links)) {
        links[target] = true
      }
    }
    n += 1
  }

  return links
}
294
295/*
296TODO: investigate capabilities
297report-status delete-refs side-band-64k quiet atomic ofs-delta
298*/
299
// Build the ref advertisement: a source of "<hash> <name>" lines, one per
// ref. Wrap the result with pktLine.encode.
//
// capabilities:   array of capability strings
// refSource:      source of ref objects ({name, hash})
// symrefs:        optional source of symref objects ({name, ref}); each one
//                 adds a "symref=<name>:<ref>" capability and is listed
//                 right after the ref it points to, with that ref's hash
// usePlaceholder: when truthy, the first emitted line gets "\0" plus the
//                 space-joined capabilities appended; when falsy, no line
//                 carries the capabilities
function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
  var first = true
  var symrefed = {}   // target ref name -> true
  var symrefsObj = {} // symref name -> target ref name

  // Emits nothing itself; drains `symrefs` (if given) to fill the lookup
  // tables and extend `capabilities` before any ref line is produced.
  function loadSymrefs(end, cb) {
    if (end || !symrefs) {
      cb(true)
      return
    }
    pull(
      symrefs,
      pull.map(function (sym) {
        symrefed[sym.ref] = true
        symrefsObj[sym.name] = sym.ref
        return 'symref=' + sym.name + ':' + sym.ref
      }),
      pull.collect(function (err, symrefCaps) {
        if (err) return cb(err)
        capabilities = capabilities.concat(symrefCaps)
        cb(true)
      })
    )
  }

  // insert symrefs next to the refs that they point to
  function withSymrefs(ref) {
    var group = [ref]
    if (ref.name in symrefed) {
      for (var symrefName in symrefsObj) {
        if (symrefsObj[symrefName] === ref.name) {
          group.push({name: symrefName, hash: ref.hash})
        }
      }
    }
    return group
  }

  function formatRef(ref) {
    var name = ref.name
    var value = ref.hash
    if (first && usePlaceholder) {
      first = false
      // TODO (disabled in the original): when there are no refs at all, send
      // the all-zero hash with the name 'capabilities^{}' as a placeholder.
      name += '\0' + capabilities.join(' ')
    }
    return value + ' ' + name
  }

  return cat([
    loadSymrefs,
    pull(
      refSource,
      pull.map(withSymrefs),
      pull.flatten(),
      pull.map(formatRef)
    )
  ])
}
356
// receive-pack: push from client
//
// read:    source of raw data coming from the client
// repo:    repo object; refs() is always used, and either uploadPack() (when
//          the repo provides it) or update() receives the pushed data
// options: shared helper options
//
// Returns a pkt-line encoded source that sends the ref advertisement and a
// flush packet (''), then consumes the client's ref updates and pack data,
// and finally emits 'unpack ok'.
function receivePack(read, repo, options) {
  var sendRefs = receivePackHeader([
    'delete-refs',
    'no-thin',
  ], repo.refs(), null, true)
  var done = multicb({pluck: 1})

  // Hand the pack to the repo as an indexed packfile.
  function storeIndexedPack(lines, updates) {
    var idxCb = done()
    indexPack(lines.passthrough, function (err, idx, packfileFixed) {
      if (err) return idxCb(err)
      var updateSource = pull.values(updates)
      var packSource = pull(
        packfileFixed,
        // for some reason i was getting zero length buffers which
        // were causing muxrpc to fail, so remove them here.
        pull.filter(function (buf) {
          return buf.length
        })
      )
      repo.uploadPack(updateSource, pull.once({
        pack: packSource,
        idx: idx
      }), idxCb)
    })
  }

  // Decode the pack into individual objects and hand those to the repo.
  function storeObjects(lines, updates, progress) {
    var updateSource = pull.values(updates)
    var objectSource = pull(
      lines.passthrough,
      pack.decode({
        verbosity: options.verbosity,
        onHeader: function (numObjects) {
          progress.setNumObjects(numObjects)
        }
      }, repo, done()),
      progress
    )
    repo.update(updateSource, objectSource, done())
  }

  // Emits no data of its own: ends (or errors) once the push was processed.
  function receiveUpdates(abort, cb) {
    if (abort) return
    // receive their refs
    var lines = pktLine.decode(read, options)
    pull(
      lines.updates,
      pull.collect(function (err, updates) {
        if (err) return cb(err)
        if (updates.length === 0) return cb(true)
        var progress = progressObjects(options)

        if (repo.uploadPack) {
          storeIndexedPack(lines, updates)
        } else {
          storeObjects(lines, updates, progress)
        }

        done(function (err) {
          cb(err || true)
        })
      })
    )
  }

  return pktLine.encode(
    cat([
      // send our refs
      sendRefs,
      pull.once(''),
      receiveUpdates,
      pull.once('unpack ok')
    ])
  )
}
420
// Source that emits `data` once and then continues with everything from
// `read`.
//
// data: value to emit first
// read: source to continue with
//
// If the very first read is already an abort (or error), `data` is dropped
// and the abort is forwarded to `read`, so the wrapped source is shut down
// and the caller receives an end signal instead of data.
function prepend(data, read) {
  var done
  return function (end, cb) {
    if (done || end) {
      done = true
      read(end, cb)
    } else {
      done = true
      cb(null, data)
    }
  }
}
432
433module.exports = function (repo) {
434 var ended
435 var options = {
436 verbosity: 1,
437 progress: false
438 }
439
440 repo = Repo(repo)
441
442 function handleConnect(cmd, read) {
443 var args = util.split2(cmd)
444 switch (args[0]) {
445 case 'git-upload-pack':
446 return prepend('\n', uploadPack(read, repo, options))
447 case 'git-receive-pack':
448 return prepend('\n', receivePack(read, repo, options))
449 default:
450 return pull.error(new Error('Unknown service ' + args[0]))
451 }
452 }
453
454 function handleCommand(line, read) {
455 var args = util.split2(line)
456 switch (args[0]) {
457 case 'capabilities':
458 return capabilitiesSource()
459 case 'list':
460 return listRefs(refSource)
461 case 'connect':
462 return handleConnect(args[1], read)
463 case 'option':
464 return optionSource(args[1], options)
465 default:
466 return pull.error(new Error('Unknown command ' + line))
467 }
468 }
469
470 return function (read) {
471 var b = buffered()
472 b(read)
473 var command
474
475 function getCommand(cb) {
476 b.lines(null, function next(end, line) {
477 if (ended = end)
478 return cb(end)
479
480 if (line == '')
481 return b.lines(null, next)
482
483 if (options.verbosity > 1)
484 console.error('command:', line)
485
486 var cmdSource = handleCommand(line, b.passthrough)
487 cb(null, cmdSource)
488 })
489 }
490
491 return function next(abort, cb) {
492 if (ended) return cb(ended)
493
494 if (!command) {
495 if (abort) return
496 getCommand(function (end, cmd) {
497 command = cmd
498 next(end, cb)
499 })
500 return
501 }
502
503 command(abort, function (err, data) {
504 if (err) {
505 command = null
506 if (err !== true)
507 cb(err, data)
508 else
509 next(abort, cb)
510 } else {
511 // HACK: silence error when writing to closed stream
512 try {
513 cb(null, data)
514 } catch(e) {
515 if (e.message == 'process.stdout cannot be closed.'
516 || e.message == 'This socket is closed.')
517 process.exit(1)
518 throw e
519 }
520 }
521 })
522 }
523 }
524}
525

Built with git-ssb-web