var pull = require('pull-stream') var cat = require('pull-cat') var buffered = require('pull-buffered') var Repo = require('pull-git-repo') var pack = require('pull-git-pack') var pktLine = require('./lib/pkt-line') var indexPack = require('pull-git-pack/lib/index-pack') var util = require('./lib/util') var multicb = require('multicb') var ProgressBar = require('progress') var pkg = require('./package.json') var agentCap = 'agent=' + pkg.name + '/' + pkg.version var dummyCapsRef = { name: 'capabilities^{}', hash: '0000000000000000000000000000000000000000' } function handleOption(options, name, value) { switch (name) { case 'verbosity': options.verbosity = value|0 return true case 'progress': options.progress = !!value && value !== 'false' return true /* case 'followtags': options.followtags = !!value return true case 'depth': options.depth = depth|0 return true case 'deepen-relative': options.deepenRelative = !!value && value !== 'false' return true */ default: console.error('unknown option', name + ': ' + value) return false } } function capabilitiesSource() { return pull.once([ 'option', 'connect', ].join('\n') + '\n\n') } function optionSource(cmd, options) { var args = util.split2(cmd) var msg = handleOption(options, args[0], args[1]) msg = (msg === true) ? 'ok' : (msg === false) ? 'unsupported' : 'error ' + msg return pull.once(msg + '\n') } // transform ref objects into lines function listRefs(read) { var ended return function (abort, cb) { if (ended) return cb(ended) read(abort, function (end, ref) { ended = end if (end === true) cb(null, '\n') if (end) cb(end) else cb(null, [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n') }) } } // upload-pack: fetch to client // references: // git/documentation/technical/pack-protocol.txt // git/documentation/technical/protocol-capabilities.txt function uploadPack(read, repo, options) { var sendRefs = receivePackHeader([ agentCap, ], repo.refs(), repo.symrefs()) var lines = pktLine.decode(read, { onCaps: onCaps, verbosity: options.verbosity }) var readWantHave = lines.haves() var acked var haves = {} var sendPack var wants = {} var shallows = {} var aborted var hasWants var gotHaves function onCaps(caps) { if (options.verbosity >= 2) { console.error('client capabilities:', caps) } } function readWant(abort, cb) { if (abort) return // read upload request (wants list) from client readWantHave(null, function next(end, want) { if (end || want.type == 'flush-pkt') { cb(end || true, cb) return } if (want.type == 'want') { wants[want.hash] = true hasWants = true } else if (want.type == 'shallow') { shallows[want.hash] = true } else { var err = new Error("Unknown thing", want.type, want.hash) return readWantHave(err, function (e) { cb(e || err) }) } readWantHave(null, next) }) } function readHave(abort, cb) { // Read upload haves (haves list). // On first obj-id that we have, ACK // If we have none, NAK. if (abort) return if (gotHaves) return cb(true) readWantHave(null, function next(end, have) { if (end === true) { // done gotHaves = true if (!acked) { cb(null, 'NAK') } else { cb(true) } } else if (have.type === 'flush-pkt') { // found no common object if (!acked) { cb(null, 'NAK') } else { readWantHave(null, next) } } else if (end) cb(end) else if (have.type != 'have') cb(new Error('Unknown have' + JSON.stringify(have))) else { haves[have.hash] = true if (acked) { readWantHave(null, next) } else if (repo.hasObjectQuick) { gotHasObject(null, repo.hasObjectQuick(have.hash)) } else { repo.hasObjectFromAny(have.hash, gotHasObject) } } function gotHasObject(err, haveIt) { if (err) return cb(err) if (!haveIt) return readWantHave(null, next) acked = true cb(null, 'ACK ' + have.hash) } }) } function readPack(abort, cb) { if (sendPack) return sendPack(abort, cb) if (abort || aborted) return console.error('abrt', abort || aborted), cb(abort || aborted) // send pack file to client if (!hasWants) return cb(true) if (options.verbosity >= 2) { console.error('wants', wants) } repo.getPack(wants, haves, options, function (err, _sendPack) { if (err) return cb(err) sendPack = _sendPack sendPack(null, cb) }) } // Packfile negotiation return cat([ pktLine.encode(cat([ sendRefs, pull.once(''), readWant, readHave ])), readPack ]) } // through stream to show a progress bar for objects being read function progressObjects(options) { // Only show progress bar if it is requested and if it won't interfere with // the debug output if (!options.progress || options.verbosity > 1) { var dummyProgress = function (readObject) { return readObject } dummyProgress.setNumObjects = function () {} return dummyProgress } var numObjects var size = process.stderr.columns var bar = new ProgressBar(':percent :bar', { total: size, clear: true }) var progress = function (readObject) { return function (abort, cb) { readObject(abort, function next(end, object) { if (end === true) { bar.terminate() } else if (!end) { var name = object.type + ' ' + object.length bar.tick(size / numObjects) } cb(end, object) }) } } // TODO: put the num objects in the objects stream as a header object progress.setNumObjects = function (n) { numObjects = n } return progress } // Get a line for each ref that we have. The first line also has capabilities. // Wrap with pktLine.encode. function receivePackHeader(capabilities, refSource, symrefs) { var hasRefs = false var first = true var symrefed = {} var symrefsObj = {} return cat([ function (end, cb) { if (end) cb(true) else if (!symrefs) cb(true) else pull( symrefs, pull.map(function (sym) { symrefed[sym.ref] = true symrefsObj[sym.name] = sym.ref return 'symref=' + sym.name + ':' + sym.ref }), pull.collect(function (err, symrefCaps) { if (err) return cb(err) capabilities = capabilities.concat(symrefCaps) cb(true) }) ) }, pull( cat([refSource, pull.once(dummyCapsRef)]), pull.map(function (ref) { if (ref !== dummyCapsRef) hasRefs = true else if (hasRefs) return [] else return [ref] // insert symrefs next to the refs that they point to var out = [ref] if (ref.name in symrefed) for (var symrefName in symrefsObj) if (symrefsObj[symrefName] === ref.name) out.push({name: symrefName, hash: ref.hash}) return out }), pull.flatten(), pull.map(function (ref) { var name = ref.name var value = ref.hash if (first) { first = false name += '\0' + capabilities.join(' ') } return value + ' ' + name }) ) ]) } // receive-pack: push from client function receivePack(read, repo, options) { var serverCaps = [ agentCap, 'delete-refs', 'quiet', 'no-thin' ] if (repo.setPushOptions) serverCaps.push('push-options') var sendRefs = receivePackHeader(serverCaps, repo.refs(), null) var done = multicb({pluck: 1}) var clientCaps = {} function onCaps(caps) { clientCaps = caps if (options.verbosity >= 2) { console.error('client capabilities:', caps) } } return pktLine.encode( cat([ // send our refs sendRefs, pull.once(''), function (abort, cb) { if (abort) return // receive their refs var lines = pktLine.decode(read, { onCaps: onCaps, verbosity: options.verbosity }) pull( lines.updates, pull.collect(function (err, updates) { if (err) return cb(err) if (updates.length === 0) return cb(true) var usePushOpts = clientCaps['push-options'] pull( usePushOpts ? lines.pktLineStrs : pull.empty(), pull.reduce(function (opts, opt) { opts[opt] = true return opts }, {}, pushOptsCb) ) function pushOptsCb(err, pushOpts) { if (err) return cb(err) repo.setPushOptions(pushOpts) var progress = progressObjects(options) var hasPack = !updates.every(function (update) { return update.new === null }) if (repo.uploadPack) { if (!hasPack) { repo.uploadPack(pull.values(updates), pull.empty(), done()) } else { var idxCb = done() pull(lines.passthrough, indexPack(function (err, idx, packfileFixed) { if (err) return idxCb(err) repo.uploadPack(pull.values(updates), pull.once({ pack: pull( packfileFixed, // for some reason i was getting zero length buffers which // were causing muxrpc to fail, so remove them here. pull.filter(function (buf) { return buf.length }) ), idx: idx }), idxCb) })) } } else { repo.update(pull.values(updates), !hasPack ? pull.empty() : pull( lines.passthrough, pack.decode({ verbosity: options.verbosity, onHeader: function (numObjects) { progress.setNumObjects(numObjects) } }, repo, done()), progress ), done()) } done(function (err) { cb(err || true) }) } }) ) }, pull.once('unpack ok') ]) ) } function prepend(data, read) { var done return function (end, cb) { if (done) { read(end, cb) } else { done = true cb(null, data) } } } module.exports = function (repo) { var ended var options = { verbosity: +process.env.GIT_VERBOSITY || 1, progress: false } repo = Repo(repo) function handleConnect(cmd, read) { var args = util.split2(cmd) switch (args[0]) { case 'git-upload-pack': return prepend('\n', uploadPack(read, repo, options)) case 'git-receive-pack': return prepend('\n', receivePack(read, repo, options)) default: return pull.error(new Error('Unknown service ' + args[0])) } } function handleCommand(line, read) { var args = util.split2(line) switch (args[0]) { case 'capabilities': return capabilitiesSource() case 'list': return listRefs(refSource) case 'connect': return handleConnect(args[1], read) case 'option': return optionSource(args[1], options) default: return pull.error(new Error('Unknown command ' + line)) } } return function (read) { var b = buffered() if (options.verbosity >= 3) { read = pull.through(function (data) { console.error('>', JSON.stringify(data.toString('ascii'))) })(read) } b(read) var command function getCommand(cb) { b.lines(null, function next(end, line) { if (ended = end) return cb(end) if (line == '') return b.lines(null, next) if (options.verbosity > 1) console.error('command:', line) var cmdSource = handleCommand(line, b.passthrough) cb(null, cmdSource) }) } return function next(abort, cb) { if (ended) return cb(ended) if (!command) { if (abort) return getCommand(function (end, cmd) { command = cmd next(end, cb) }) return } command(abort, function (err, data) { if (err) { command = null if (err !== true) cb(err, data) else next(abort, cb) } else { if (options.verbosity >= 3) { console.error('<', JSON.stringify(data)) } cb(null, data) } }) } } }