git ssb

0+

cel / pull-git-remote-helper



Tree: 3743a50e99ae0400aabf3a4a34e510181b274aea

Files: 3743a50e99ae0400aabf3a4a34e510181b274aea / index.js

13870 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var cache = require('pull-cache')
4var buffered = require('pull-buffered')
5var Repo = require('pull-git-repo')
6var pack = require('pull-git-pack')
7var pktLine = require('./lib/pkt-line')
8var indexPack = require('./lib/index-pack')
9var util = require('./lib/util')
10var multicb = require('multicb')
11var ProgressBar = require('progress')
12
/**
 * Apply one remote-helper option to the shared options object.
 *
 * @param {Object} options - mutable settings; `verbosity` or `progress` is written to it
 * @param {string} name - option name sent by the client
 * @param {string} value - raw option value
 * @returns {boolean} true if the option was recognised and stored, false otherwise
 */
function handleOption(options, name, value) {
  if (name === 'verbosity') {
    // anything that does not coerce to a non-zero number becomes 0
    options.verbosity = +value || 0
    return true
  }

  if (name === 'progress') {
    // any non-empty value except the literal string 'false' enables progress
    options.progress = !!value && value !== 'false'
    return true
  }

  console.error('unknown option', name + ': ' + value)
  return false
}
26
/**
 * Source that emits the helper's capability list as a single chunk:
 * one capability per line, followed by a blank line.
 *
 * @returns {Function} pull-stream source yielding one string
 */
function capabilitiesSource() {
  var capabilities = ['option', 'connect']
  var response = capabilities.join('\n') + '\n\n'
  return pull.once(response)
}
33
/**
 * Handle an `option <name> <value>` command and produce the one-line reply.
 *
 * @param {string} cmd - the "<name> <value>" part of the command
 * @param {Object} options - shared options object updated by handleOption
 * @returns {Function} pull-stream source yielding 'ok', 'unsupported' or
 *   'error <msg>', each terminated by a newline
 */
function optionSource(cmd, options) {
  var parts = util.split2(cmd)
  var outcome = handleOption(options, parts[0], parts[1])

  var reply
  if (outcome === true) {
    reply = 'ok'
  } else if (outcome === false) {
    reply = 'unsupported'
  } else {
    reply = 'error ' + outcome
  }

  return pull.once(reply + '\n')
}
42
/**
 * Through stream that turns ref objects into text lines.
 *
 * Each ref becomes "<value> <name>[ <attr>...]\n". When the upstream ends
 * normally, one extra "\n" is emitted (the blank line closing the list) and
 * the following read reports the end. Upstream errors are passed on as-is.
 *
 * @param {Function} read - pull-stream source of `{value, name, attrs}` objects
 * @returns {Function} pull-stream source of strings
 */
function listRefs(read) {
  var ended
  return function (abort, cb) {
    if (ended) return cb(ended)
    read(abort, function (end, ref) {
      ended = end
      // Call back exactly once per read: on a normal end emit only the
      // terminating blank line; the end itself is delivered on the next read.
      if (end === true) return cb(null, '\n')
      if (end) return cb(end)
      cb(null,
        [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
    })
  }
}
57
/**
 * upload-pack: fetch to client.
 *
 * Builds the reply stream for a fetch: advertises refs, reads the client's
 * want/shallow lines and then its have lines, answers with a single ACK or
 * NAK, and finally streams a packfile produced from getObjects.
 *
 * @param {Function} read - pull-stream source of raw data from the client
 * @param {Object} repo - repo providing refs(), symrefs() and hasObjectFromAny()
 * @param {Object} options - shared helper options; passed to the pkt-line
 *   decoder, the progress reporter and the pack encoder
 * @returns {Function} pull-stream source of data to send to the client
 */
function uploadPack(read, repo, options) {
  /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
   * include-tag multi_ack_detailed
   * agent=git/2.7.0 */
  var sendRefs = receivePackHeader([
    'thin-pack',
  ], repo.refs(), repo.symrefs(), false)

  var lines = pktLine.decode(read, options)
  var readHave = lines.haves()
  var acked
  var commonHash
  var sendPack
  var wants = {}
  var shallows = {}

  // Packfile negotiation
  return cat([
    pktLine.encode(cat([
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        if (abort) return
        // the ACK/NAK has been sent already: end this part of the stream
        if (acked) return cb(true)

        // read upload request (wants list) from client
        var readWant = lines.wants()
        readWant(null, function (end, want) {
          if (end === true) return cb(true) // early client disconnect
          else if (end) cb(end)
          else nextWant(null, want)
        })

        function nextWant(end, want) {
          if (end) return wantsDone(end === true ? null : end)
          if (want.type == 'want') {
            wants[want.hash] = true
          } else if (want.type == 'shallow') {
            shallows[want.hash] = true
          } else {
            // Error() only uses its first argument as the message, so the
            // offending type and hash have to be concatenated into it.
            var err = new Error('Unknown thing ' + want.type + ' ' + want.hash)
            // abort the wants reader, then report the failure downstream
            return readWant(err, function (e) { cb(e || err) })
          }
          readWant(null, nextWant)
        }

        function wantsDone(err) {
          if (err) return cb(err)
          // Read upload haves (haves list).
          // On first obj-id that we have, ACK
          // If we have none, NAK.
          // TODO: implement multi_ack_detailed
          readHave(null, function next(end, have) {
            if (end === true) {
              // found no common object
              acked = true
              cb(null, 'NAK')
            } else if (end)
              cb(end)
            else if (have.type != 'have')
              cb(new Error('Unknown have ' + JSON.stringify(have)))
            else
              repo.hasObjectFromAny(have.hash, function (err, haveIt) {
                if (err) return cb(err)
                if (!haveIt)
                  return readHave(null, next)
                // NOTE(review): commonHash receives whatever hasObjectFromAny
                // reports; getObjects compares it against object ids, so
                // confirm this is an id rather than a boolean.
                commonHash = haveIt
                acked = true
                cb(null, 'ACK ' + have.hash)
              })
          })
        }
      },
    ])),

    function havesDone(abort, cb) {
      if (abort) return cb(abort)
      // send pack file to client
      if (sendPack)
        return sendPack(abort, cb)
      // first read: collect the objects, build the pack encoder, then retry
      getObjects(repo, commonHash, wants, shallows,
        function (err, numObjects, readObjects) {
          if (err) return cb(err)
          var progress = progressObjects(options)
          progress.setNumObjects(numObjects)
          sendPack = pack.encode(options, numObjects,
            progress(readObjects))
          havesDone(abort, cb)
        }
      )
    }
  ])
}
151
/**
 * Create a through stream that shows a progress bar for objects being read.
 *
 * The returned function wraps an object source and passes every item through
 * unchanged. It also carries a `setNumObjects(n)` method used to tell the bar
 * how many objects to expect.
 *
 * @param {Object} options - helper options; `progress` and `verbosity` are read
 * @returns {Function} through function with a `setNumObjects` method
 */
function progressObjects(options) {
  // Only show progress bar if it is requested and if it won't interfere with
  // the debug output
  if (!options.progress || options.verbosity > 1) {
    var dummyProgress = function (readObject) { return readObject }
    dummyProgress.setNumObjects = function () {}
    return dummyProgress
  }

  var numObjects
  // `columns` is undefined when stderr is not a TTY, and the bar needs a
  // numeric total, so fall back to a conventional terminal width.
  var size = process.stderr.columns || 80
  var bar = new ProgressBar(':percent :bar', {
    total: size,
    clear: true
  })

  var progress = function (readObject) {
    return function (abort, cb) {
      readObject(abort, function (end, object) {
        if (end === true) {
          bar.terminate()
        } else if (!end && numObjects > 0) {
          // skip ticking until setNumObjects has supplied a positive count;
          // dividing by an unset count would tick by NaN
          bar.tick(size / numObjects)
        }

        cb(end, object)
      })
    }
  }
  // TODO: put the num objects in the objects stream as a header object
  progress.setNumObjects = function (n) {
    numObjects = n
  }
  return progress
}
189
/**
 * Collect every object reachable from the given heads, stopping at
 * commonHash, and hand them back as a pull-stream source.
 *
 * Non-blob objects are buffered so their contents can be scanned for links
 * to further objects, which are then walked as well.
 *
 * @param {Object} repo - repo providing getObjectFromAny(hash, cb)
 * @param {string} commonHash - object id at which the walk stops; if falsy,
 *   the walk continues to the roots
 * @param {Object} heads - object whose keys are the starting object ids
 * @param {Object} shallows - currently unused
 * @param {Function} cb - called as cb(err) or cb(null, count, objectSource)
 */
function getObjects(repo, commonHash, heads, shallows, cb) {
  var objects = []
  var objectsAdded = {}
  var done = multicb({pluck: 1})
  var ended

  // Remember the first failure so that walks started afterwards stop
  // immediately instead of fetching more objects.
  function fail(err, next) {
    ended = ended || err
    next(err)
  }

  // TODO: only add new objects

  function addObject(hash, next) {
    if (ended) return next(ended)
    if (hash in objectsAdded || hash == commonHash) return next()
    objectsAdded[hash] = true
    repo.getObjectFromAny(hash, function (err, object) {
      if (err) return fail(err, next)
      if (object.type == 'blob') {
        objects.push(object)
        return next()
      }
      // object must be read twice, so buffer it
      bufferObject(object, function (err, copy) {
        if (err) return fail(err, next)
        objects.push(copy)
        var hashes = getObjectLinks(copy)
        // register the child walks before completing this one, so the
        // overall completion callback cannot fire early
        for (var sha1 in hashes)
          addObject(sha1, done())
        next()
      })
    })
  }

  // walk back from heads until get to commonHash
  for (var hash in heads)
    addObject(hash, done())

  done(function (err) {
    if (err) return cb(err)
    cb(null, objects.length, pull.values(objects))
  })
}
232
/**
 * Read an object's stream completely into memory.
 *
 * @param {Object} object - git object with `type`, `length` and a `read` source
 * @param {Function} cb - called as cb(err) or cb(null, buffered), where
 *   `buffered` has the same type and length, the full contents in `data`,
 *   and a fresh `read` source over those contents
 */
function bufferObject(object, cb) {
  function onCollected(err, chunks) {
    if (err) return cb(err)
    var contents = Buffer.concat(chunks, object.length)
    var buffered = {
      type: object.type,
      length: object.length,
      data: contents,
      read: pull.once(contents)
    }
    cb(null, buffered)
  }

  pull(object.read, pull.collect(onCollected))
}
248
/**
 * Get hashes of git objects linked to from another git object.
 *
 * @param {Object} object - buffered git object; `type` selects the parser and
 *   `data` holds the contents for trees, commits and tags
 * @param {Function} [cb] - unused; kept so existing call signatures still match
 * @returns {Object} map whose keys are the linked hashes; empty for blobs and
 *   for unrecognised object types
 */
function getObjectLinks(object, cb) {
  switch (object.type) {
    case 'tree':
      return getTreeLinks(object.data)
    case 'tag':
    case 'commit':
      return getCommitOrTagLinks(object.data)
    case 'blob':
    default:
      // always hand back an object so callers can iterate without checks
      return {}
  }
}
261
/**
 * Extract the object ids referenced by a raw tree object.
 *
 * Scans for each NUL byte, takes the 20 bytes after it as a binary id, and
 * resumes the scan after those 20 bytes.
 *
 * @param {Buffer} buf - raw tree contents
 * @returns {Object} map of hex object id -> true
 */
function getTreeLinks(buf) {
  var found = {}
  var offset = 0
  while (true) {
    var nul = buf.indexOf(0, offset, 'ascii')
    if (nul === -1) break
    var start = nul + 1
    var id = buf.slice(start, start + 20).toString('hex')
    if (!(id in found)) {
      found[id] = true
    }
    // skip over the id itself: it may contain zero bytes
    offset = start + 20
  }
  return found
}
271
/**
 * Extract the object ids referenced in the header of a commit or tag.
 *
 * Only header lines are examined: scanning stops at the first empty line,
 * which marks the start of the commit/tag body.
 *
 * @param {Buffer} buf - raw commit or tag contents
 * @returns {Object} map of the second field of every `tree`, `parent` and
 *   `object` header line -> true
 */
function getCommitOrTagLinks(buf) {
  var linkFields = ['tree', 'parent', 'object']
  var found = {}
  var headerLines = buf.toString('utf8').split('\n')

  var idx = 0
  while (headerLines[idx]) {
    var fields = headerLines[idx].split(' ')
    idx += 1
    if (linkFields.indexOf(fields[0]) === -1) continue
    var target = fields[1]
    if (!(target in found)) {
      found[target] = true
    }
  }
  return found
}
289
290/*
291TODO: investigate capabilities
292report-status delete-refs side-band-64k quiet atomic ofs-delta
293*/
294
/**
 * Get a line for each ref that we have. Wrap with pktLine.encode.
 *
 * Symrefs, when given, are read first: each one adds a `symref=<name>:<ref>`
 * capability and is later emitted as an extra line right after the ref it
 * points to. When `usePlaceholder` is truthy, the first line also carries
 * the capability list after a NUL byte.
 *
 * @param {string[]} capabilities - capability names to advertise
 * @param {Function} refSource - pull-stream source of `{name, hash}` refs
 * @param {Function|null} symrefs - pull-stream source of `{name, ref}`
 *   symrefs, or a falsy value to skip them
 * @param {boolean} usePlaceholder - whether to append the capabilities to the
 *   first ref line
 * @returns {Function} pull-stream source of "<hash> <name>" strings
 */
function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
  var first = true
  var symrefed = {}
  var symrefsObj = {}

  // Emits no items. It only drains the symrefs source, so the capability
  // list is complete before the first ref line is formatted.
  function gatherSymrefs(end, cb) {
    if (end || !symrefs) return cb(true)
    pull(
      symrefs,
      pull.map(function (sym) {
        symrefed[sym.ref] = true
        symrefsObj[sym.name] = sym.ref
        return 'symref=' + sym.name + ':' + sym.ref
      }),
      pull.collect(function (err, symrefCaps) {
        if (err) return cb(err)
        capabilities = capabilities.concat(symrefCaps)
        cb(true)
      })
    )
  }

  // insert symrefs next to the refs that they point to
  function withSymrefs(ref) {
    if (!(ref.name in symrefed)) return [ref]
    var aliases = Object.keys(symrefsObj)
      .filter(function (symrefName) {
        return symrefsObj[symrefName] === ref.name
      })
      .map(function (symrefName) {
        return {name: symrefName, hash: ref.hash}
      })
    return [ref].concat(aliases)
  }

  // `capabilities` is read lazily here, after gatherSymrefs has run.
  // A placeholder line for an empty ref list is not implemented.
  function formatRef(ref) {
    var label = ref.name
    if (first && usePlaceholder) {
      first = false
      label += '\0' + capabilities.join(' ')
    }
    return ref.hash + ' ' + label
  }

  var refLines = pull(
    refSource,
    pull.map(withSymrefs),
    pull.flatten(),
    pull.map(formatRef)
  )

  return cat([gatherSymrefs, refLines])
}
351
/**
 * receive-pack: push from client.
 *
 * Advertises our refs, collects the client's ref updates, stores the
 * packfile that follows them, and finally reports 'unpack ok'.
 *
 * @param {Function} read - pull-stream source of raw data from the client
 * @param {Object} repo - repo providing refs() and either uploadPack() or update()
 * @param {Object} options - shared helper options
 * @returns {Function} pkt-line encoded pull-stream source for the client
 */
function receivePack(read, repo, options) {
  var sendRefs = receivePackHeader([
    'delete-refs',
  ], repo.refs(), null, true)
  var done = multicb({pluck: 1})

  // Index the incoming pack and hand the pack plus its index to the repo.
  function storeWholePack(lines, updates) {
    var idxCb = done()
    indexPack(lines.passthrough, function (err, idx, packfileFixed) {
      if (err) return idxCb(err)
      var updateSource = pull.values(updates)
      // for some reason i was getting zero length buffers which
      // were causing muxrpc to fail, so remove them here.
      var nonEmptyPack = pull(
        packfileFixed,
        pull.filter(function (buf) {
          return buf.length
        })
      )
      repo.uploadPack(updateSource, pull.once({
        pack: nonEmptyPack,
        idx: idx
      }), idxCb)
    })
  }

  // Decode the incoming pack into objects and pass them to the repo.
  function storeObjects(lines, updates, progress) {
    var updateSource = pull.values(updates)
    var rawPack = lines.passthrough
    var decoder = pack.decode({
      verbosity: options.verbosity,
      onHeader: function (numObjects) {
        progress.setNumObjects(numObjects)
      }
    }, repo, done())
    repo.update(updateSource, pull(rawPack, decoder, progress), done())
  }

  function receiveUpdates(abort, cb) {
    if (abort) return
    // receive their refs
    var lines = pktLine.decode(read, options)
    pull(
      lines.updates,
      pull.collect(function (err, updates) {
        if (err) return cb(err)
        if (updates.length === 0) return cb(true)
        var progress = progressObjects(options)

        if (repo.uploadPack) {
          storeWholePack(lines, updates)
        } else {
          storeObjects(lines, updates, progress)
        }

        // end this part of the stream once everything has been stored
        done(function (err) {
          cb(err || true)
        })
      })
    )
  }

  return pktLine.encode(
    cat([
      // send our refs
      sendRefs,
      pull.once(''),
      receiveUpdates,
      pull.once('unpack ok')
    ])
  )
}
414
/**
 * Put one item in front of a pull-stream source.
 *
 * The first read always yields `data`, regardless of the end/abort argument;
 * every later read is forwarded to `read` unchanged.
 *
 * @param {*} data - item to emit first
 * @param {Function} read - pull-stream source to continue with
 * @returns {Function} pull-stream source
 */
function prepend(data, read) {
  var emitted = false
  return function (end, cb) {
    if (!emitted) {
      emitted = true
      cb(null, data)
      return
    }
    read(end, cb)
  }
}
426
427module.exports = function (repo) {
428 var ended
429 var options = {
430 verbosity: 1,
431 progress: false
432 }
433
434 repo = Repo(repo)
435
436 function handleConnect(cmd, read) {
437 var args = util.split2(cmd)
438 switch (args[0]) {
439 case 'git-upload-pack':
440 return prepend('\n', uploadPack(read, repo, options))
441 case 'git-receive-pack':
442 return prepend('\n', receivePack(read, repo, options))
443 default:
444 return pull.error(new Error('Unknown service ' + args[0]))
445 }
446 }
447
448 function handleCommand(line, read) {
449 var args = util.split2(line)
450 switch (args[0]) {
451 case 'capabilities':
452 return capabilitiesSource()
453 case 'list':
454 return listRefs(refSource)
455 case 'connect':
456 return handleConnect(args[1], read)
457 case 'option':
458 return optionSource(args[1], options)
459 default:
460 return pull.error(new Error('Unknown command ' + line))
461 }
462 }
463
464 return function (read) {
465 var b = buffered()
466 b(read)
467 var command
468
469 function getCommand(cb) {
470 b.lines(null, function next(end, line) {
471 if (ended = end)
472 return cb(end)
473
474 if (line == '')
475 return b.lines(null, next)
476
477 if (options.verbosity > 1)
478 console.error('command:', line)
479
480 var cmdSource = handleCommand(line, b.passthrough)
481 cb(null, cmdSource)
482 })
483 }
484
485 return function next(abort, cb) {
486 if (ended) return cb(ended)
487
488 if (!command) {
489 if (abort) return
490 getCommand(function (end, cmd) {
491 command = cmd
492 next(end, cb)
493 })
494 return
495 }
496
497 command(abort, function (err, data) {
498 if (err) {
499 command = null
500 if (err !== true)
501 cb(err, data)
502 else
503 next(abort, cb)
504 } else {
505 // HACK: silence error when writing to closed stream
506 try {
507 cb(null, data)
508 } catch(e) {
509 if (e.message == 'process.stdout cannot be closed.'
510 || e.message == 'This socket is closed.')
511 process.exit(1)
512 throw e
513 }
514 }
515 })
516 }
517 }
518}
519

Built with git-ssb-web