Files: b763dc91d24d0b1d54f6e8eb7d4587259deb5a52 / index.js
3775 bytesRaw
1 | var pull = require('pull-stream') |
2 | var cat = require('pull-cat') |
3 | var loop = require('looper') |
4 | var buffered = require('pull-buffered') |
5 | var multicb = require('multicb') |
6 | var crypto = require('crypto') |
7 | var skipFooter = require('pull-skip-footer') |
8 | |
9 | function packHeader(numObjects) { |
10 | var header = new Buffer(12) |
11 | header.write('PACK') |
12 | header.writeUInt32BE(2, 4) |
13 | header.writeUInt32BE(numObjects, 8) |
14 | return header |
15 | } |
16 | |
17 | function forEachAsync(arr, fn, cb) { |
18 | var i = 0 |
19 | loop(function (next) { |
20 | if (i >= arr.length) return cb() |
21 | fn(arr[i++], function (err) { |
22 | if (err) return cb(err) |
23 | next() |
24 | }) |
25 | }) |
26 | } |
27 | |
28 | function reduceAsync(arr, fn, init, cb) { |
29 | var i = 0 |
30 | var acc = init |
31 | loop(function (next) { |
32 | if (i >= arr.length) return cb(null, acc) |
33 | fn(arr[i++], acc, function (err, data) { |
34 | if (err) return cb(err) |
35 | acc = data |
36 | next() |
37 | }) |
38 | }) |
39 | } |
40 | |
41 | function skipHeader(len) { |
42 | return function (read) { |
43 | return function (end, cb) { |
44 | if (end || len <= 0) read(end, cb) |
45 | else read(null, function next(end, data) { |
46 | if (end) return cb(end) |
47 | var _len = len |
48 | len -= data.length |
49 | if (len > 0) read(null, next) |
50 | else cb(null, data.slice(_len)) |
51 | }) |
52 | } |
53 | } |
54 | } |
55 | |
56 | function readHeader(read, len, cb) { |
57 | var headerBufs = [] |
58 | var dataBuf |
59 | read(null, function next(end, data) { |
60 | if (end) return cb(end === true ? new Error('Missing header') : err) |
61 | if (data.length > len) { |
62 | // got more than enough for header |
63 | headerBufs.push(data.slice(0, len)) |
64 | var header = Buffer.concat(headerBufs) |
65 | headerBufs = null |
66 | dataBuf = data.slice(len) |
67 | cb(null, header, readRest) |
68 | } else if (data.length === len) { |
69 | // got enough for header |
70 | headerBufs.push(data) |
71 | var header = Buffer.concat(headerBufs) |
72 | headerBufs = null |
73 | cb(null, header, read) |
74 | } else { |
75 | len -= data.length |
76 | headerBufs.push(data) |
77 | read(null, next) |
78 | } |
79 | }) |
80 | function readRest(end, cb) { |
81 | var buf = dataBuf |
82 | if (end || buf == null) read(end, cb) |
83 | else dataBuf = null, cb(null, buf) |
84 | } |
85 | } |
86 | |
87 | function getNumObjects(packs, cb) { |
88 | reduceAsync(packs, function (pack, num, cb) { |
89 | if (pack.numObjects != null) { |
90 | pack.read = pull(pack.read, skipHeader(12), skipFooter(20)) |
91 | cb(null, num + pack.numObjects) |
92 | } else { |
93 | readHeader(pack.read, 12, function (err, header, readRest) { |
94 | if (err === true) return cb(new Error('Missing header')) |
95 | if (err) return cb(err) |
96 | pack.numObjects = header.readUInt32BE(8) |
97 | pack.read = skipFooter(20)(readRest) |
98 | cb(null, num + pack.numObjects) |
99 | }) |
100 | } |
101 | }, 0, cb) |
102 | } |
103 | |
104 | function closePacks(packs, cb) { |
105 | forEachAsync(packs, function (pack, cb) { |
106 | pack.read(true, cb) |
107 | }) |
108 | } |
109 | |
110 | module.exports = function concatPacks(packs) { |
111 | /* packs: [{read: source, numObjects: int}] */ |
112 | var checksum = crypto.createHash('sha1') |
113 | var packI = 0 |
114 | var state = 'begin' |
115 | |
116 | return function next(end, cb) { |
117 | switch (state) { |
118 | case 'begin': |
119 | if (end) return closePacks(cb) |
120 | return getNumObjects(packs, function (err, numObjects) { |
121 | if (err) return cb(err) |
122 | state = 'startpack' |
123 | cb(null, packHeader(numObjects)) |
124 | }) |
125 | |
126 | case 'startpack': |
127 | if (end) return closePacks(cb) |
128 | if (packI >= packs.length) { |
129 | state = 'end' |
130 | return cb(null, checksum.digest()) |
131 | } |
132 | var pack = packs[packI] |
133 | return pack.read(null, function (err, data) { |
134 | if (err === true) { |
135 | packI++ |
136 | return next(null, cb) |
137 | } |
138 | if (err) return cb(err) |
139 | checksum.update(data) |
140 | cb(null, data) |
141 | }) |
142 | |
143 | case 'end': |
144 | return cb(true) |
145 | } |
146 | } |
147 | } |
148 |
Built with git-ssb-web