git ssb

0+

cel / pull-git-remote-helper



Tree: ddc92710d0eeb04faeb812b67f81828abb16b67d

Files: ddc92710d0eeb04faeb812b67f81828abb16b67d / index.js

13000 bytesRaw
1var pull = require('pull-stream')
2var cat = require('pull-cat')
3var buffered = require('pull-buffered')
4var pack = require('./lib/pack')
5var pktLine = require('./lib/pkt-line')
6var util = require('./lib/util')
7var multicb = require('multicb')
8var ProgressBar = require('progress')
9
10function handleOption(options, name, value) {
11 switch (name) {
12 case 'verbosity':
13 options.verbosity = +value || 0
14 return true
15 case 'progress':
16 options.progress = !!value && value !== 'false'
17 return true
18 default:
19 console.error('unknown option', name + ': ' + value)
20 return false
21 }
22}
23
24function capabilitiesSource() {
25 return pull.once([
26 'option',
27 'connect',
28 ].join('\n') + '\n\n')
29}
30
31function optionSource(cmd, options) {
32 var args = util.split2(cmd)
33 var msg = handleOption(options, args[0], args[1])
34 msg = (msg === true) ? 'ok'
35 : (msg === false) ? 'unsupported'
36 : 'error ' + msg
37 return pull.once(msg + '\n')
38}
39
40// transform ref objects into lines
41function listRefs(read) {
42 var ended
43 return function (abort, cb) {
44 if (ended) return cb(ended)
45 read(abort, function (end, ref) {
46 ended = end
47 if (end === true) cb(null, '\n')
48 if (end) cb(end)
49 else cb(null,
50 [ref.value, ref.name].concat(ref.attrs || []).join(' ') + '\n')
51 })
52 }
53}
54
55// upload-pack: fetch to client
56function uploadPack(read, repo, options) {
57 /* multi_ack thin-pack side-band side-band-64k ofs-delta shallow no-progress
58 * include-tag multi_ack_detailed
59 * agent=git/2.7.0 */
60 var sendRefs = receivePackHeader([
61 ], repo.refs(), repo.symrefs(), false)
62
63 var lines = pktLine.decode(read, options)
64 var readHave = lines.haves()
65 var acked
66 var commonHash
67 var sendPack
68 var wants = {}
69 var shallows = {}
70
71 // Packfile negotiation
72 return cat([
73 pktLine.encode(cat([
74 sendRefs,
75 pull.once(''),
76 function (abort, cb) {
77 if (abort) return
78 if (acked) return cb(true)
79
80 // read upload request (wants list) from client
81 var readWant = lines.wants()
82 readWant(null, function (end, want) {
83 if (end === true) return cb(true) // early client disconnect
84 else if (end) cb(end)
85 else nextWant(null, want)
86 })
87 function nextWant(end, want) {
88 if (end) return wantsDone(end === true ? null : end)
89 if (want.type == 'want') {
90 wants[want.hash] = true
91 } else if (want.type == 'shallow') {
92 shallows[want.hash] = true
93 } else {
94 var err = new Error("Unknown thing", want.type, want.hash)
95 return readWant(err, function (e) { cb(e || err) })
96 }
97 readWant(null, nextWant)
98 }
99
100 function wantsDone(err) {
101 if (err) return cb(err)
102 // Read upload haves (haves list).
103 // On first obj-id that we have, ACK
104 // If we have none, NAK.
105 // TODO: implement multi_ack_detailed
106 readHave(null, function next(end, have) {
107 if (end === true) {
108 // found no common object
109 acked = true
110 cb(null, 'NAK')
111 } else if (end)
112 cb(end)
113 else if (have.type != 'have')
114 cb(new Error('Unknown have' + JSON.stringify(have)))
115 else
116 repo.hasObject(have.hash, function (err, haveIt) {
117 if (err) return cb(err)
118 if (!haveIt)
119 return readHave(null, next)
120 commonHash = haveIt
121 acked = true
122 cb(null, 'ACK ' + have.hash)
123 })
124 })
125 }
126 },
127 ])),
128
129 function havesDone(abort, cb) {
130 if (abort) return cb(abort)
131 // send pack file to client
132 if (sendPack)
133 return sendPack(abort, cb)
134 getObjects(repo, commonHash, wants, shallows,
135 function (err, numObjects, readObjects) {
136 if (err) return cb(err)
137 var progress = progressObjects(options)
138 progress.setNumObjects(numObjects)
139 sendPack = pack.encode(options, numObjects,
140 progress(readObjects))
141 havesDone(abort, cb)
142 }
143 )
144 }
145 ])
146}
147
148// through stream to show a progress bar for objects being read
149function progressObjects(options) {
150 // Only show progress bar if it is requested and if it won't interfere with
151 // the debug output
152 if (!options.progress || options.verbosity > 1) {
153 var dummyProgress = function (readObject) { return readObject }
154 dummyProgress.setNumObjects = function () {}
155 return dummyProgress
156 }
157
158 var numObjects
159 var size = 60 // process.stderr.columns - 5
160 var bar = new ProgressBar(':percent :bar', { total: size })
161
162 var progress = function (readObject) {
163 return function (abort, cb) {
164 readObject(abort, function next(end, object) {
165 if (end === true) {
166 // seems to interfere with stdout
167 // bar.terminate()
168 } else if (!end) {
169 var name = object.type + ' ' + object.length
170 bar.tick(size / numObjects)
171 }
172
173 cb(end, object)
174 })
175 }
176 }
177 // TODO: put the num objects in the objects stream as a header object
178 progress.setNumObjects = function (n) {
179 numObjects = n
180 }
181 return progress
182}
183
184function getObjects(repo, commonHash, heads, shallows, cb) {
185 // get objects from commonHash to each head, inclusive.
186 // if commonHash is falsy, use root
187 var objects = []
188 var objectsAdded = {}
189 var done = multicb({pluck: 1})
190 var ended
191
192 // walk back from heads until get to commonHash
193 for (var hash in heads)
194 addObject(hash, done())
195
196 // TODO: only add new objects
197
198 function addObject(hash, cb) {
199 if (ended) return cb(ended)
200 if (hash in objectsAdded || hash == commonHash) return cb()
201 objectsAdded[hash] = true
202 repo.getObject(hash, function (err, object) {
203 if (err) return cb(err)
204 if (object.type == 'blob') {
205 objects.push(object)
206 cb()
207 } else {
208 // object must be read twice, so buffer it
209 bufferObject(object, function (err, object) {
210 if (err) return cb(err)
211 objects.push(object)
212 var hashes = getObjectLinks(object)
213 for (var sha1 in hashes)
214 addObject(sha1, done())
215 cb()
216 })
217 }
218 })
219 }
220
221 done(function (err) {
222 if (err) return cb(err)
223 cb(null, objects.length, pull.values(objects))
224 })
225}
226
227function bufferObject(object, cb) {
228 pull(
229 object.read,
230 pull.collect(function (err, bufs) {
231 if (err) return cb(err)
232 var buf = Buffer.concat(bufs, object.length)
233 cb(null, {
234 type: object.type,
235 length: object.length,
236 data: buf,
237 read: pull.once(buf)
238 })
239 })
240 )
241}
242
243// get hashes of git objects linked to from other git objects
244function getObjectLinks(object, cb) {
245 switch (object.type) {
246 case 'blob':
247 return {}
248 case 'tree':
249 return getTreeLinks(object.data)
250 case 'tag':
251 case 'commit':
252 return getCommitOrTagLinks(object.data)
253 }
254}
255
256function getTreeLinks(buf) {
257 var links = {}
258 for (var i = 0, j; j = buf.indexOf(0, i, 'ascii') + 1; i = j + 20) {
259 var hash = buf.slice(j, j + 20).toString('hex')
260 if (!(hash in links))
261 links[hash] = true
262 }
263 return links
264}
265
266function getCommitOrTagLinks(buf) {
267 var lines = buf.toString('utf8').split('\n')
268 var links = {}
269 // iterate until reach blank line (indicating start of commit/tag body)
270 for (var i = 0; lines[i]; i++) {
271 var args = lines[i].split(' ')
272 switch (args[0]) {
273 case 'tree':
274 case 'parent':
275 case 'object':
276 var hash = args[1]
277 if (!(hash in links))
278 links[hash] = true
279 }
280 }
281 return links
282}
283
284/*
285TODO: investigate capabilities
286report-status delete-refs side-band-64k quiet atomic ofs-delta
287*/
288
289// Get a line for each ref that we have. The first line also has capabilities.
290// Wrap with pktLine.encode.
291function receivePackHeader(capabilities, refSource, symrefs, usePlaceholder) {
292 var first = true
293 var symrefed = {}
294 var symrefsObj = {}
295
296 return cat([
297 function (end, cb) {
298 if (end) cb(true)
299 else if (!symrefs) cb(true)
300 else pull(
301 symrefs,
302 pull.map(function (sym) {
303 symrefed[sym.ref] = true
304 symrefsObj[sym.name] = sym.ref
305 return 'symref=' + sym.name + ':' + sym.ref
306 }),
307 pull.collect(function (err, symrefCaps) {
308 if (err) return cb(err)
309 capabilities = capabilities.concat(symrefCaps)
310 cb(true)
311 })
312 )
313 },
314 pull(
315 refSource,
316 pull.map(function (ref) {
317 // insert symrefs next to the refs that they point to
318 var out = [ref]
319 if (ref.name in symrefed)
320 for (var symrefName in symrefsObj)
321 if (symrefsObj[symrefName] === ref.name)
322 out.push({name: symrefName, hash: ref.hash})
323 return out
324 }),
325 pull.flatten(),
326 pull.map(function (ref) {
327 var name = ref.name
328 var value = ref.hash
329 if (first && usePlaceholder) {
330 first = false
331 /*
332 if (end) {
333 // use placeholder data if there are no refs
334 value = '0000000000000000000000000000000000000000'
335 name = 'capabilities^{}'
336 }
337 */
338 name += '\0' + capabilities.join(' ')
339 }
340 return value + ' ' + name
341 })
342 )
343 ])
344}
345
346// receive-pack: push from client
347function receivePack(read, repo, options) {
348 var sendRefs = receivePackHeader([
349 'delete-refs',
350 ], repo.refs(), null, true)
351 var done = multicb({pluck: 1})
352
353 return pktLine.encode(
354 cat([
355 // send our refs
356 sendRefs,
357 pull.once(''),
358 function (abort, cb) {
359 if (abort) return
360 // receive their refs
361 var lines = pktLine.decode(read, options)
362 pull(
363 lines.updates,
364 pull.collect(function (err, updates) {
365 if (err) return cb(err)
366 if (updates.length === 0) return cb(true)
367 var progress = progressObjects(options)
368 repo.update(pull.values(updates), pull(
369 lines.passthrough,
370 pack.decode({
371 verbosity: options.verbosity,
372 onHeader: function (numObjects) {
373 progress.setNumObjects(numObjects)
374 }
375 }, repo, done()),
376 progress
377 ), done())
378 done(function (err) {
379 cb(err || true)
380 })
381 })
382 )
383 },
384 pull.once('unpack ok')
385 ])
386 )
387}
388
389function prepend(data, read) {
390 var done
391 return function (end, cb) {
392 if (done) {
393 read(end, cb)
394 } else {
395 done = true
396 cb(null, data)
397 }
398 }
399}
400
401module.exports = function (repo) {
402 var ended
403 var options = {
404 verbosity: 1,
405 progress: false
406 }
407
408 function handleConnect(cmd, read) {
409 var args = util.split2(cmd)
410 switch (args[0]) {
411 case 'git-upload-pack':
412 return prepend('\n', uploadPack(read, repo, options))
413 case 'git-receive-pack':
414 return prepend('\n', receivePack(read, repo, options))
415 default:
416 return pull.error(new Error('Unknown service ' + args[0]))
417 }
418 }
419
420 function handleCommand(line, read) {
421 var args = util.split2(line)
422 switch (args[0]) {
423 case 'capabilities':
424 return capabilitiesSource()
425 case 'list':
426 return listRefs(refSource)
427 case 'connect':
428 return handleConnect(args[1], read)
429 case 'option':
430 return optionSource(args[1], options)
431 default:
432 return pull.error(new Error('Unknown command ' + line))
433 }
434 }
435
436 return function (read) {
437 var b = buffered()
438 b(read)
439 var command
440
441 function getCommand(cb) {
442 b.lines(null, function next(end, line) {
443 if (ended = end)
444 return cb(end)
445
446 if (line == '')
447 return b.lines(null, next)
448
449 if (options.verbosity > 1)
450 console.error('command:', line)
451
452 var cmdSource = handleCommand(line, b.passthrough)
453 cb(null, cmdSource)
454 })
455 }
456
457 return function next(abort, cb) {
458 if (ended) return cb(ended)
459
460 if (!command) {
461 if (abort) return
462 getCommand(function (end, cmd) {
463 command = cmd
464 next(end, cb)
465 })
466 return
467 }
468
469 command(abort, function (err, data) {
470 if (err) {
471 command = null
472 if (err !== true)
473 cb(err, data)
474 else
475 next(abort, cb)
476 } else {
477 // HACK: silence error when writing to closed stream
478 try {
479 cb(null, data)
480 } catch(e) {
481 if (e.message == 'process.stdout cannot be closed.'
482 || e.message == 'This socket is closed.')
483 process.exit(1)
484 throw e
485 }
486 }
487 })
488 }
489 }
490}
491

Built with git-ssb-web