Files: 7aa21412f6c0bd6b84022d67ff8e51711eddb510 / flatten.js
1250 bytes · Raw
1 | |
2 | |
3 | var values = require('./values') |
4 | var once = require('./once') |
5 | |
6 | //convert a stream of arrays or streams into just a stream. |
7 | module.exports = function flatten () { |
8 | return function (read) { |
9 | var _read |
10 | return function (abort, cb) { |
11 | if (abort) { //abort the current stream, and then stream of streams. |
12 | _read ? _read(abort, function(err) { |
13 | read(err || abort, cb) |
14 | }) : read(abort, cb) |
15 | } |
16 | else if(_read) nextChunk() |
17 | else nextStream() |
18 | |
19 | function nextChunk () { |
20 | _read(null, function (err, data) { |
21 | if (err === true) nextStream() |
22 | else if (err) { |
23 | read(true, function(abortErr) { |
24 | // TODO: what do we do with the abortErr? |
25 | cb(err) |
26 | }) |
27 | } |
28 | else cb(null, data) |
29 | }) |
30 | } |
31 | function nextStream () { |
32 | _read = null |
33 | read(null, function (end, stream) { |
34 | if(end) |
35 | return cb(end) |
36 | if(Array.isArray(stream) || stream && 'object' === typeof stream) |
37 | stream = values(stream) |
38 | else if('function' != typeof stream) |
39 | stream = once(stream) |
40 | _read = stream |
41 | nextChunk() |
42 | }) |
43 | } |
44 | } |
45 | } |
46 | } |
47 | |
48 |
Built with git-ssb-web