git ssb

0+

cel / pull-git-pack-concat



Tree: 95c2f819f269008c6fcc897fbe57cd8a5d4deab1

Files: 95c2f819f269008c6fcc897fbe57cd8a5d4deab1 / index.js

3839 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var loop = require('looper')
4var buffered = require('pull-buffered')
5var multicb = require('multicb')
6var crypto = require('crypto')
7var skipFooter = require('pull-skip-footer')
8
9function packHeader(numObjects) {
10 var header = new Buffer(12)
11 header.write('PACK')
12 header.writeUInt32BE(2, 4)
13 header.writeUInt32BE(numObjects, 8)
14 return header
15}
16
17function forEachAsync(arr, fn, cb) {
18 var i = 0
19 loop(function (next) {
20 if (i >= arr.length) return cb()
21 fn(arr[i++], function (err) {
22 if (err) return cb(err)
23 next()
24 })
25 })
26}
27
28function reduceAsync(arr, fn, init, cb) {
29 var i = 0
30 var acc = init
31 loop(function (next) {
32 if (i >= arr.length) return cb(null, acc)
33 fn(arr[i++], acc, function (err, data) {
34 if (err) return cb(err)
35 acc = data
36 next()
37 })
38 })
39}
40
41function skipHeader(len) {
42 return function (read) {
43 return function (end, cb) {
44 if (end || len <= 0) read(end, cb)
45 else read(null, function next(end, data) {
46 if (end) return cb(end)
47 var _len = len
48 len -= data.length
49 if (len > 0) read(null, next)
50 else cb(null, data.slice(_len))
51 })
52 }
53 }
54}
55
56function readHeader(read, len, cb) {
57 var headerBufs = []
58 var dataBuf
59 read(null, function next(end, data) {
60 if (end) return cb(end === true ? new Error('Missing header') : err)
61 if (data.length > len) {
62 // got more than enough for header
63 headerBufs.push(data.slice(0, len))
64 var header = Buffer.concat(headerBufs)
65 headerBufs = null
66 dataBuf = data.slice(len)
67 cb(null, header, readRest)
68 } else if (data.length === len) {
69 // got enough for header
70 headerBufs.push(data)
71 var header = Buffer.concat(headerBufs)
72 headerBufs = null
73 cb(null, header, read)
74 } else {
75 len -= data.length
76 headerBufs.push(data)
77 read(null, next)
78 }
79 })
80 function readRest(end, cb) {
81 var buf = dataBuf
82 if (end || buf == null) read(end, cb)
83 else dataBuf = null, cb(null, buf)
84 }
85}
86
87function getNumObjects(packs, cb) {
88 reduceAsync(packs, function (pack, num, cb) {
89 if (pack.numObjects != null) {
90 pack.read = pull(pack.read, skipHeader(12), skipFooter(20))
91 cb(null, num + pack.numObjects)
92 } else {
93 readHeader(pack.read, 12, function (err, header, readRest) {
94 if (err === true) return cb(new Error('Missing header'))
95 if (err) return cb(err)
96 pack.numObjects = header.readUInt32BE(8)
97 pack.read = skipFooter(20)(readRest)
98 cb(null, num + pack.numObjects)
99 })
100 }
101 }, 0, cb)
102}
103
104function closePacks(packs, cb) {
105 forEachAsync(packs, function (pack, cb) {
106 pack.read(true, cb)
107 })
108}
109
110module.exports = function concatPacks(packs) {
111 /* packs: [{read: source, numObjects: int}] */
112 var checksum = crypto.createHash('sha1')
113 var packI = 0
114 var state = 'begin'
115
116 return function next(end, cb) {
117 switch (state) {
118 case 'begin':
119 if (end) return closePacks(cb)
120 return getNumObjects(packs, function (err, numObjects) {
121 if (err) return cb(err)
122 state = 'startpack'
123 var header = packHeader(numObjects)
124 checksum.update(header)
125 cb(null, header)
126 })
127
128 case 'startpack':
129 if (end) return closePacks(cb)
130 if (packI >= packs.length) {
131 state = 'end'
132 return cb(null, checksum.digest())
133 }
134 var pack = packs[packI]
135 return pack.read(null, function (err, data) {
136 if (err === true) {
137 packI++
138 return next(null, cb)
139 }
140 if (err) return cb(err)
141 checksum.update(data)
142 cb(null, data)
143 })
144
145 case 'end':
146 return cb(true)
147 }
148 }
149}
150

Built with git-ssb-web