Files: 66cac3c056c184eb58ccbdd628cb74b1917bb4ab / throughs.js
7351 bytesRaw
1 | var u = require('pull-core') |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | |
5 | var prop = u.prop |
6 | var id = u.id |
7 | var tester = u.tester |
8 | |
// map(read, map): through stream that transforms each item synchronously.
//   read - upstream source, called as read(abort, cb)
//   map  - mapper; normalised through `prop` from pull-core, falling back to
//          `id` (presumably the identity function) when `prop` returns
//          something falsy
// If the mapper throws, upstream is aborted with the thrown error and the
// same error is then handed to the downstream callback.
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, data) {
      var mapped = null
      if(!end) {
        try {
          mapped = map(data)
        } catch (err) {
          // tell upstream to stop before reporting the failure
          return read(err, function () {
            return cb(err)
          })
        }
      }
      cb(end, mapped)
    })
  }
}
25 | |
// asyncMap(read, map): through stream whose mapper is asynchronous.
//   read - upstream source, called as read(abort, cb)
//   map  - function (data, cb); it receives the downstream callback directly
//          and is expected to call it with (err, mappedData)
// Without a mapper the source is returned untouched.
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (abort, done) {
    // an abort goes straight to the source
    if(abort) return read(abort, done)
    read(null, function (end, item) {
      if(end) return done(end, item)
      map(item, done)
    })
  }
}
37 | |
// paraMap(read, map, width): like asyncMap, but lets several `map` calls run
// at the same time.
//   read  - upstream source, called as read(abort, cb)
//   map   - function (data, cb) calling cb(err, result)
//   width - after a map call is started another item is read immediately
//           while fewer than `width` calls are in flight; when omitted only
//           one item is read per downstream read
// Results are emitted in the order the map calls complete. If a map call
// fails, upstream is aborted and the error ends the stream once the results
// that are already queued or still in flight have been delivered.
// Without a mapper the source is returned untouched.
var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read
  var ended = false        // falsy while running; then `true` or the error to report
  var upstreamDone = false // upstream has ended or has already been told to abort
  var reading = false      // an upstream read is outstanding
  var inFlight = 0         // map calls that have not called back yet
  var queue = []           // mapped results waiting for a downstream read
  var _cb                  // parked downstream callback

  // Answer the parked downstream read if there is something to say.
  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length)
      return cb(null, queue.shift())
    else if(ended && !inFlight)
      return cb(ended)
    // nothing to deliver yet: park the callback again
    _cb = cb
  }

  // Record a failure and stop upstream.
  function fail (err) {
    // the first error wins, but it replaces a plain `true` end
    if(!ended || ended === true) ended = err
    // never abort twice, and never while a read is pending; in the latter
    // case the read callback below aborts once the read has returned
    if(upstreamDone || reading) return
    upstreamDone = true
    read(err, function () {})
  }

  function pull () {
    // only one upstream read at a time, and none once the stream is ending
    if(reading || ended) return
    reading = true
    read(null, function (end, data) {
      reading = false
      if(end) {
        upstreamDone = true
        if(!ended || ended === true) ended = end
        return drain()
      }
      if(ended) {
        // the stream was stopped while this read was pending:
        // drop the item and make sure upstream has been aborted
        fail(ended)
        return drain()
      }
      inFlight++
      map(data, function (err, result) {
        inFlight--
        if(err) fail(err)
        else queue.push(result)
        drain()
      })

      // keep reading while fewer than `width` maps are in flight
      if(inFlight < width)
        pull()
    })
  }

  return function (end, cb) {
    if(end) {
      // downstream abort: stop reading and pass the abort upstream
      ended = ended || end
      upstreamDone = true
      return read(end, cb)
    }
    _cb = cb
    pull()
    drain()
  }
}
84 | |
// filter(read, test): through stream that only passes items accepted by `test`.
//   read - upstream source, called as read(abort, cb)
//   test - predicate, normalised through `tester` from pull-core
//          (NOTE(review): the original comment says "regexp", so `tester`
//          presumably also accepts a RegExp - confirm against pull-core)
// Rejected items are skipped by reading again. When upstream answers
// synchronously the retry happens in a loop rather than by recursion, so a
// long run of rejected items does not grow the call stack.
var filter = exports.filter =
function (read, test) {
  test = tester(test)
  return function next (end, cb) {
    var sync   // true while we are still inside the read() call
    var again  // set by a synchronous rejection to request another pass
    do {
      again = false
      sync = true
      read(end, function (end, data) {
        if(!end && !test(data)) {
          if(sync) again = true
          else next(end, cb)
          return
        }
        cb(end, data)
      })
      sync = false
    } while(again)
  }
}
103 | |
// filterNot(read, test): the complement of filter - only items for which the
// normalised `test` returns something falsy are passed along.
var filterNot = exports.filterNot =
function (read, test) {
  var matches = tester(test)
  function rejected (item) {
    return !matches(item)
  }
  return filter(read, rejected)
}
111 | |
// through(read, op, onEnd): pass every item along unchanged while observing
// the stream.
//   op    - optional; called with each item before it is passed on
//   onEnd - optional; called at most once, when the stream ends or is
//           aborted. It receives null for a plain `true` end, otherwise the
//           end/abort value itself.
var through = exports.through =
function (read, op, onEnd) {
  var notified = false

  function finish (reason) {
    if(notified || !onEnd) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (end, cb) {
    // a downstream abort counts as the end of the stream
    if(end) finish(end)
    return read(end, function (end, data) {
      if(end) finish(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
130 | |
// take(read, test, opts): pass items along until `test` stops holding.
//   test      - either a predicate (items pass while it returns truthy) or a
//               number n, meaning "pass the first n items"
//   opts.last - when truthy, the first item that fails the predicate is
//               still passed along before the stream ends
// Upstream is aborted once the stream has been cut short.
// A count of zero or less yields an empty stream: the first read aborts
// upstream and ends.
var take = exports.take =
function (read, test, opts) {
  opts = opts || {}
  var last = opts.last || false // whether the first item for which !test(item) should still pass
  var ended = false
  if('number' === typeof test) {
    last = true
    var n = test
    // Without this, take(0) would decrement to -1 (truthy) and never stop.
    // Starting in the ended state makes the first read terminate upstream.
    if(n <= 0) ended = true
    test = function () {
      return --n
    }
  }

  // Abort upstream, then end downstream with upstream's error if it gave one.
  function terminate (cb) {
    read(true, function (err) {
      last = false; cb(err || true)
    })
  }

  return function (end, cb) {
    if(ended) last ? terminate(cb) : cb(ended)
    else if(ended = end) read(ended, cb)
    else
      read(null, function (end, data) {
        if(ended = ended || end) {
          cb(ended)
        }
        else if(!test(data)) {
          ended = true
          // with `last` the failing item is emitted now and upstream is
          // aborted on the next read; otherwise abort immediately
          last ? cb(null, data) : terminate(cb)
        }
        else
          cb(null, data)
      })
  }
}
167 | |
// unique(read, field, invert): pass only the first item seen for each key.
//   field  - key selector, normalised through `prop` from pull-core, falling
//            back to `id` (presumably the identity function)
//   invert - when truthy the filter is reversed: only items whose key has
//            been seen before are passed
// Keys are stored as property names, so they are compared after conversion
// to a string.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // No prototype: with a plain {} keys such as "constructor" or "toString"
  // would look as if they had already been seen.
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
178 | |
// nonUnique(read, field): the inverse of unique - only items whose key has
// already been seen are passed along.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
182 | |
// group(read, size): collect items into arrays of `size` elements
// (5 when `size` is falsy). When upstream ends, any partial batch is emitted
// first and the end is reported on the following read.
var group = exports.group =
function (read, size) {
  var ended
  var pending = []
  size = size || 5

  // Hand out the collected items and start a fresh batch.
  function takeBatch () {
    var batch = pending
    pending = []
    return batch
  }

  return function (end, cb) {
    // downstream is aborting: remember it and pass it upstream
    if(end) {
      ended = end
      return read(ended, cb)
    }
    // upstream already ended on an earlier read
    if(ended) return cb(ended)

    read(null, function collect (end, data) {
      ended = ended || end
      if(ended) {
        if(!pending.length)
          return cb(ended)
        return cb(null, takeBatch())
      }
      pending.push(data)
      // keep reading until the batch is full
      if(pending.length < size)
        return read(null, collect)

      cb(null, takeBatch())
    })
  }
}
211 | |
// flatten(read): turn a stream of streams into a single stream of their items.
// Each upstream item must be a source function, or an array/object, which is
// wrapped with sources.values. Anything else throws
// 'expected stream of streams'.
// An abort is applied to the inner stream being read (if any) and then to
// the outer stream. An error from an inner stream aborts the outer stream
// and is reported downstream.
var flatten = exports.flatten = function (read) {
  var _read // the inner stream currently being read, if any
  return function (abort, cb) {
    if (abort) {
      if(!_read) return read(abort, cb)
      var inner = _read
      _read = null
      // stop the inner stream first, then the stream of streams
      return inner(abort, function () {
        read(abort, cb)
      })
    }
    if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) nextStream()
        else if(end) {
          // a real error, not a normal end: shut everything down
          _read = null
          read(end, function () {
            cb(end)
          })
        }
        else cb(null, data)
      })
    }
    function nextStream () {
      // the previous inner stream is finished; do not touch it again
      _read = null
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream) || stream && 'object' === typeof stream)
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')
        _read = stream
        nextChunk()
      })
    }
  }
}
239 | |
// prepend(read, head): emit `head` as the first item, then everything from
// `read`. A `head` of null means there is nothing to prepend. If the first
// call is an abort, `head` is kept and the abort goes straight upstream.
var prepend =
exports.prepend =
function (read, head) {

  return function (abort, cb) {
    // head already delivered (or never present): plain pass-through
    if(head === null) {
      read(abort, cb)
      return
    }
    if(abort)
      return read(abort, cb)
    var first = head
    head = null
    cb(null, first)
  }

}
257 | |
258 | //var drainIf = exports.drainIf = function (op, done) { |
259 | // sinks.drain( |
260 | //} |
261 | |
// _reduce(read, reduce, initial): through stream that consumes the whole of
// `read` and emits a single item, the accumulated value.
//   reduce  - function (acc, item) returning the new accumulator
//   initial - starting accumulator
// The first read drains upstream through sinks.drain and answers with the
// result (or with the error that drain reported); later reads answer with
// the end state.
var _reduce = exports._reduce = function (read, reduce, initial) {
  // Undefined until upstream has been drained, then `true` or the error.
  // This was previously undeclared, so the check below threw a ReferenceError.
  var ended
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err) {
      ended = err || true
      if(!err) cb(null, initial)
      else cb(ended)
    })(read)
  }
}
277 | |
278 | var nextTick = process.nextTick |
279 | |
// highWaterMark(read, highWaterMark): through stream that reads ahead of its
// consumer, buffering up to `highWaterMark` items (10 when falsy).
// Reading starts on the next tick, before downstream has asked for anything.
// Items equal to null or undefined are not buffered.
// A downstream abort discards the buffer and is forwarded upstream.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = []     // items read ahead and not yet delivered
  var waiting = []    // downstream callbacks not yet answered
  var ended           // end state reported downstream (end, error or abort)
  var ending          // upstream has ended, buffer may still hold items
  var reading = false // an upstream read is outstanding
  var aborted = false // the abort has been forwarded upstream
  highWaterMark = highWaterMark || 10

  // Answer as many waiting readers as possible.
  function readAhead () {
    for(;;) {
      // Once the buffer is empty, upstream's end becomes our end. This has
      // to happen before looking at `waiting`, otherwise a reader parked
      // when upstream ends would never be called back.
      if(!ended && !buffer.length && ending) ended = ending
      if(!waiting.length || !(buffer.length || ended)) return
      waiting.shift()(ended, ended ? null : buffer.shift())
    }
  }

  // Forward a downstream abort, unless upstream has already ended or a read
  // is still pending (the read callback retries in that case).
  function abortUpstream () {
    if(aborted || reading || ending) return
    aborted = true
    read(ended, function () {})
  }

  function next () {
    if(ended || ending || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      ending = ending || end
      if(data != null) buffer.push(data)
      // downstream aborted while this read was pending
      if(ended) abortUpstream()

      next(); readAhead()
    })
  }

  process.nextTick(next)

  return function (end, cb) {
    ended = ended || end
    waiting.push(cb)
    if(end) abortUpstream()

    next(); readAhead()
  }
}
314 | |
// flatMap(read, mapper): map each item to an array-like of results and emit
// those results one by one.
//   mapper - function (data) returning something with a length and indexed
//            elements; defaults to `id` from pull-core (presumably the
//            identity function). A falsy or empty result emits nothing for
//            that item.
// The value returned by the mapper is only read, never modified.
// An abort discards any queued results and is forwarded upstream.
var flatMap = exports.flatMap =
function (read, mapper) {
  mapper = mapper || id
  var queue = [], ended

  return function (abort, cb) {
    if(abort) {
      // an abort must win over results that are still queued
      queue = []
      if(ended) return cb(ended)
      ended = abort
      return read(abort, cb)
    }
    if(queue.length) return cb(null, queue.shift())
    else if(ended) return cb(ended)

    read(null, function next (end, data) {
      if(end) ended = end
      else {
        var add = mapper(data)
        var count = add ? add.length : 0
        // copy instead of shift()ing, so the mapper's array is left intact
        for(var i = 0; i < count; i++)
          queue.push(add[i])
      }

      if(queue.length) cb(null, queue.shift())
      else if(ended) cb(ended)
      // the mapper produced nothing for this item: read the next one
      else read(null, next)
    })
  }
}
338 | |
339 |
Built with git-ssb-web