git ssb

0+

cel / pull-git-remote-helper



Tree: 64f27ab885dda9e22ca191c12c53aa62985f38d8

Files: 64f27ab885dda9e22ca191c12c53aa62985f38d8 / index.js

12966 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var buffered = require('pull-buffered')
4var pack = require('./lib/pack')
5var pktLine = require('./lib/pkt-line')
6var util = require('./lib/util')
7var multicb = require('multicb')
8var ProgressBar = require('progress')
9
// Apply one remote-helper option to the shared options object.
// Returns true when the option was recognised and stored, false otherwise
// (after reporting the unknown option on stderr).
function handleOption(options, name, value) {
  if (name === 'verbosity') {
    // non-numeric input falls back to 0
    var level = +value
    options.verbosity = level ? level : 0
    return true
  }

  if (name === 'progress') {
    // any non-empty value other than the literal string 'false' enables it
    var enabled = Boolean(value)
    options.progress = enabled && value !== 'false'
    return true
  }

  console.error('unknown option', name + ': ' + value)
  return false
}
23
// Source emitting the reply to the `capabilities` command: one capability
// per line, followed by a blank line.
function capabilitiesSource() {
  var capabilities = ['option', 'connect']
  var reply = capabilities.join('\n') + '\n\n'
  return pull.once(reply)
}
30
// Source emitting the one-line reply to an `option <name> <value>` command:
// 'ok' when handleOption accepted it, 'unsupported' when it rejected it, and
// 'error <msg>' for any other return value.
function optionSource(cmd, options) {
  var parts = util.split2(cmd)
  var outcome = handleOption(options, parts[0], parts[1])

  var reply
  if (outcome === true) {
    reply = 'ok'
  } else if (outcome === false) {
    reply = 'unsupported'
  } else {
    reply = 'error ' + outcome
  }

  return pull.once(reply + '\n')
}
39
// Through stream: transform ref objects into lines.
// Each ref becomes '<value> <name>[ <attrs...>]\n'. When the ref source ends
// normally, a single blank line is emitted to terminate the list, and the
// stream ends on the following read.
// NOTE(review): this reads `ref.value`, while receivePackHeader reads
// `ref.hash` from its refs - confirm which field the ref source provides.
function listRefs(read) {
  var ended
  return function (abort, cb) {
    if (ended) return cb(ended)
    read(abort, function (end, ref) {
      ended = end
      // Exactly one callback per read: on a normal end emit the terminating
      // blank line now; `ended` makes the next read finish the stream.
      // An abort is passed straight through without emitting data.
      if (end === true && !abort) return cb(null, '\n')
      if (end) return cb(end)
      cb(null, [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
    })
  }
}
54
// upload-pack: fetch to client
//
// Returns a source that emits, in order:
//   1. the pkt-line encoded ref advertisement (from receivePackHeader),
//      an empty pkt-line payload, and a single negotiation reply
//      ('ACK <hash>' or 'NAK');
//   2. the pack produced by pack.encode for the requested objects.
//
// read:    source of client data, decoded with pktLine.decode
// repo:    object providing refs(), symrefs(), hasObject() and (through
//          getObjects) getObject()
// options: passed to pktLine.decode, pack.encode and progressObjects
function uploadPack(read, repo, options) {
  /* Capabilities listed here but not advertised (the list passed to
   * receivePackHeader is empty):
   * multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
   * include-tag multi_ack_detailed
   * agent=git/2.7.0 */
  var sendRefs = receivePackHeader([
  ], repo.refs(), repo.symrefs(), false)

  var lines = pktLine.decode(read, options)
  var readHave = lines.haves()
  var acked
  var commonHash
  var sendPack
  var wants = {}
  var shallows = {}

  // Packfile negotiation
  return cat([
    pktLine.encode(cat([
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        // Always answer an abort, like havesDone below does, so the
        // downstream abort can complete.
        if (abort) return cb(abort)
        // The single ACK/NAK reply was already sent: end this source.
        if (acked) return cb(true)

        // read upload request (wants list) from client
        var readWant = lines.wants()
        readWant(null, function (end, want) {
          if (end === true) return cb(true) // early client disconnect
          else if (end) cb(end)
          else nextWant(null, want)
        })

        // Record one want/shallow line, then keep reading until the list ends.
        function nextWant(end, want) {
          if (end) return wantsDone(end === true ? null : end)
          if (want.type == 'want') {
            wants[want.hash] = true
          } else if (want.type == 'shallow') {
            shallows[want.hash] = true
          } else {
            // Error() only uses its first argument as the message, so the
            // offending type and hash are put into the message itself.
            var err = new Error('Unknown thing ' + want.type + ' ' + want.hash)
            return readWant(err, function (e) { cb(e || err) })
          }
          readWant(null, nextWant)
        }

        function wantsDone(err) {
          if (err) return cb(err)
          // Read upload haves (haves list).
          // On first obj-id that we have, ACK
          // If we have none, NAK.
          // TODO: implement multi_ack_detailed
          readHave(null, function next(end, have) {
            if (end === true) {
              // found no common object
              acked = true
              cb(null, 'NAK')
            } else if (end)
              cb(end)
            else if (have.type != 'have')
              cb(new Error('Unknown have ' + JSON.stringify(have)))
            else
              repo.hasObject(have.hash, function (err, haveIt) {
                if (err) return cb(err)
                if (!haveIt)
                  return readHave(null, next)
                // NOTE(review): this stores hasObject's result, while
                // getObjects compares commonHash against object hashes;
                // have.hash may have been intended - confirm.
                commonHash = haveIt
                acked = true
                cb(null, 'ACK ' + have.hash)
              })
          })
        }
      },
    ])),

    function havesDone(abort, cb) {
      if (abort) return cb(abort)
      // send pack file to client
      if (sendPack)
        return sendPack(abort, cb)
      // First read: collect the objects, build the pack source, then
      // re-enter so this read is served from it.
      getObjects(repo, commonHash, wants, shallows,
        function (err, numObjects, readObjects) {
          if (err) return cb(err)
          var progress = progressObjects(options)
          progress.setNumObjects(numObjects)
          sendPack = pack.encode(options, numObjects,
            progress(readObjects))
          havesDone(abort, cb)
        }
      )
    }
  ])
}
147
// Through stream to show a progress bar for objects being read.
//
// Returns a function `progress(readObject)` that wraps an object source, and
// carries a `setNumObjects(n)` method to tell the bar how many objects to
// expect. When no bar should be shown the returned function hands back the
// source untouched and setNumObjects does nothing.
function progressObjects(options) {
  // Only show progress bar if it is requested and if it won't interfere with
  // the debug output
  if (!options.progress || options.verbosity > 1) {
    var dummyProgress = function (readObject) { return readObject }
    dummyProgress.setNumObjects = function () {}
    return dummyProgress
  }

  var numObjects
  // `columns` is only set when stderr is a terminal; fall back to a
  // conventional width so the bar always gets a numeric total.
  var size = process.stderr.columns || 80
  var bar = new ProgressBar(':percent :bar', {
    total: size,
    clear: true
  })

  var progress = function (readObject) {
    return function (abort, cb) {
      readObject(abort, function (end, object) {
        if (end === true) {
          bar.terminate()
        } else if (!end && numObjects > 0) {
          // Advance by this object's share of the bar. Skipped while the
          // object count is unknown or zero, to avoid a NaN/Infinity tick.
          bar.tick(size / numObjects)
        }

        cb(end, object)
      })
    }
  }
  // TODO: put the num objects in the objects stream as a header object
  progress.setNumObjects = function (n) {
    numObjects = n
  }
  return progress
}
185
// Collect the objects reachable from each head, stopping at commonHash.
//
// repo:       object providing getObject(hash, cb)
// commonHash: hash at which the walk stops; if falsy, walk to the roots
// heads:      object whose keys are the hashes to start from
// shallows:   accepted but not used by this function
// cb:         called as cb(err) on failure, or as
//             cb(null, numObjects, objectSource) where objectSource is a
//             pull-stream source of the collected objects
function getObjects(repo, commonHash, heads, shallows, cb) {
  var objects = []
  var objectsAdded = {}
  var done = multicb({pluck: 1})
  // first error seen; once set, no further objects are fetched
  var ended

  // walk back from heads until get to commonHash
  for (var hash in heads)
    addObject(hash, done())

  // TODO: only add new objects

  // Fetch one object (once), queue it, and recurse into the objects it
  // links to. `next` is this object's multicb callback.
  function addObject(hash, next) {
    if (ended) return next(ended)
    if (hash in objectsAdded || hash == commonHash) return next()
    objectsAdded[hash] = true
    repo.getObject(hash, function (err, object) {
      if (err) return next(ended = err)
      if (object.type == 'blob') {
        // blobs link to nothing, so they are queued without buffering
        objects.push(object)
        return next()
      }
      // object must be read twice, so buffer it
      bufferObject(object, function (err, bufferedObject) {
        if (err) return next(ended = err)
        objects.push(bufferedObject)
        var links = getObjectLinks(bufferedObject)
        // register the children with multicb before completing this
        // object, so the final callback waits for them
        for (var sha1 in links)
          addObject(sha1, done())
        next()
      })
    })
  }

  done(function (err) {
    if (err) return cb(err)
    cb(null, objects.length, pull.values(objects))
  })
}
228
// Drain an object's `read` source into a single Buffer and hand back a copy
// of the object that exposes the bytes both as `data` and as a fresh
// one-shot `read` source over the same Buffer.
function bufferObject(object, cb) {
  var collector = pull.collect(function (err, chunks) {
    if (err) return cb(err)
    var contents = Buffer.concat(chunks, object.length)
    var copy = {
      type: object.type,
      length: object.length,
      data: contents,
      read: pull.once(contents)
    }
    cb(null, copy)
  })

  pull(object.read, collector)
}
244
// Get hashes of git objects linked to from other git objects.
// Returns an object whose keys are the linked hashes. Trees are parsed by
// getTreeLinks; commits and tags by getCommitOrTagLinks; blobs and any
// unrecognised type yield an empty object, so the result is always safe to
// enumerate. The `cb` parameter is accepted but not used.
function getObjectLinks(object, cb) {
  switch (object.type) {
    case 'tree':
      return getTreeLinks(object.data)
    case 'tag':
    case 'commit':
      return getCommitOrTagLinks(object.data)
    case 'blob':
    default:
      return {}
  }
}
257
// Collect the hashes referenced by a tree object's raw data.
// Scans for each NUL byte, takes the 20 bytes after it as a hash (hex
// encoded), and resumes the scan after those 20 bytes.
function getTreeLinks(buf) {
  var HASH_BYTES = 20
  var found = {}
  var offset = 0

  while (true) {
    var nul = buf.indexOf(0, offset, 'ascii')
    if (nul === -1) break
    var start = nul + 1
    var hex = buf.slice(start, start + HASH_BYTES).toString('hex')
    found[hex] = true
    offset = start + HASH_BYTES
  }

  return found
}
267
// Collect the hashes named by the `tree`, `parent` and `object` header lines
// of a commit or tag. Reading stops at the first empty line, which marks
// the start of the commit/tag body.
function getCommitOrTagLinks(buf) {
  var linkHeaders = ['tree', 'parent', 'object']
  var headerLines = buf.toString('utf8').split('\n')
  var found = {}
  var idx = 0

  while (headerLines[idx]) {
    var fields = headerLines[idx].split(' ')
    idx += 1
    if (linkHeaders.indexOf(fields[0]) === -1) continue
    var target = fields[1]
    if (!(target in found))
      found[target] = true
  }

  return found
}
285
286/*
287TODO: investigate capabilities
288report-status delete-refs side-band-64k quiet atomic ofs-delta
289*/
290
// Build a source with one '<hash> <name>' line for each ref that we have.
// Wrap with pktLine.encode.
//
// capabilities:   list of capability strings
// refSource:      pull-stream source of {name, hash} refs
// symrefs:        optional pull-stream source of {name, ref} symbolic refs
// usePlaceholder: when truthy, the first ref line also carries the
//                 capabilities after a NUL byte
// NOTE(review): when usePlaceholder is falsy the capabilities (including the
// collected symref entries) are never emitted - confirm that is intended.
function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
  var first = true
  var symrefed = {}
  var symrefsObj = {}

  // Stage 1 emits nothing: it drains the symrefs (if any), remembers them,
  // adds a 'symref=<name>:<ref>' capability for each, then ends.
  function collectSymrefs(end, cb) {
    if (end || !symrefs) return cb(true)
    pull(
      symrefs,
      pull.map(function (sym) {
        symrefed[sym.ref] = true
        symrefsObj[sym.name] = sym.ref
        return 'symref=' + sym.name + ':' + sym.ref
      }),
      pull.collect(function (err, symrefCaps) {
        if (err) return cb(err)
        capabilities = capabilities.concat(symrefCaps)
        cb(true)
      })
    )
  }

  // Expand a ref into itself followed by every symref that points to it,
  // so symrefs are inserted next to the refs that they point to.
  function withSymrefs(ref) {
    var expanded = [ref]
    if (!(ref.name in symrefed)) return expanded
    for (var symrefName in symrefsObj) {
      if (symrefsObj[symrefName] !== ref.name) continue
      expanded.push({name: symrefName, hash: ref.hash})
    }
    return expanded
  }

  // Render one ref as a line; see usePlaceholder above for the first line.
  // TODO: use placeholder data if there are no refs
  // (hash '0000000000000000000000000000000000000000', name 'capabilities^{}')
  function formatRef(ref) {
    var line = ref.hash + ' ' + ref.name
    if (first && usePlaceholder) {
      first = false
      line += '\0' + capabilities.join(' ')
    }
    return line
  }

  // Stage 2: the ref lines themselves.
  var refLines = pull(
    refSource,
    pull.map(withSymrefs),
    pull.flatten(),
    pull.map(formatRef)
  )

  return cat([collectSymrefs, refLines])
}
347
// receive-pack: push from client
//
// Returns a pkt-line encoded source that emits our ref advertisement
// (advertising 'delete-refs'), an empty payload, and finally 'unpack ok'
// once the client's ref updates and pack have been handed to repo.update.
//
// read:    source of client data, decoded with pktLine.decode
// repo:    object providing refs() and update(); also passed to pack.decode
// options: passed to pktLine.decode and progressObjects; `verbosity` is
//          forwarded to pack.decode
function receivePack(read, repo, options) {
  var sendRefs = receivePackHeader([
    'delete-refs',
  ], repo.refs(), null, true)
  var done = multicb({pluck: 1})

  return pktLine.encode(
    cat([
      // send our refs
      sendRefs,
      pull.once(''),
      function (abort, cb) {
        // Always answer an abort so the downstream abort can complete.
        if (abort) return cb(abort)
        // receive their refs
        var lines = pktLine.decode(read, options)
        pull(
          lines.updates,
          pull.collect(function (err, updates) {
            if (err) return cb(err)
            // nothing to update: end this source without reading a pack
            if (updates.length === 0) return cb(true)
            var progress = progressObjects(options)
            repo.update(pull.values(updates), pull(
              lines.passthrough,
              pack.decode({
                verbosity: options.verbosity,
                onHeader: function (numObjects) {
                  progress.setNumObjects(numObjects)
                }
              }, repo, done()),
              progress
            ), done())
            // This source never emits data: it ends (or fails) once both
            // pack.decode and repo.update have called back.
            done(function (err) {
              cb(err || true)
            })
          })
        )
      },
      pull.once('unpack ok')
    ])
  )
}
390
// Wrap a source so that `data` is emitted once before anything from `read`.
// An abort or error passed on the very first call is forwarded to `read`
// instead of being answered with the prepended data.
function prepend(data, read) {
  var done
  return function (end, cb) {
    if (done || end) {
      // after an abort nothing more may be emitted, including `data`
      done = true
      read(end, cb)
    } else {
      done = true
      cb(null, data)
    }
  }
}
402
403module.exports = function (repo) {
404 var ended
405 var options = {
406 verbosity: 1,
407 progress: false
408 }
409
410 function handleConnect(cmd, read) {
411 var args = util.split2(cmd)
412 switch (args[0]) {
413 case 'git-upload-pack':
414 return prepend('\n', uploadPack(read, repo, options))
415 case 'git-receive-pack':
416 return prepend('\n', receivePack(read, repo, options))
417 default:
418 return pull.error(new Error('Unknown service ' + args[0]))
419 }
420 }
421
422 function handleCommand(line, read) {
423 var args = util.split2(line)
424 switch (args[0]) {
425 case 'capabilities':
426 return capabilitiesSource()
427 case 'list':
428 return listRefs(refSource)
429 case 'connect':
430 return handleConnect(args[1], read)
431 case 'option':
432 return optionSource(args[1], options)
433 default:
434 return pull.error(new Error('Unknown command ' + line))
435 }
436 }
437
438 return function (read) {
439 var b = buffered()
440 b(read)
441 var command
442
443 function getCommand(cb) {
444 b.lines(null, function next(end, line) {
445 if (ended = end)
446 return cb(end)
447
448 if (line == '')
449 return b.lines(null, next)
450
451 if (options.verbosity > 1)
452 console.error('command:', line)
453
454 var cmdSource = handleCommand(line, b.passthrough)
455 cb(null, cmdSource)
456 })
457 }
458
459 return function next(abort, cb) {
460 if (ended) return cb(ended)
461
462 if (!command) {
463 if (abort) return
464 getCommand(function (end, cmd) {
465 command = cmd
466 next(end, cb)
467 })
468 return
469 }
470
471 command(abort, function (err, data) {
472 if (err) {
473 command = null
474 if (err !== true)
475 cb(err, data)
476 else
477 next(abort, cb)
478 } else {
479 // HACK: silence error when writing to closed stream
480 try {
481 cb(null, data)
482 } catch(e) {
483 if (e.message == 'process.stdout cannot be closed.'
484 || e.message == 'This socket is closed.')
485 process.exit(1)
486 throw e
487 }
488 }
489 })
490 }
491 }
492}
493

Built with git-ssb-web