// Files: 578b08f48f151f6e93c4c7c0f1eccd59dea1cbd4 / throughs.js
// 6275 bytes (raw view)
1 | var u = require('pull-core') |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | |
5 | var prop = u.prop |
6 | var id = u.id |
7 | var tester = u.tester |
8 | |
// map(read, map): through stream that passes every item of `read` through
// a mapping function.
//
//   read - upstream read function, called as read(abort, cb)
//   map  - handed to pull-core's `prop`; if that returns something falsy the
//          `id` helper is used instead (presumably `prop` accepts a property
//          name as well as a function - confirm against pull-core)
//
// Returns a read function with the same (abort, cb) signature.
var map = exports.map =
function (read, map) {
  var transform = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, data) {
      var mapped
      try {
        // on end there is nothing to map; null is passed along instead
        mapped = end ? null : transform(data)
      } catch (err) {
        // the mapper threw: abort upstream with the error, then report it
        return read(err, function () {
          return cb(err)
        })
      }
      cb(end, mapped)
    })
  }
}
25 | |
// asyncMap(read, map): through stream that maps every item with an
// asynchronous, node-style mapper.
//
//   read - upstream read function, called as read(abort, cb)
//   map  - function (data, cb) that calls cb(err, mapped); when omitted,
//          `read` is returned unchanged
//
// Returns a read function with the same (abort, cb) signature. When the
// mapper reports an error the upstream is aborted with that error before the
// error is handed to the reader, mirroring what `map` does when its mapper
// throws.
var asyncMap = exports.asyncMap =
function (read, map) {
  if (!map) return read
  return function (end, cb) {
    if (end) return read(end, cb) //abort
    read(null, function (end, data) {
      if (end) return cb(end, data)
      map(data, function (err, mapped) {
        if (!err) return cb(err, mapped)
        // let the source clean up before the error travels downstream
        read(err, function () {
          cb(err)
        })
      })
    })
  }
}
37 | |
// paraMap(read, map, width): like asyncMap, but keeps reading ahead while
// fewer than `width` mapper calls are in flight. Results are queued in the
// order the mapper calls complete.
//
//   read  - upstream read function, called as read(abort, cb)
//   map   - function (data, cb) that calls cb(err, mapped); when omitted,
//           `read` is returned unchanged
//   width - number of mapper calls allowed in flight before read-ahead
//           stops (when undefined, `n < width` is never true, so there is
//           no read-ahead)
//
// Returns a read function with the same (abort, cb) signature.
//
// A mapper error is no longer swallowed: nothing is queued for the failed
// item, the error becomes the end state and is delivered to the reader once
// already-queued results and in-flight mapper calls have drained.
// NOTE(review): unlike `map`, the upstream is not aborted on a mapper error,
// because a read may still be in flight at that point - revisit.
var paraMap = exports.paraMap =
function (read, map, width) {
  if (!map) return read
  var ended = false, queue = [], _cb
  var n = 0 // number of mapper calls currently in flight

  // hand the waiting reader a queued result, or the end state once the
  // queue is empty and no mapper call is still running
  function drain () {
    if (!_cb) return
    var cb = _cb
    _cb = null
    if (queue.length)
      return cb(null, queue.shift())
    else if (ended && !n)
      return cb(ended)
    _cb = cb
  }

  function pull () {
    read(null, function (end, data) {
      if (end) {
        // keep an error that was recorded earlier
        ended = ended || end
        return drain()
      }
      n++
      map(data, function (err, mapped) {
        n--
        if (err) {
          // an error takes precedence over a plain `true` end
          if (!ended || ended === true) ended = err
        } else {
          queue.push(mapped)
        }
        drain()
      })

      if (n < width && !ended)
        pull()
    })
  }

  return function (end, cb) {
    if (end) return read(end, cb) //abort
    //continue to read while there are less than `width` maps in flight
    _cb = cb
    // decided before pulling, because pull() may change queue and ended
    var ready = queue.length || ended
    pull()
    if (ready) drain()
  }
}
84 | |
// filter(read, test): through stream that only passes on the items for
// which `test` returns something truthy.
//
//   read - upstream read function, called as read(abort, cb)
//   test - handed to pull-core's `tester` to obtain the predicate
//          (presumably it also accepts a regexp, per the original
//          "regexp" note - confirm against pull-core)
//
// Returns a read function with the same (abort, cb) signature.
//
// Rejected items are skipped with a loop rather than one recursive call per
// item, so a long run of rejected items from a synchronous source cannot
// overflow the stack.
var filter = exports.filter =
function (read, test) {
  //regexp
  test = tester(test)
  return function next (end, cb) {
    var sync, loop = true
    while (loop) {
      loop = false
      sync = true
      read(end, function (end, data) {
        if (!end && !test(data))
          // answered synchronously: ask the loop to read again;
          // answered later: start over with a fresh call
          return sync ? loop = true : next(end, cb)
        cb(end, data)
      })
      sync = false
    }
  }
}
97 | |
// filterNot(read, test): the inverse of `filter` - passes on only the items
// for which the predicate built by `tester(test)` returns something falsy.
var filterNot = exports.filterNot =
function (read, test) {
  var accept = tester(test)
  function rejects (item) {
    return !accept(item)
  }
  return filter(read, rejects)
}
105 | |
// through(read, op, onEnd): pass-through stream for observing a stream.
//
//   read  - upstream read function, called as read(abort, cb)
//   op    - optional; called with every item that flows past
//   onEnd - optional; called at most once, when the reader aborts or the
//           upstream ends. It receives null when the end value is exactly
//           `true`, otherwise the end value itself.
//
// Returns a read function with the same (abort, cb) signature; items and end
// values are handed on unchanged.
var through = exports.through =
function (read, op, onEnd) {
  var notified = false

  function finish (reason) {
    if (notified) return
    if (!onEnd) return
    notified = true
    if (reason === true) onEnd(null)
    else onEnd(reason)
  }

  return function (end, cb) {
    // the reader is aborting: report before handing the abort upstream
    if (end) finish(end)
    return read(end, function (upstreamEnd, data) {
      if (upstreamEnd) finish(upstreamEnd)
      else if (op) op(data)
      cb(upstreamEnd, data)
    })
  }
}
124 | |
// take(read, test): through stream that ends early.
//
//   read - upstream read function, called as read(abort, cb)
//   test - either a number (pass on at most that many items) or a predicate
//          (pass items on until the predicate returns something falsy; the
//          item that failed the predicate is dropped)
//
// Returns a read function with the same (abort, cb) signature. When the
// limit is reached the upstream is aborted with `true` and the reader is
// given `true`.
//
// With a numeric limit nothing beyond the limit is read: after the last
// permitted item, the next request aborts the upstream instead of reading
// one more item and throwing it away. A limit that is not greater than zero
// (0, negative, NaN) ends the stream on the first request.
var take = exports.take =
function (read, test) {
  var ended = false
  var counting = 'number' === typeof test
  var remaining = counting ? test : 0

  // abort the upstream, then tell the reader that the stream is over
  function terminate (cb) {
    ended = true
    read(true, function () {
      cb(ended)
    })
  }

  return function (end, cb) {
    if (ended) return cb(ended)
    if (ended = end) return read(ended, cb)
    // limit already used up: stop without touching another item
    if (counting && !(remaining > 0)) return terminate(cb)

    read(null, function (end, data) {
      if (ended = ended || end) return cb(ended)
      if (counting) {
        remaining--
        return cb(null, data)
      }
      if (!test(data)) return terminate(cb)
      cb(null, data)
    })
  }
}
151 | |
// unique(read, field, invert): through stream built on `filter` that passes
// on an item only the first time its key is seen.
//
//   read   - upstream read function, called as read(abort, cb)
//   field  - handed to pull-core's `prop` to obtain the key function; `id`
//            is used when that returns something falsy
//   invert - when truthy the decision is inverted: only items whose key was
//            seen before are passed on
//
// Keys are remembered as property names, so they are compared by their
// string form. The lookup object has no prototype, which keeps inherited
// names such as "constructor" or "toString" from looking like keys that
// were already seen.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if (seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
162 | |
// nonUnique(read, field): `unique` with the decision inverted - passes on
// only the items whose key has been seen before.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
166 | |
// group(read, size): through stream that collects items into arrays.
//
//   read - upstream read function, called as read(abort, cb)
//   size - number of items per array; 5 is used when `size` is falsy
//
// Returns a read function with the same (abort, cb) signature. Each
// successful read yields an array of `size` items; when the upstream ends,
// any items collected so far are delivered as one last, shorter array and
// the end value is given on the following read.
var group = exports.group =
function (read, size) {
  var batchSize = size || 5
  var pending = []
  var ended

  // hand out the collected items and start a fresh collection
  function flush () {
    var batch = pending
    pending = []
    return batch
  }

  return function (end, cb) {
    // the reader is aborting: remember it and pass the abort upstream
    if (end) {
      ended = end
      return read(end, cb)
    }
    // an end was already seen on an earlier read
    if (ended) return cb(ended)

    read(null, function collect (end, data) {
      ended = ended || end
      if (ended) {
        if (pending.length === 0) return cb(ended)
        return cb(null, flush())
      }
      pending.push(data)
      if (pending.length < batchSize)
        return read(null, collect)

      cb(null, flush())
    })
  }
}
195 | |
// flatten(read): turns a stream of streams into one stream of their items.
//
//   read - upstream read function, called as read(abort, cb); each item it
//          yields must be a read function or an array (arrays are wrapped
//          with sources.values). Anything else throws
//          'expected stream of streams'.
//
// Returns a read function with the same (abort, cb) signature.
//
// Abort handling: when the reader aborts, the inner stream that is currently
// being read (if any) is aborted first and then the outer stream.
// Error handling: an inner stream that ends with anything other than `true`
// is treated as failed - the outer stream is aborted with that value and the
// value is given to the reader, instead of silently moving on to the next
// inner stream.
var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    function nextChunk () {
      _read(null, function (end, data) {
        if (end === true) return nextStream()
        if (end) {
          // the inner stream failed: shut the outer stream down as well
          _read = null
          return read(end, function () {
            cb(end)
          })
        }
        cb(null, data)
      })
    }
    function nextStream () {
      _read = null
      read(null, function (end, stream) {
        if (end)
          return cb(end)
        if (Array.isArray(stream))
          stream = sources.values(stream)
        else if ('function' != typeof stream)
          throw new Error('expected stream of streams')

        _read = stream
        nextChunk()
      })
    }

    if (abort) {
      if (!_read) return read(abort, cb)
      var inner = _read
      _read = null
      return inner(abort, function () {
        read(abort, cb)
      })
    }

    if (_read) nextChunk()
    else nextStream()
  }
}
223 | |
// prepend(read, head): through stream that yields `head` once, before the
// first item of `read`.
//
//   read - upstream read function, called as read(abort, cb)
//   head - value to emit first; `null` means there is nothing to prepend
//
// Returns a read function with the same (abort, cb) signature.
//
// If the reader aborts before `head` was emitted, `head` is dropped along
// with the abort, so a later read can no longer produce it from a stream
// that has already been aborted.
var prepend =
exports.prepend =
function (read, head) {

  return function (abort, cb) {
    if (head === null) return read(abort, cb)

    var _head = head
    head = null
    // aborted before the head went out: hand the abort on, drop the head
    if (abort) return read(abort, cb)
    cb(null, _head)
  }

}
241 | |
242 | //var drainIf = exports.drainIf = function (op, done) { |
243 | // sinks.drain( |
244 | //} |
245 | |
// _reduce(read, reduce, initial): through stream that folds the whole
// upstream into one value and yields that value as its only item.
//
//   read    - upstream read function, called as read(abort, cb)
//   reduce  - function (accumulator, item) returning the new accumulator
//   initial - starting accumulator
//
// Returns a read function: the first read drains `read` through sinks.drain
// and gives the reader the final accumulator, or the error that drain
// reported; later reads get the end state.
//
// `ended` is declared here. It used to be referenced without any
// declaration, which made the first read throw a ReferenceError.
var _reduce = exports._reduce = function (read, reduce, initial) {
  var ended
  return function (close, cb) {
    if (close) return read(close, cb)
    if (ended) return cb(ended)

    // sinks.drain(...) returns the sink; it is then applied to `read`
    var sink = sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err, data) {
      ended = err || true
      if (!err) cb(null, initial)
      else cb(ended)
    })
    sink(read)
  }
}
261 | |
262 | var nextTick = process.nextTick |
263 | |
// highWaterMark(read, highWaterMark): through stream that reads ahead and
// buffers items until the buffer holds `highWaterMark` of them.
//
//   read          - upstream read function, called as read(abort, cb)
//   highWaterMark - buffer size at which read-ahead pauses; 10 when falsy
//
// Returns a read function with the same (abort, cb) signature. Reading
// starts on the next tick even if nobody has asked for data yet. Items that
// are null or undefined are not buffered.
//
// An upstream end is held in `ending` until the buffer is empty and only
// then becomes `ended`. That promotion happens while readers are being
// served, so a reader that is already waiting on an empty buffer when an
// asynchronous source ends receives the end. Before, the promotion ran only
// after the serving loop and such a reader was never called back.
//
// NOTE(review): an abort from the reader sets `ended` and answers the
// reader, but is not forwarded to `read` - confirm whether the upstream
// should be aborted here.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, ending, reading = false
  highWaterMark = highWaterMark || 10

  // serve waiting readers from the buffer, or with the end state
  function readAhead () {
    while (waiting.length) {
      // the buffer has drained: a pending upstream end is now final
      if (!buffer.length && ending) ended = ending
      if (!buffer.length && !ended) break
      waiting.shift()(ended, ended ? null : buffer.shift())
    }

    if (!buffer.length && ending) ended = ending
  }

  // start one upstream read unless ended, already reading, or buffer full
  function next () {
    if (ended || ending || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(ended || ending, function (end, data) {
      reading = false
      ending = ending || end
      if (data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  process.nextTick(next)

  return function (end, cb) {
    ended = ended || end
    waiting.push(cb)

    next(); readAhead()
  }
}
298 | |
299 | |
300 | |
301 |
// Built with git-ssb-web