var pull = require('pull-stream') var cat = require('pull-cat') var loop = require('looper') var buffered = require('pull-buffered') var multicb = require('multicb') var crypto = require('crypto') var skipFooter = require('pull-skip-footer') function packHeader(numObjects) { var header = new Buffer(12) header.write('PACK') header.writeUInt32BE(2, 4) header.writeUInt32BE(numObjects, 8) return header } function forEachAsync(arr, fn, cb) { var i = 0 loop(function (next) { if (i >= arr.length) return cb() fn(arr[i++], function (err) { if (err) return cb(err) next() }) }) } function reduceAsync(arr, fn, init, cb) { var i = 0 var acc = init loop(function (next) { if (i >= arr.length) return cb(null, acc) fn(arr[i++], acc, function (err, data) { if (err) return cb(err) acc = data next() }) }) } function skipHeader(len) { return function (read) { return function (end, cb) { if (end || len <= 0) read(end, cb) else read(null, function next(end, data) { if (end) return cb(end) var _len = len len -= data.length if (len > 0) read(null, next) else cb(null, data.slice(_len)) }) } } } function readHeader(read, len, cb) { var headerBufs = [] var dataBuf read(null, function next(end, data) { if (end) return cb(end === true ? new Error('Missing header') : err) if (data.length > len) { // got more than enough for header headerBufs.push(data.slice(0, len)) var header = Buffer.concat(headerBufs) headerBufs = null dataBuf = data.slice(len) cb(null, header, readRest) } else if (data.length === len) { // got enough for header headerBufs.push(data) var header = Buffer.concat(headerBufs) headerBufs = null cb(null, header, read) } else { len -= data.length headerBufs.push(data) read(null, next) } }) function readRest(end, cb) { var buf = dataBuf if (end || buf == null) read(end, cb) else dataBuf = null, cb(null, buf) } } function getNumObjects(packs, cb) { reduceAsync(packs, function (pack, num, cb) { if (pack.numObjects != null) { pack.read = pull(pack.read, skipHeader(12), skipFooter(20)) cb(null, num + pack.numObjects) } else { readHeader(pack.read, 12, function (err, header, readRest) { if (err === true) return cb(new Error('Missing header')) if (err) return cb(err) pack.numObjects = header.readUInt32BE(8) pack.read = skipFooter(20)(readRest) cb(null, num + pack.numObjects) }) } }, 0, cb) } function closePacks(packs, cb) { forEachAsync(packs, function (pack, cb) { pack.read(true, cb) }) } module.exports = function concatPacks(packs) { /* packs: [{read: source, numObjects: int}] */ var checksum = crypto.createHash('sha1') var packI = 0 var state = 'begin' return function next(end, cb) { switch (state) { case 'begin': if (end) return closePacks(cb) return getNumObjects(packs, function (err, numObjects) { if (err) return cb(err) state = 'startpack' var header = packHeader(numObjects) checksum.update(header) cb(null, header) }) case 'startpack': if (end) return closePacks(cb) if (packI >= packs.length) { state = 'end' return cb(null, checksum.digest()) } var pack = packs[packI] return pack.read(null, function (err, data) { if (err === true) { packI++ return next(null, cb) } if (err) return cb(err) checksum.update(data) cb(null, data) }) case 'end': return cb(true) } } }