Files: 545734ec6384c1b2b71f539ea4acfe56af123b1e / throughs.js
4192 bytesRaw
1 | function prop (map) { |
2 | if('string' == typeof map) { |
3 | var key = map |
4 | return function (data) { return data[key] } |
5 | } |
6 | return map |
7 | } |
8 | |
9 | function id (item) { |
10 | return item |
11 | } |
12 | |
13 | var k = 0 |
14 | var map = exports.map = |
15 | function (read, map) { |
16 | var _k = k++ |
17 | map = prop(map) || id |
18 | return function (end, cb) { |
19 | read(end, function (end, data) { |
20 | var data = !end ? map(data) : null |
21 | cb(end, data) |
22 | }) |
23 | } |
24 | } |
25 | |
26 | var asyncMap = exports.asyncMap = |
27 | function (read, map) { |
28 | if(!map) return read |
29 | return function (end, cb) { |
30 | if(end) return read(end, cb) //abort |
31 | read(null, function (end, data) { |
32 | if(end) return cb(end, data) |
33 | map(data, cb) |
34 | }) |
35 | } |
36 | } |
37 | |
38 | var filter = exports.filter = |
39 | function (read, test) { |
40 | //regexp |
41 | if('object' === typeof test |
42 | && 'function' === typeof test.test) |
43 | test = test.test.bind(test) |
44 | test = prop(test) || id |
45 | return function next (end, cb) { |
46 | read(end, function (end, data) { |
47 | if(!end && !test(data)) |
48 | return next(end, cb) |
49 | cb(end, data) |
50 | }) |
51 | } |
52 | } |
53 | |
54 | var through = exports.through = |
55 | function (read, op, onEnd) { |
56 | var a = false |
57 | function once (abort) { |
58 | if(a || !onEnd) return |
59 | a = true |
60 | onEnd(abort === true ? null : abort) |
61 | } |
62 | |
63 | return function (end, cb) { |
64 | return read(end, function (end, data) { |
65 | if(!end) op && op(data) |
66 | else once(end) |
67 | cb(end, data) |
68 | }) |
69 | } |
70 | } |
71 | |
72 | var take = exports.take = |
73 | function (read, test) { |
74 | var ended = false |
75 | if('number' === typeof test) { |
76 | var n = test; test = function () { |
77 | return n --> 0 |
78 | } |
79 | } |
80 | return function (end, cb) { |
81 | if(ended = ended || end) |
82 | return read(ended, cb) |
83 | |
84 | read(null, function (end, data) { |
85 | if(ended = ended || end) return cb(ended) |
86 | //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!! |
87 | if(!test(data)) { |
88 | ended = true |
89 | read(true, cb) |
90 | } |
91 | else |
92 | cb(null, data) |
93 | }) |
94 | } |
95 | } |
96 | |
97 | var unique = exports.unique = function (read, field, invert) { |
98 | field = prop(field) || id |
99 | var seen = {} |
100 | return filter(read, function (data) { |
101 | var key = field(data) |
102 | if(seen[key]) return !!invert //false, by default |
103 | else seen[key] = true |
104 | return !invert //true by default |
105 | }) |
106 | } |
107 | |
108 | var nonUnique = exports.nonUnique = function (read, field) { |
109 | return unique(read, field, true) |
110 | } |
111 | |
112 | var group = exports.group = |
113 | function (read, size) { |
114 | var ended; size = size || 5 |
115 | var queue = [] |
116 | |
117 | return function (end, cb) { |
118 | //this means that the upstream is sending an error. |
119 | if(end) return read(ended = end, cb) |
120 | //this means that we read an end before. |
121 | if(ended) return cb(ended) |
122 | |
123 | read(null, function next(end, data) { |
124 | if(ended = ended || end) { |
125 | if(!queue.length) |
126 | return cb(ended) |
127 | |
128 | var _queue = queue; queue = [] |
129 | return cb(null, _queue) |
130 | } |
131 | queue.push(data) |
132 | if(queue.length < size) |
133 | return read(null, next) |
134 | |
135 | var _queue = queue; queue = [] |
136 | cb(null, _queue) |
137 | }) |
138 | } |
139 | } |
140 | |
141 | var flatten = exports.flatten = function (read) { |
142 | var chunk |
143 | return function (end, cb) { |
144 | //this means that the upstream is sending an error. |
145 | if(end) return read(ended = end, cb) |
146 | |
147 | if(chunk && chunk.length) |
148 | return cb(null, chunk.shift()) |
149 | |
150 | read(null, function (err, data) { |
151 | if(err) return cb(err) |
152 | chunk = data |
153 | |
154 | if(chunk && chunk.length) |
155 | return cb(null, chunk.shift()) |
156 | }) |
157 | } |
158 | } |
159 | |
160 | var nextTick = process.nextTick |
161 | |
162 | var highWaterMark = exports.highWaterMark = |
163 | function (read, highWaterMark) { |
164 | var buffer = [], waiting = [], ended, reading = false |
165 | highWaterMark = highWaterMark || 10 |
166 | |
167 | function readAhead () { |
168 | while(waiting.length && (buffer.length || ended)) |
169 | waiting.shift()(ended, ended ? null : buffer.shift()) |
170 | } |
171 | |
172 | function next () { |
173 | if(ended || reading || buffer.length >= highWaterMark) |
174 | return |
175 | reading = true |
176 | return read(ended, function (end, data) { |
177 | reading = false |
178 | ended = ended || end |
179 | if(data != null) buffer.push(data) |
180 | |
181 | next(); readAhead() |
182 | }) |
183 | } |
184 | |
185 | nextTick(next) |
186 | |
187 | return function (end, cb) { |
188 | ended = ended || end |
189 | waiting.push(cb) |
190 | |
191 | next(); readAhead() |
192 | } |
193 | } |
194 | |
195 | |
196 | |
197 |
Built with git-ssb-web