Files: 0990f2724d1de3b5d3ac4000a6cb2680aab8c194 / throughs.js
6076 bytesRaw
1 | var u = require('pull-core') |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | |
5 | var prop = u.prop |
6 | var id = u.id |
7 | var tester = u.tester |
8 | |
// map: through stream that transforms every item read from `read`.
//
// read - upstream pull source, called as read(end, cb).
// map  - mapper passed through pull-core's `prop`; when that yields a
//        falsy value the identity function `id` is used instead.
//        (presumably `prop` turns a string key into a property getter —
//        confirm against pull-core)
//
// Returns a new source. When upstream signals end/error the callback
// receives (end, null) and the mapper is not invoked.
var map = exports.map =
function (read, map) {
  var transform = prop(map) || id
  return function (end, cb) {
    read(end, function (end, data) {
      // end or error: pass it straight through, with no data
      if(end) return cb(end, null)
      cb(end, transform(data))
    })
  }
}
19 | |
// asyncMap: through stream that transforms each item with an
// asynchronous, node-style mapper.
//
// read - upstream pull source, called as read(end, cb).
// map  - function (data, cb) that calls cb(err, mapped). When omitted
//        the upstream source is returned unchanged.
//
// Returns a new source. If the mapper reports an error, upstream is
// aborted with that error (so it can release resources) and the error
// is then passed downstream.
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    read(null, function (end, data) {
      if(end) return cb(end, data)
      map(data, function (err, mapped) {
        if(!err) return cb(null, mapped)
        // the stream is finished as far as downstream is concerned,
        // so tell upstream too before reporting the failure.
        read(err, function () {
          cb(err)
        })
      })
    })
  }
}
31 | |
// paraMap: like asyncMap, but keeps reading from upstream while fewer
// than `width` items are in flight or buffered. Results are emitted in
// the order the mapper completes them, not in input order.
//
// read  - upstream pull source, called as read(end, cb).
// map   - function (data, cb) that calls cb(err, mapped). When omitted
//         the upstream source is returned unchanged.
// width - limit on (maps in flight + queued results) used when deciding
//         whether to read ahead. When omitted no read-ahead happens:
//         one upstream read is issued per downstream read.
//
// Returns a new source. A mapper error aborts upstream and is delivered
// downstream once already queued results and in-flight maps are done.
var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read
  var ended = false      // end/error state; truthy once the stream is finishing
  var sourceDone = false // upstream has ended or has been aborted
  var reading = false    // a read from upstream is outstanding
  var n = 0              // number of maps in flight
  var queue = [], _cb = null

  function noop () {}

  // abort upstream at most once
  function abortSource () {
    if(sourceDone) return
    sourceDone = true
    read(ended, noop)
  }

  // hand a queued result (or the end state) to the waiting reader, if any
  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length)
      return cb(null, queue.shift())
    else if(ended && !n)
      return cb(ended)
    _cb = cb
  }

  function pull () {
    // never issue overlapping reads, and never read once finished
    if(reading || ended) return
    reading = true
    read(null, function (end, data) {
      reading = false
      if(end) {
        sourceDone = true
        if(!ended) ended = end
        return drain()
      }
      if(ended) {
        // a map failed (or downstream aborted) while this read was
        // pending: discard the item and shut upstream down.
        abortSource()
        return drain()
      }
      n++
      map(data, function (err, result) {
        n--
        if(err) {
          // an error takes precedence over a clean upstream end
          if(!ended || ended === true) ended = err
          if(!reading) abortSource()
        }
        else
          queue.push(result)
        drain()
      })

      // read ahead only while in-flight + buffered work is below width
      if(n + queue.length < width)
        pull()
    })
  }

  return function (end, cb) {
    if(end) {
      //abort: stop all internal activity, then forward upstream
      ended = ended || end
      sourceDone = true
      queue = []
      return read(end, cb)
    }
    _cb = cb
    pull()
    drain()
  }
}
78 | |
// filter: through stream that only passes items accepted by `test`.
//
// read - upstream pull source, called as read(end, cb).
// test - predicate description handed to pull-core's `tester`
//        (the original comment mentions regexp support — the exact
//        accepted forms are defined by pull-core; confirm there).
//
// Returns a new source. Rejected items are skipped by reading again.
// When upstream answers synchronously the re-read is done in a loop
// rather than by recursion, so a long run of rejected items cannot
// overflow the call stack.
var filter = exports.filter =
function (read, test) {
  //regexp
  test = tester(test)
  return function next (end, cb) {
    var sync, again = true
    while(again) {
      again = false
      sync = true
      read(end, function (ended, data) {
        if(ended || test(data))
          return cb(ended, data)
        // rejected item: read the next one with the (falsy) end value
        end = ended
        if(sync) again = true   // still inside read(): let the loop re-read
        else next(ended, cb)    // async answer: the loop has already exited
      })
      sync = false
    }
  }
}
91 | |
// filterNot: the complement of `filter` — passes only the items that
// the tester built from `test` rejects.
//
// read - upstream pull source.
// test - predicate description, normalised with pull-core's `tester`.
var filterNot = exports.filterNot =
function (read, test) {
  var accepts = tester(test)
  function rejects (item) {
    return !accepts(item)
  }
  return filter(read, rejects)
}
99 | |
// through: pass-through stream with side-effect hooks.
//
// read  - upstream pull source, called as read(end, cb).
// op    - optional function called with every data item.
// onEnd - optional function called at most once when the stream ends,
//         either because downstream aborted or upstream ended. It
//         receives null for a normal end (end === true), otherwise the
//         end/error value itself.
//
// Returns a new source that forwards everything unchanged.
var through = exports.through =
function (read, op, onEnd) {
  var notified = false

  function notifyEnd (reason) {
    if(notified) return
    if(!onEnd) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (end, cb) {
    // downstream abort counts as the end of the stream
    if(end) notifyEnd(end)
    return read(end, function (end, data) {
      if(end) notifyEnd(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
118 | |
// take: through stream that passes items until a limit is reached,
// then aborts upstream and ends.
//
// read - upstream pull source, called as read(end, cb).
// test - either a number (how many items to pass) or a predicate
//        function (data) -> truthy to keep going. With a predicate, the
//        first item that fails it is consumed and dropped, since it has
//        to be read to be tested.
//
// With a numeric limit exactly that many items are read from upstream;
// the read after the last one aborts upstream instead of pulling (and
// discarding) an extra item. A limit that is not greater than zero
// passes nothing.
var take = exports.take =
function (read, test) {
  var ended = false
  var remaining = null // item budget when `test` is a number, else null
  if('number' === typeof test) {
    remaining = test
    test = null
  }

  // abort upstream, then report a normal end downstream
  function terminate (cb) {
    ended = true
    read(true, function (end, data) {
      cb(ended, data)
    })
  }

  return function (end, cb) {
    if(ended) return cb(ended)
    if(ended = end) return read(ended, cb)

    // numeric budget spent: finish without touching another item
    if(remaining !== null && !(remaining > 0))
      return terminate(cb)

    read(null, function (end, data) {
      if(ended = ended || end) return cb(ended)
      if(remaining !== null) {
        remaining --
        return cb(null, data)
      }
      if(!test(data))
        terminate(cb)
      else
        cb(null, data)
    })
  }
}
145 | |
// unique: through stream that passes only the first item seen for each
// key (or, with `invert`, only items whose key was already seen).
//
// read   - upstream pull source.
// field  - key selector passed through pull-core's `prop`; falls back
//          to the identity function `id` when that yields a falsy value.
// invert - when truthy, emit repeats instead of first occurrences.
//
// Keys are used as object property names, so they are compared by their
// string form. The lookup table has no prototype so that keys such as
// "constructor" or "toString" are not mistaken for already-seen values.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    else seen[key] = true
    return !invert //true by default
  })
}
156 | |
// nonUnique: the inverse of `unique` — passes only items whose key has
// already been seen earlier in the stream.
//
// read  - upstream pull source.
// field - key selector, interpreted the same way as in `unique`.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
160 | |
// group: through stream that collects items into arrays of `size`.
//
// read - upstream pull source, called as read(end, cb).
// size - number of items per emitted array; a falsy value means 5.
//
// Returns a new source that emits arrays. When upstream ends, any
// partially filled array is emitted first and the end is reported on
// the following read.
var group = exports.group =
function (read, size) {
  var ended
  var pending = []
  size = size || 5

  // hand out the collected items and start a fresh batch
  function takeBatch () {
    var batch = pending
    pending = []
    return batch
  }

  return function (end, cb) {
    //downstream is aborting: remember it and forward upstream.
    if(end) {
      ended = end
      return read(end, cb)
    }
    //upstream already ended on an earlier read.
    if(ended) return cb(ended)

    read(null, function next (end, data) {
      ended = ended || end
      if(ended) {
        if(pending.length)
          return cb(null, takeBatch())
        return cb(ended)
      }
      pending.push(data)
      if(pending.length >= size)
        return cb(null, takeBatch())
      //batch not full yet: keep reading
      read(null, next)
    })
  }
}
189 | |
// flatten: turns a stream of streams into a single stream of their
// items, reading each inner stream to its end before moving on.
//
// read - upstream pull source whose items are either pull sources
//        (functions) or arrays; arrays are converted with
//        sources.values. Any other item throws.
//
// Returns a new source.
// - Downstream abort: the current inner stream (if any) is aborted
//   first, then the outer stream.
// - Inner stream ends normally (end === true): move on to the next one.
// - Inner stream errors: the outer stream is aborted and the error is
//   passed downstream instead of being silently skipped.
var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    if(abort) {
      if(!_read) return read(abort, cb)
      var inner = _read
      _read = null
      return inner(abort, function (err) {
        read(err || abort, cb)
      })
    }

    if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) {
          _read = null
          nextStream()
        }
        else if(end) {
          // inner stream failed: shut down the outer stream, report error
          _read = null
          read(true, function () {
            cb(end)
          })
        }
        else cb(null, data)
      })
    }
    function nextStream () {
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream))
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')

        _read = stream
        nextChunk()
      })
    }
  }
}
217 | |
// prepend: through stream that emits `head` once before anything read
// from upstream.
//
// read - upstream pull source, called as read(end, cb).
// head - value to emit first. `null` means "nothing to prepend"; it is
//        also the marker used internally once head has been emitted.
//
// If downstream aborts before head was emitted, the abort is forwarded
// upstream and head is not emitted.
var prepend =
exports.prepend =
function (read, head) {

  return function (abort, cb) {
    // head already delivered (or never present): plain pass-through
    if(head === null) {
      read(abort, cb)
      return
    }
    if(abort)
      return read(abort, cb)
    var first = head
    head = null
    cb(null, first)
  }

}
235 | |
236 | //var drainIf = exports.drainIf = function (op, done) { |
237 | // sinks.drain( |
238 | //} |
239 | |
// _reduce: through stream that consumes all of upstream on the first
// read and emits a single item, the reduced value.
//
// read    - upstream pull source.
// reduce  - function (accumulator, item) returning the new accumulator.
// initial - starting accumulator value.
//
// Returns a new source: the first read yields the reduced value (or
// the error), later reads yield the recorded end state.
//
// NOTE(review): this relies on sinks.drain(op, done) returning a
// function that is then applied to `read` — confirm that ./sinks
// exposes drain in that curried form.
var _reduce = exports._reduce = function (read, reduce, initial) {
  // end state; must live in this closure (it was previously undeclared,
  // which made the `if(ended)` check throw a ReferenceError).
  var ended = false
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    var consume = sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err, data) {
      ended = err || true
      if(!err) cb(null, initial)
      else     cb(ended)
    })
    consume(read)
  }
}
255 | |
var nextTick = process.nextTick

// highWaterMark: through stream that reads ahead from upstream and
// buffers up to `highWaterMark` items.
//
// read          - upstream pull source, called as read(end, cb).
// highWaterMark - maximum number of buffered items; a falsy value
//                 means 10.
//
// Returns a new source. Reading ahead starts on the next tick, before
// downstream has asked for anything.
// - Items still in the buffer when upstream ends are delivered before
//   the end is reported.
// - Every item read without an end signal is buffered, including null.
// - A downstream abort discards the buffer and is forwarded upstream.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  highWaterMark = highWaterMark || 10

  // answer waiting readers: buffered data first, the end state after
  function readAhead () {
    while(waiting.length && (buffer.length || ended)) {
      if(buffer.length) waiting.shift()(null, buffer.shift())
      else waiting.shift()(ended, null)
    }
  }

  function next () {
    if(ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      // `ended` can only have become truthy during a read through a
      // downstream abort; anything arriving after that is dropped.
      if(!ended) {
        if(end) ended = end
        else buffer.push(data)
      }

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if(end) {
      //abort: buffered items are no longer wanted
      buffer.length = 0
      waiting.push(cb)
      if(ended) return readAhead()
      ended = end
      return read(end, function () {
        readAhead()
      })
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
290 | |
291 | |
292 | |
293 |
Built with git-ssb-web