Files: c6af3c441f8bde3384e3ca604903bb01755e2fac / throughs.js
2349 bytesRaw
1 | var k = 0 |
2 | var map = exports.map = |
3 | function (read, map) { |
4 | var _k = k++ |
5 | if('string' == typeof map) { |
6 | var key = map |
7 | map = function (data) { return data[key] } |
8 | } |
9 | map = map || function (e) {return e} |
10 | return function (end, cb) { |
11 | read(end, function (end, data) { |
12 | var data = !end ? map(data) : null |
13 | cb(end, data) |
14 | }) |
15 | } |
16 | } |
17 | |
18 | var filter = exports.filter = |
19 | function (read, test) { |
20 | //regexp |
21 | if('object' === typeof test |
22 | && 'function' === typeof test.test) |
23 | test = test.test.bind(test) |
24 | test = test || function (data) {return !!data} |
25 | return function next (end, cb) { |
26 | read(end, function (end, data) { |
27 | if(!end && !test(data)) |
28 | return next(end, cb) |
29 | cb(end, data) |
30 | }) |
31 | } |
32 | } |
33 | |
34 | var through = exports.through = |
35 | function (read, op) { |
36 | return function (end, cb) { |
37 | return read(end, function (end, data) { |
38 | if(!end) op && op(data) |
39 | cb(end, data) |
40 | }) |
41 | } |
42 | } |
43 | |
44 | var take = exports.take = |
45 | function (read, test) { |
46 | var ended = false |
47 | if('number' === typeof test) { |
48 | var n = test; test = function () { |
49 | return n-- > 0 |
50 | } |
51 | } |
52 | return function (end, cb) { |
53 | if(end) { |
54 | if(!ended) return ended = end, read(end, cb) |
55 | cb(ended) |
56 | } |
57 | return read(null, function (end, data) { |
58 | if(ended) return |
59 | if(end) return cb(ended = end) |
60 | //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!! |
61 | if(!test(data)) { |
62 | ended = true |
63 | nextTick(function () { |
64 | read(true, function (){}) |
65 | }) |
66 | return cb(true) |
67 | } |
68 | return cb(null, data) |
69 | }) |
70 | } |
71 | } |
72 | |
73 | var nextTick = process.nextTick |
74 | |
75 | var highWaterMark = exports.highWaterMark = |
76 | function (read, highWaterMark) { |
77 | var buffer = [], waiting = [], ended, reading = false |
78 | highWaterMark = highWaterMark || 10 |
79 | |
80 | function readAhead () { |
81 | while(waiting.length && (buffer.length || ended)) |
82 | waiting.shift()(ended, ended ? null : buffer.shift()) |
83 | } |
84 | |
85 | function next () { |
86 | if(ended || reading || buffer.length >= highWaterMark) |
87 | return |
88 | reading = true |
89 | return read(ended, function (end, data) { |
90 | reading = false |
91 | ended = ended || end |
92 | if(data != null) buffer.push(data) |
93 | |
94 | next(); readAhead() |
95 | }) |
96 | } |
97 | |
98 | nextTick(next) |
99 | |
100 | return function (end, cb) { |
101 | ended = ended || end |
102 | waiting.push(cb) |
103 | |
104 | next(); readAhead() |
105 | } |
106 | } |
107 | |
108 |
Built with git-ssb-web