Files: c4caefd7c6cd6984ee0223c391cd4ce43118cf29 / throughs.js
2368 bytesRaw
1 | var k = 0 |
2 | var map = exports.map = |
3 | function (read, map) { |
4 | var _k = k++ |
5 | if('string' == typeof map) |
6 | function (data) { return data[key] } |
7 | map = map || function (e) {return e} |
8 | return function (end, cb) { |
9 | read(end, function (end, data) { |
10 | var data = !end ? map(data) : null |
11 | cb(end, data) |
12 | }) |
13 | } |
14 | } |
15 | |
16 | var filter = exports.filter = |
17 | function (read, test) { |
18 | //regexp |
19 | if('object' === typeof test |
20 | && 'function' === typeof test.test) |
21 | test = test.test.bind(test) |
22 | test = test || function (data) {return !!data} |
23 | return function next (end, cb) { |
24 | read(end, function (end, data) { |
25 | if(!end && !test(data)) |
26 | return next(end, cb) |
27 | cb(end, data) |
28 | }) |
29 | } |
30 | } |
31 | |
32 | var through = exports.through = |
33 | function (read, op) { |
34 | return function (end, cb) { |
35 | return read(end, function (end, data) { |
36 | if(!end) op && op(data) |
37 | cb(end, data) |
38 | }) |
39 | } |
40 | } |
41 | |
42 | var take = exports.take = |
43 | function (read, test) { |
44 | var ended = false |
45 | if('number' === typeof test) { |
46 | var n = test; test = function () { |
47 | return n-- > 0 |
48 | } |
49 | } |
50 | return function (end, cb) { |
51 | if(end) { |
52 | if(!ended) return ended = end, readable(end, cb) |
53 | cb(ended) |
54 | } |
55 | return read(null, function (end, data) { |
56 | if(ended) return |
57 | console.log('take?', end, !test(data)) |
58 | if(end) return cb(ended = end) |
59 | //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!! |
60 | if(!test(data)) { |
61 | ended = true |
62 | nextTick(function () { |
63 | read(true, function (){}) |
64 | }) |
65 | return cb(true) |
66 | } |
67 | return cb(null, data) |
68 | }) |
69 | } |
70 | } |
71 | |
72 | var nextTick = process.nextTick |
73 | |
74 | var highWaterMark = exports.highWaterMark = |
75 | function (read, highWaterMark) { |
76 | var buffer = [], waiting = [], ended, reading = false |
77 | highWaterMark = highWaterMark || 10 |
78 | |
79 | function readAhead () { |
80 | while(waiting.length && (buffer.length || ended)) |
81 | waiting.shift()(ended, ended ? null : buffer.shift()) |
82 | } |
83 | |
84 | function next () { |
85 | if(ended || reading || buffer.length >= highWaterMark) |
86 | return |
87 | reading = true |
88 | return read(ended, function (end, data) { |
89 | reading = false |
90 | ended = ended || end |
91 | if(data != null) buffer.push(data) |
92 | |
93 | next(); readAhead() |
94 | }) |
95 | } |
96 | |
97 | nextTick(next) |
98 | |
99 | return function (end, cb) { |
100 | ended = ended || end |
101 | waiting.push(cb) |
102 | |
103 | next(); readAhead() |
104 | } |
105 | } |
106 | |
107 |
Built with git-ssb-web