Files: f261ad64678ce2c22cec604efe2e109c52c83e12 / throughs.js
7177 bytesRaw
1 | var u = require('pull-core') |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | |
5 | var prop = u.prop |
6 | var id = u.id |
7 | var tester = u.tester |
8 | |
// map(read, map) -> read
// Through stream that passes every item read from `read` through `map`
// and hands the result downstream. `map` is first normalised with
// pull-core's `prop` (presumably so a property name can be given instead
// of a function -- confirm against pull-core); when nothing usable is
// supplied the identity function `id` is used.
// If the mapper throws, upstream is aborted with the thrown error and the
// same error is then reported downstream.
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, data) {
      //on end/abort there is nothing to map, null is passed along
      var mapped = null
      if(!end) {
        try {
          mapped = map(data)
        } catch (err) {
          //tell the source to stop before surfacing the failure
          return read(err, function () {
            return cb(err)
          })
        }
      }
      cb(end, mapped)
    })
  }
}
25 | |
// asyncMap(read, map) -> read
// Through stream that transforms each item with the asynchronous
// `map(data, done)`, where `done(err, result)` reports the outcome.
// Without a mapper the source is returned untouched.
// When the mapper reports an error, upstream is aborted with that error
// before the error is handed downstream, so the source gets a chance to
// clean up (same policy as the synchronous `map` above).
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    read(null, function (end, data) {
      if(end) return cb(end, data)
      map(data, function (err, mapped) {
        if(!err) return cb(null, mapped)
        //mapper failed: stop the source, then report the failure
        read(err, function () {
          cb(err)
        })
      })
    })
  }
}
37 | |
// paraMap(read, map, width) -> read
// Like asyncMap, but keeps reading ahead and runs several
// `map(data, done)` calls at the same time. After each item is handed to
// the mapper another upstream read is started while fewer than `width`
// mappers are running. Results are queued in completion order, which may
// differ from the input order.
// Without a mapper the source is returned untouched.
//
// Error handling: when a mapper calls back with an error its result is
// discarded, upstream is aborted with that error, and the error is
// reported downstream once the already queued results have been delivered
// and the remaining mappers have finished.
var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read

  var ended = false         //end or error to report downstream
  var upstreamEnded = false //upstream has ended or has been aborted by us
  var reading = false       //an upstream read is pending
  var inFlight = 0          //number of mappers that have not called back
  var queue = []            //finished results waiting for the consumer
  var _cb                   //pending downstream callback, if any

  //answer the waiting consumer when there is something to say
  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length)
      return cb(null, queue.shift())
    if(ended && !inFlight)
      return cb(ended)
    //nothing to deliver yet, keep waiting
    _cb = cb
  }

  //stop the source after a mapper failure; never overlaps a pending read
  function abortUpstream () {
    if(upstreamEnded || reading) return
    upstreamEnded = true
    read(ended, function () {})
  }

  function pull () {
    //one read at a time, and nothing more after the end
    if(reading || ended) return
    reading = true
    read(null, function (end, data) {
      reading = false
      if(end) {
        upstreamEnded = true
        //keep an earlier mapper error if there is one
        ended = ended || end
        return drain()
      }
      if(ended) {
        //a mapper failed while this read was pending: drop the item
        abortUpstream()
        return drain()
      }
      inFlight++
      map(data, function (err, result) {
        inFlight--
        if(err) {
          //the first error wins, and overrides a normal end
          if(!ended || ended === true) ended = err
          abortUpstream()
        }
        else
          queue.push(result)
        drain()
      })

      if(inFlight < width)
        pull()
    })
  }

  return function (end, cb) {
    if(end) return read(end, cb) //abort
    _cb = cb
    //keep reading while fewer than `width` maps are in flight
    pull()
    drain()
  }
}
84 | |
// filter(read, test) -> read
// Through stream that only lets items through for which `test` returns
// a truthy value. `test` is normalised with pull-core's `tester`
// (the original comment mentions regexp support -- confirm in pull-core).
//
// Rejected items are skipped by reading again. When the source answers
// synchronously this happens in a loop instead of by recursion, so a long
// run of rejected items does not grow the call stack.
var filter = exports.filter =
function (read, test) {
  test = tester(test)
  return function next (end, cb) {
    var inSyncCall, again
    do {
      again = false
      inSyncCall = true
      read(end, function (err, item) {
        if(!err && !test(item)) {
          //rejected: loop when still inside the read call, recurse otherwise
          if(inSyncCall) again = true
          else next(err, cb)
          return
        }
        cb(err, item)
      })
      inSyncCall = false
    } while(again)
  }
}
103 | |
// filterNot(read, test) -> read
// Inverse of `filter`: only items for which `test` is falsy get through.
// `test` is normalised with `tester` before being negated.
var filterNot = exports.filterNot =
function (read, test) {
  var accept = tester(test)
  function reject (item) {
    return !accept(item)
  }
  return filter(read, reject)
}
111 | |
// through(read, op, onEnd) -> read
// Pass-through stream for side effects: `op(data)` is called for every
// item and `onEnd` at most once, either when downstream aborts or when
// upstream signals the end. `onEnd` receives null for a normal end
// (`true`) and the abort/error value otherwise. Both hooks are optional.
var through = exports.through =
function (read, op, onEnd) {
  var finished = false

  function notifyEnd (reason) {
    if(finished || !onEnd) return
    finished = true
    onEnd(reason === true ? null : reason)
  }

  return function (end, cb) {
    //downstream abort counts as the end as well
    if(end) notifyEnd(end)
    return read(end, function (end, data) {
      if(end) notifyEnd(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
130 | |
// take(read, test, opts) -> read
// Lets items through until `test(item)` returns a falsy value, then
// aborts upstream and ends the stream.
//   test      - predicate, or a number meaning "take this many items"
//   opts.last - when true, the first item that fails the test is still
//               emitted before the stream ends (always on for a number)
//
// A count of zero or less yields an empty stream: upstream is aborted on
// the first read. The numeric countdown stops once it is no longer
// positive, so it cannot step past zero and keep passing items forever.
var take = exports.take =
function (read, test, opts) {
  opts = opts || {}
  var last = opts.last || false // whether the first item for which !test(item) should still pass
  var ended = false
  var nothing = false // true when asked for zero (or fewer) items
  if('number' === typeof test) {
    last = true
    var n = test
    nothing = n <= 0
    test = function () {
      return --n > 0
    }
  }

  return function (end, cb) {
    if(ended) return cb(ended)
    if(ended = end) return read(ended, cb)

    if(nothing) {
      //nothing to take: stop the source without consuming an item
      ended = true
      return read(true, function () {
        cb(true)
      })
    }

    read(null, function (end, data) {
      if(ended = ended || end) return cb(ended)
      if(!test(data)) {
        ended = true
        //abort upstream first, then deliver the final item or the end
        read(true, function () {
          last ? cb(end, data) : cb(true)
        })
      }
      else
        cb(null, data)
    })
  }
}
160 | |
// unique(read, field, invert) -> read
// Filters the stream by whether an item's key has been seen before.
// The key is computed by `field`, normalised with pull-core's `prop`
// (identity when omitted); keys are used as object property names, so
// they are compared by their string form.
//   invert - falsy: keep only the first item for each key (default)
//            truthy: keep only items whose key was already seen
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  //no prototype, so keys like "constructor" or "toString" are not
  //mistaken for keys that were already seen
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
171 | |
// nonUnique(read, field) -> read
// Opposite of `unique`: only lets through items whose key (computed by
// `field`) has already been seen earlier in the stream.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
175 | |
// group(read, size) -> read
// Collects items into arrays of `size` elements (5 when omitted) and
// emits each array as one item. When upstream ends, a partially filled
// array is emitted first and the end is reported on the following read.
var group = exports.group =
function (read, size) {
  var ended
  var pending = []
  size = size || 5

  //hand the collected items downstream and start a fresh batch
  function flush (cb) {
    var batch = pending
    pending = []
    cb(null, batch)
  }

  return function (end, cb) {
    //downstream is aborting: remember it and pass it on to the source
    if(end) return read(ended = end, cb)
    //the source ended on an earlier read
    if(ended) return cb(ended)

    read(null, function collect (end, data) {
      ended = ended || end
      if(ended) {
        if(pending.length) flush(cb)
        else cb(ended)
        return
      }
      pending.push(data)
      if(pending.length < size) read(null, collect)
      else flush(cb)
    })
  }
}
204 | |
// flatten(read) -> read
// Turns a stream of streams into one stream of their items, reading the
// inner streams one after another. An inner value that is an array or an
// object is converted to a stream with `sources.values`; any other
// non-function value makes the read callback throw
// 'expected stream of streams'.
//
// Abort: the inner stream that is currently being read is aborted first,
// then the outer stream, so neither is left open.
// Errors: an inner stream that ends with an error (anything but `true`)
// aborts the outer stream and the error is reported downstream, instead
// of being treated like a normal end.
var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    if(abort) {
      if(!_read) return read(abort, cb)
      //abort the current inner stream, then the stream of streams
      return _read(abort, function (err) {
        read(err || abort, cb)
      })
    }
    if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) nextStream()
        else if(end) {
          //inner stream failed: stop the outer stream, report the error
          read(true, function () {
            cb(end)
          })
        }
        else cb(null, data)
      })
    }
    function nextStream () {
      //the previous inner stream (if any) is finished
      _read = null
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream) || stream && 'object' === typeof stream)
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')
        _read = stream
        nextChunk()
      })
    }
  }
}
232 | |
// prepend(read, head) -> read
// Emits `head` as the first item and then everything from `read`.
// `null` marks the head as already delivered, so a `null` head is never
// emitted. An abort that arrives before the head was delivered goes
// straight to the source and the head stays undelivered.
var prepend =
exports.prepend =
function (read, head) {
  return function (abort, cb) {
    if(head === null) {
      //head is gone, behave exactly like the source
      read(abort, cb)
      return
    }
    if(abort)
      return read(abort, cb)
    var first = head
    head = null
    cb(null, first)
  }
}
250 | |
251 | //var drainIf = exports.drainIf = function (op, done) { |
252 | // sinks.drain( |
253 | //} |
254 | |
// _reduce(read, reduce, initial) -> read
// Through stream that emits a single item: the result of folding every
// upstream item into `initial` with `reduce(acc, item)`. The first read
// drains the whole source through `sinks.drain`; reads after that get the
// end (or the error the drain finished with).
var _reduce = exports._reduce = function (read, reduce, initial) {
  //declared here so the state is per stream; left undeclared, the first
  //`if(ended)` would throw a ReferenceError
  var ended
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    var sink = sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err, data) {
      ended = err || true
      if(!err) cb(null, initial)
      else cb(ended)
    })
    //explicit call instead of relying on `(...)` continuing the
    //previous line
    sink(read)
  }
}
270 | |
var nextTick = process.nextTick

// highWaterMark(read, highWaterMark) -> read
// Read-ahead buffer: keeps reading from upstream, one read at a time,
// until `highWaterMark` items (10 when omitted) are buffered, and serves
// downstream reads from that buffer. Reading starts on the next tick,
// before the consumer has asked for anything.
//
// State:
//   ending - upstream has ended, but buffered items may still be pending
//   ended  - the end/abort value that is handed to consumers
//
// `ending` is promoted to `ended` as soon as the buffer is empty, also
// while consumers are already waiting; otherwise a consumer waiting on an
// empty buffer would never be told that upstream has ended.
// A downstream abort is forwarded upstream once, after a pending read
// (if any) has returned.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, ending, reading = false
  var aborted = false //the abort has been forwarded upstream
  highWaterMark = highWaterMark || 10

  //upstream is done and everything buffered was delivered: really ended
  function settle () {
    if(!buffer.length && ending) ended = ending
  }

  //answer waiting consumers from the buffer, or with the end
  function readAhead () {
    settle()
    while(waiting.length && (buffer.length || ended)) {
      waiting.shift()(ended, ended ? null : buffer.shift())
      settle()
    }
  }

  function next () {
    if(ending || reading) return
    //after an abort a single read is allowed, to pass the abort upstream;
    //otherwise keep reading until the buffer is full
    if(ended ? aborted : buffer.length >= highWaterMark)
      return
    reading = true
    if(ended) aborted = true
    return read(ended, function (end, data) {
      reading = false
      ending = ending || end
      if(data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  process.nextTick(next)

  return function (end, cb) {
    ended = ended || end
    waiting.push(cb)

    next(); readAhead()
  }
}
307 | |
// flatMap(read, mapper) -> read
// Maps every upstream item to a list with `mapper` (identity when
// omitted) and emits the list elements one by one. Empty or falsy
// results are skipped and the next item is read.
//
// An abort takes effect immediately: queued elements are dropped and the
// abort is passed upstream, instead of being ignored while the queue
// still holds elements.
// The list returned by `mapper` is copied, not emptied, so values owned
// by the caller are left intact.
var flatMap = exports.flatMap =
function (read, mapper) {
  mapper = mapper || id
  var queue = [], ended

  return function (abort, cb) {
    if(abort) {
      queue = []
      //upstream is already done, nothing left to abort
      if(ended) return cb(ended)
      ended = abort
      return read(abort, cb)
    }
    if(queue.length) return cb(null, queue.shift())
    else if(ended) return cb(ended)

    read(null, function next (end, data) {
      if(end) ended = end
      else {
        var add = mapper(data)
        var count = add ? add.length : 0
        for(var i = 0; i < count; i++)
          queue.push(add[i])
      }

      if(queue.length) cb(null, queue.shift())
      else if(ended) cb(ended)
      else read(null, next)
    })
  }
}
331 | |
332 |
Built with git-ssb-web