Files: e75b6ac2254345793473e239af19644d38aa30d1 / throughs.js
1607 bytesRaw
/**
 * Create a mapping through-stream.
 *
 * Usage: map(fn)(readable)(reader). `reader` is handed a read function
 * `(end, cb)`; every data item pulled from `readable` is passed through
 * the mapper before reaching `cb`.
 *
 * @param {Function} [map] - transform applied to each data item;
 *   defaults to the identity function.
 * @returns {Function} function taking `readable` and returning a function
 *   taking `reader`; the final result is whatever `reader` returns.
 */
var map = function (map) {
  var mapper = map || function (e) { return e }
  return function (readable) {
    return function (reader) {
      return reader(function (end, cb) {
        readable(end, function (end, data) {
          // The terminating callback carries no data: pass the end/error
          // signal straight through instead of calling the mapper on it.
          if (end) return cb(end)
          cb(null, mapper(data))
        })
      })
    }
  }
}
13 | |
/**
 * Pass-through stream with an optional side-effect hook.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {Function} [op] - called with each data item before it is
 *   forwarded downstream; its return value is ignored.
 * @returns {Function} read function `(end, cb)` that forwards `end` to
 *   `read` and relays the result to `cb` unchanged.
 */
var dumb = function (read, op) {
  return function (end, cb) {
    return read(end, function (end, data) {
      // Only real data items are observed; the end/error callback carries
      // no data and must not trigger the hook.
      if (!end && op) op(data)
      cb(end, data)
    })
  }
}
22 | |
/**
 * Limit a stream to its leading items.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {number|Function} test - either the maximum number of items to
 *   let through, or a predicate called with each item; the stream ends at
 *   the first item for which the predicate returns a falsy value.
 * @returns {Function} read function `(end, cb)`.
 *
 * Note: the item that fails the test is pulled from upstream and
 * discarded, after which upstream is aborted with `true`.
 */
var take = function (read, test) {
  var ended = false
  var accept = test
  if ('number' === typeof test) {
    var remaining = test
    accept = function () {
      return remaining-- > 0
    }
  }
  return function (end, cb) {
    // Once terminated (upstream end, failed test, or abort), stay
    // terminated and never touch upstream again.
    if (ended) return cb(ended)
    if (end) {
      // Downstream abort: remember it and forward it upstream.
      ended = end
      return read(end, cb)
    }
    return read(null, function (end, data) {
      if (end) {
        ended = end
        return cb(end)
      }
      if (!accept(data)) {
        // Limit reached: abort upstream and let it answer the callback.
        ended = true
        return read(true, cb)
      }
      return cb(null, data)
    })
  }
}
41 | |
42 | |
var nextTick = process.nextTick

/**
 * Read-ahead buffer: eagerly pulls from `read` until `highWaterMark`
 * items are buffered, and serves downstream reads from that buffer.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {number} [highWaterMark=10] - maximum number of buffered items.
 * @returns {Function} read function `(end, cb)`.
 *
 * Behaviour notes:
 *  - Items already buffered are delivered before the upstream end/error.
 *  - A downstream abort discards the buffer and is forwarded upstream
 *    (after any in-flight read completes).
 *  - `null`/`undefined` data values are skipped rather than buffered.
 */
var highWaterMark = function (read, highWaterMark) {
  var buffer = []
  var waiting = []
  var ended = null       // terminal state reported to downstream
  var abort = null       // abort requested by downstream
  var abortSent = false  // whether the abort has been forwarded upstream
  var reading = false    // an upstream read is in flight
  highWaterMark = highWaterMark || 10

  // Answer queued downstream callbacks: buffered data first, then end.
  function readAhead () {
    while (waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if (buffer.length) cb(null, buffer.shift())
      else cb(ended, null)
    }
  }

  // Pull from upstream while below the high water mark, or forward a
  // pending abort. Only one upstream call is in flight at a time.
  function next () {
    if (reading) return
    if (abort && !abortSent && !ended) {
      abortSent = true
      reading = true
      return read(abort, function (end) {
        reading = false
        ended = end || abort
        readAhead()
      })
    }
    if (ended || abort || buffer.length >= highWaterMark) return
    reading = true
    return read(null, function (end, data) {
      reading = false
      if (end) ended = end
      // Data arriving after an abort request is no longer wanted.
      else if (!abort && data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  // Start filling the buffer before the first downstream read.
  nextTick(next)

  return function (end, cb) {
    if (end) {
      // Downstream is done: drop read-ahead data and schedule the abort.
      buffer.length = 0
      if (!ended) abort = abort || end
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
75 | |
76 |
Built with git-ssb-web