Files: e7e0d53aa4421ee02973e1349150612fdd489792 / throughs.js
7433 bytesRaw
1 | var u = require('pull-core') |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | |
5 | var prop = u.prop |
6 | var id = u.id |
7 | var tester = u.tester |
8 | |
9 | var map = exports.map = |
10 | function (read, map) { |
11 | map = prop(map) || id |
12 | return function (abort, cb) { |
13 | read(abort, function (end, data) { |
14 | try { |
15 | data = !end ? map(data) : null |
16 | } catch (err) { |
17 | return read(err, function () { |
18 | return cb(err) |
19 | }) |
20 | } |
21 | cb(end, data) |
22 | }) |
23 | } |
24 | } |
25 | |
26 | var asyncMap = exports.asyncMap = |
27 | function (read, map) { |
28 | if(!map) return read |
29 | return function (end, cb) { |
30 | if(end) return read(end, cb) //abort |
31 | read(null, function (end, data) { |
32 | if(end) return cb(end, data) |
33 | map(data, cb) |
34 | }) |
35 | } |
36 | } |
37 | |
38 | var paraMap = exports.paraMap = |
39 | function (read, map, width) { |
40 | if(!map) return read |
41 | var ended = false, queue = [], _cb |
42 | |
43 | function drain () { |
44 | if(!_cb) return |
45 | var cb = _cb |
46 | _cb = null |
47 | if(queue.length) |
48 | return cb(null, queue.shift()) |
49 | else if(ended && !n) |
50 | return cb(ended) |
51 | _cb = cb |
52 | } |
53 | |
54 | function pull () { |
55 | read(null, function (end, data) { |
56 | if(end) { |
57 | ended = end |
58 | return drain() |
59 | } |
60 | n++ |
61 | map(data, function (err, data) { |
62 | n-- |
63 | |
64 | queue.push(data) |
65 | drain() |
66 | }) |
67 | |
68 | if(n < width && !ended) |
69 | pull() |
70 | }) |
71 | } |
72 | |
73 | var n = 0 |
74 | return function (end, cb) { |
75 | if(end) return read(end, cb) //abort |
76 | //continue to read while there are less than 3 maps in flight |
77 | _cb = cb |
78 | if(queue.length || ended) |
79 | pull(), drain() |
80 | else pull() |
81 | } |
82 | return highWaterMark(asyncMap(read, map), width) |
83 | } |
84 | |
85 | var filter = exports.filter = |
86 | function (read, test) { |
87 | //regexp |
88 | test = tester(test) |
89 | return function next (end, cb) { |
90 | var sync, loop = true |
91 | while(loop) { |
92 | loop = false |
93 | sync = true |
94 | read(end, function (end, data) { |
95 | if(!end && !test(data)) |
96 | return sync ? loop = true : next(end, cb) |
97 | cb(end, data) |
98 | }) |
99 | sync = false |
100 | } |
101 | } |
102 | } |
103 | |
104 | var filterNot = exports.filterNot = |
105 | function (read, test) { |
106 | test = tester(test) |
107 | return filter(read, function (e) { |
108 | return !test(e) |
109 | }) |
110 | } |
111 | |
112 | var through = exports.through = |
113 | function (read, op, onEnd) { |
114 | var a = false |
115 | function once (abort) { |
116 | if(a || !onEnd) return |
117 | a = true |
118 | onEnd(abort === true ? null : abort) |
119 | } |
120 | |
121 | return function (end, cb) { |
122 | if(end) once(end) |
123 | return read(end, function (end, data) { |
124 | if(!end) op && op(data) |
125 | else once(end) |
126 | cb(end, data) |
127 | }) |
128 | } |
129 | } |
130 | |
131 | var take = exports.take = |
132 | function (read, test, opts) { |
133 | opts = opts || {} |
134 | var last = opts.last || false // whether the first item for which !test(item) should still pass |
135 | var ended = false |
136 | if('number' === typeof test) { |
137 | last = true |
138 | var n = test; test = function () { |
139 | return --n |
140 | } |
141 | } |
142 | |
143 | return function (end, cb) { |
144 | if(ended) return cb(ended) |
145 | if(ended = end) return read(ended, cb) |
146 | |
147 | read(null, function (end, data) { |
148 | if(ended = ended || end) return cb(ended) |
149 | if(!test(data)) { |
150 | ended = true |
151 | read(true, function () { |
152 | last ? cb(end, data) : cb(true) |
153 | }) |
154 | } |
155 | else |
156 | cb(null, data) |
157 | }) |
158 | } |
159 | } |
160 | |
161 | var unique = exports.unique = function (read, field, invert) { |
162 | field = prop(field) || id |
163 | var seen = {} |
164 | return filter(read, function (data) { |
165 | var key = field(data) |
166 | if(seen[key]) return !!invert //false, by default |
167 | else seen[key] = true |
168 | return !invert //true by default |
169 | }) |
170 | } |
171 | |
172 | var nonUnique = exports.nonUnique = function (read, field) { |
173 | return unique(read, field, true) |
174 | } |
175 | |
176 | var group = exports.group = |
177 | function (read, size) { |
178 | var ended; size = size || 5 |
179 | var queue = [] |
180 | |
181 | return function (end, cb) { |
182 | //this means that the upstream is sending an error. |
183 | if(end) return read(ended = end, cb) |
184 | //this means that we read an end before. |
185 | if(ended) return cb(ended) |
186 | |
187 | read(null, function next(end, data) { |
188 | if(ended = ended || end) { |
189 | if(!queue.length) |
190 | return cb(ended) |
191 | |
192 | var _queue = queue; queue = [] |
193 | return cb(null, _queue) |
194 | } |
195 | queue.push(data) |
196 | if(queue.length < size) |
197 | return read(null, next) |
198 | |
199 | var _queue = queue; queue = [] |
200 | cb(null, _queue) |
201 | }) |
202 | } |
203 | } |
204 | |
205 | var flatten = exports.flatten = function (read) { |
206 | var _read |
207 | return function (abort, cb) { |
208 | if (abort) { |
209 | _read ? _read(abort, function(err) { |
210 | read(err || abort, cb) |
211 | }) : read(abort, cb) |
212 | } |
213 | else if(_read) nextChunk() |
214 | else nextStream() |
215 | |
216 | function nextChunk () { |
217 | _read(null, function (err, data) { |
218 | if (err === true) nextStream() |
219 | else if (err) { |
220 | read(true, function(abortErr) { |
221 | // TODO: what do we do with the abortErr? |
222 | cb(err) |
223 | }) |
224 | } |
225 | else cb(null, data) |
226 | }) |
227 | } |
228 | function nextStream () { |
229 | read(null, function (end, stream) { |
230 | if(end) |
231 | return cb(end) |
232 | if(Array.isArray(stream) || stream && 'object' === typeof stream) |
233 | stream = sources.values(stream) |
234 | else if('function' != typeof stream) |
235 | throw new Error('expected stream of streams') |
236 | _read = stream |
237 | nextChunk() |
238 | }) |
239 | } |
240 | } |
241 | } |
242 | |
243 | var prepend = |
244 | exports.prepend = |
245 | function (read, head) { |
246 | |
247 | return function (abort, cb) { |
248 | if(head !== null) { |
249 | if(abort) |
250 | return read(abort, cb) |
251 | var _head = head |
252 | head = null |
253 | cb(null, _head) |
254 | } else { |
255 | read(abort, cb) |
256 | } |
257 | } |
258 | |
259 | } |
260 | |
261 | //var drainIf = exports.drainIf = function (op, done) { |
262 | // sinks.drain( |
263 | //} |
264 | |
265 | var _reduce = exports._reduce = function (read, reduce, initial) { |
266 | return function (close, cb) { |
267 | if(close) return read(close, cb) |
268 | if(ended) return cb(ended) |
269 | |
270 | sinks.drain(function (item) { |
271 | initial = reduce(initial, item) |
272 | }, function (err, data) { |
273 | ended = err || true |
274 | if(!err) cb(null, initial) |
275 | else cb(ended) |
276 | }) |
277 | (read) |
278 | } |
279 | } |
280 | |
281 | var nextTick = process.nextTick |
282 | |
283 | var highWaterMark = exports.highWaterMark = |
284 | function (read, highWaterMark) { |
285 | var buffer = [], waiting = [], ended, ending, reading = false |
286 | highWaterMark = highWaterMark || 10 |
287 | |
288 | function readAhead () { |
289 | while(waiting.length && (buffer.length || ended)) |
290 | waiting.shift()(ended, ended ? null : buffer.shift()) |
291 | |
292 | if (!buffer.length && ending) ended = ending; |
293 | } |
294 | |
295 | function next () { |
296 | if(ended || ending || reading || buffer.length >= highWaterMark) |
297 | return |
298 | reading = true |
299 | return read(ended || ending, function (end, data) { |
300 | reading = false |
301 | ending = ending || end |
302 | if(data != null) buffer.push(data) |
303 | |
304 | next(); readAhead() |
305 | }) |
306 | } |
307 | |
308 | process.nextTick(next) |
309 | |
310 | return function (end, cb) { |
311 | ended = ended || end |
312 | waiting.push(cb) |
313 | |
314 | next(); readAhead() |
315 | } |
316 | } |
317 | |
318 | var flatMap = exports.flatMap = |
319 | function (read, mapper) { |
320 | mapper = mapper || id |
321 | var queue = [], ended |
322 | |
323 | return function (abort, cb) { |
324 | if(queue.length) return cb(null, queue.shift()) |
325 | else if(ended) return cb(ended) |
326 | |
327 | read(abort, function next (end, data) { |
328 | if(end) ended = end |
329 | else { |
330 | var add = mapper(data) |
331 | while(add && add.length) |
332 | queue.push(add.shift()) |
333 | } |
334 | |
335 | if(queue.length) cb(null, queue.shift()) |
336 | else if(ended) cb(ended) |
337 | else read(null, next) |
338 | }) |
339 | } |
340 | } |
341 | |
342 |
Built with git-ssb-web