Dominic / pull-stream



Tree: 623e06d3e4aae1dfccc051e0b1a5fa1e9cc405e4

Files: 623e06d3e4aae1dfccc051e0b1a5fa1e9cc405e4 / throughs.js
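// throughs.js: the transform ("through") helpers for pull-stream.
// Each function below takes a source `read(abort, cb)` function plus options
// and returns a new read function with the transform applied.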

var u = require('pull-core')
var sources = require('./sources')
var sinks = require('./sinks')

var prop = u.prop
var id = u.id
var tester = u.tester

var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, data) {
      try {
        data = !end ? map(data) : null
      } catch (err) {
        // if the mapper throws, abort the source with the error,
        // then pass the error downstream.
        return read(err, function () {
          return cb(err)
        })
      }
      cb(end, data)
    })
  }
}
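// usage sketch: compose directly with ./sources and ./sinks, mirroring the
// curried drain(op, done)(read) form that _reduce uses further down this file.
//
//   var doubled = map(sources.values([1, 2, 3]), function (n) { return n * 2 })
//   sinks.drain(function (n) { console.log(n) }, function (err) {})(doubled)
//   // logs 2, 4, 6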

var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    read(null, function (end, data) {
      if(end) return cb(end, data)
      map(data, cb)
    })
  }
}
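// usage sketch: like map, but the mapper is node-style async (data, cb),
// with one item in flight at a time.
//
//   var upper = asyncMap(sources.values(['a', 'b']), function (s, cb) {
//     setTimeout(function () { cb(null, s.toUpperCase()) }, 10)
//   })
//   sinks.drain(console.log, function (err) {})(upper) // logs 'A', then 'B'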

var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read
  var ended = false, queue = [], _cb

  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length)
      return cb(null, queue.shift())
    else if(ended && !n)
      return cb(ended)
    _cb = cb
  }

  function pull () {
    read(null, function (end, data) {
      if(end) {
        ended = end
        return drain()
      }
      n++
      map(data, function (err, data) {
        n--
        // note: results are queued in completion order, and a mapper
        // error is not propagated here.
        queue.push(data)
        drain()
      })

      // continue to read while there are fewer than `width` maps in flight
      if(n < width && !ended)
        pull()
    })
  }

  var n = 0
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    _cb = cb
    if(queue.length || ended)
      pull(), drain()
    else pull()
  }
}
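// usage sketch: asyncMap with up to `width` mapper calls in flight; because
// results are emitted as they complete, output order is not guaranteed.
// (ids and fetchById(id, cb) below are hypothetical placeholders.)
//
//   var fetched = paraMap(sources.values(ids), fetchById, 4)
//   sinks.drain(console.log, function (err) {})(fetched)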

var filter = exports.filter =
function (read, test) {
  // tester() turns a regexp or property-name string into a predicate;
  // plain functions pass through.
  test = tester(test)
  return function next (end, cb) {
    var sync, loop = true
    while(loop) {
      loop = false
      sync = true
      read(end, function (end, data) {
        if(!end && !test(data))
          // if read answered synchronously, loop instead of recursing, so
          // long runs of filtered-out items cannot blow the stack.
          return sync ? loop = true : next(end, cb)
        cb(end, data)
      })
      sync = false
    }
  }
}
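// usage sketch:
//
//   var odds = filter(sources.values([1, 2, 3, 4, 5]), function (n) { return n % 2 })
//   sinks.drain(console.log, function (err) {})(odds) // logs 1, 3, 5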

var filterNot = exports.filterNot =
function (read, test) {
  test = tester(test)
  return filter(read, function (e) {
    return !test(e)
  })
}

var through = exports.through =
function (read, op, onEnd) {
  var a = false
  function once (abort) {
    if(a || !onEnd) return
    a = true
    onEnd(abort === true ? null : abort)
  }

  return function (end, cb) {
    if(end) once(end)
    return read(end, function (end, data) {
      if(!end) op && op(data)
      else once(end)
      cb(end, data)
    })
  }
}

var take = exports.take =
function (read, test, opts) {
  opts = opts || {}
  var last = opts.last || false // whether the first item for which !test(item) should still pass
  var ended = false
  // a number means: take that many items
  if('number' === typeof test) {
    last = true
    var n = test; test = function () {
      return --n
    }
  }

  function terminate (cb) {
    read(true, function (err) {
      last = false; cb(err || true)
    })
  }

  return function (end, cb) {
    if(ended) last ? terminate(cb) : cb(ended)
    else if(ended = end) read(ended, cb)
    else
      read(null, function (end, data) {
        if(ended = ended || end) {
          //last ? terminate(cb) :
          cb(ended)
        }
        else if(!test(data)) {
          ended = true
          last ? cb(null, data) : terminate(cb)
        }
        else
          cb(null, data)
      })
  }
}
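// usage sketch: take(read, n) passes through the first n items and then
// aborts the source; take(read, testFn) keeps reading while testFn(item)
// is truthy.
//
//   var firstTwo = take(sources.values([1, 2, 3, 4]), 2)
//   sinks.drain(console.log, function (err) {})(firstTwo) // logs 1, 2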

var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  var seen = {}
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    else seen[key] = true
    return !invert //true by default
  })
}
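// usage sketch: drop items whose key (the item itself, a property name, or
// field(item)) has already been seen.
//
//   var distinct = unique(sources.values([1, 2, 2, 3, 1]))
//   sinks.drain(console.log, function (err) {})(distinct) // logs 1, 2, 3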

var nonUnique = exports.nonUnique = function (read, field) {
  return unique(read, field, true)
}

var group = exports.group =
function (read, size) {
  var ended; size = size || 5
  var queue = []

  return function (end, cb) {
    //this means that the upstream is sending an error.
    if(end) return read(ended = end, cb)
    //this means that we read an end before.
    if(ended) return cb(ended)

    read(null, function next(end, data) {
      if(ended = ended || end) {
        if(!queue.length)
          return cb(ended)

        var _queue = queue; queue = []
        return cb(null, _queue)
      }
      queue.push(data)
      if(queue.length < size)
        return read(null, next)

      var _queue = queue; queue = []
      cb(null, _queue)
    })
  }
}
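// usage sketch: batch items into arrays of `size` (default 5), flushing any
// remainder when the source ends.
//
//   var pairs = group(sources.values([1, 2, 3, 4, 5]), 2)
//   sinks.drain(console.log, function (err) {})(pairs) // logs [1, 2], [3, 4], [5]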

var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    if (abort) {
      // abort the current inner stream (if any), then the outer stream.
      _read ? _read(abort, function(err) {
        read(err || abort, cb)
      }) : read(abort, cb)
    }
    else if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (err, data) {
        if (err === true) nextStream()
        else if (err) {
          read(true, function(abortErr) {
            // TODO: what do we do with the abortErr?
            cb(err)
          })
        }
        else cb(null, data)
      })
    }
    function nextStream () {
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        // non-stream values (arrays, objects) are wrapped with sources.values()
        if(Array.isArray(stream) || stream && 'object' === typeof stream)
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')
        _read = stream
        nextChunk()
      })
    }
  }
}
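// usage sketch: turn a stream of streams (or of arrays) into a flat stream
// of their items.
//
//   var flat = flatten(sources.values([[1, 2], sources.values([3, 4])]))
//   sinks.drain(console.log, function (err) {})(flat) // logs 1, 2, 3, 4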

var prepend =
exports.prepend =
function (read, head) {

  return function (abort, cb) {
    if(head !== null) {
      if(abort)
        return read(abort, cb)
      var _head = head
      head = null
      cb(null, _head)
    } else {
      read(abort, cb)
    }
  }

}
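// usage sketch: emit `head` once, then the rest of the stream (a null head
// is treated as "no head").
//
//   var withHeader = prepend(sources.values([2, 3]), 1)
//   sinks.drain(console.log, function (err) {})(withHeader) // logs 1, 2, 3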

//var drainIf = exports.drainIf = function (op, done) {
//  sinks.drain(
//}

var _reduce = exports._reduce = function (read, reduce, initial) {
  var ended
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err, data) {
      ended = err || true
      if(!err) cb(null, initial)
      else cb(ended)
    })
    (read)
  }
}

var nextTick = process.nextTick

var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, ending, reading = false
  highWaterMark = highWaterMark || 10

  // hand buffered items (or the end signal) to any waiting readers.
  function readAhead () {
    while(waiting.length && (buffer.length || ended))
      waiting.shift()(ended, ended ? null : buffer.shift())

    if (!buffer.length && ending) ended = ending
  }

  // eagerly read from upstream until the buffer reaches highWaterMark.
  function next () {
    if(ended || ending || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(ended || ending, function (end, data) {
      reading = false
      ending = ending || end
      if(data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  process.nextTick(next)

  return function (end, cb) {
    ended = ended || end
    waiting.push(cb)

    next(); readAhead()
  }
}
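// usage sketch: decouple a slow producer from its consumer by reading ahead
// and buffering up to `highWaterMark` items.
// (slowSource is a hypothetical name for any pull-stream read function.)
//
//   var buffered = highWaterMark(slowSource, 4)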

var flatMap = exports.flatMap =
function (read, mapper) {
  mapper = mapper || id
  var queue = [], ended

  return function (abort, cb) {
    if(queue.length) return cb(null, queue.shift())
    else if(ended) return cb(ended)

    read(abort, function next (end, data) {
      if(end) ended = end
      else {
        var add = mapper(data)
        while(add && add.length)
          queue.push(add.shift())
      }

      if(queue.length) cb(null, queue.shift())
      else if(ended) cb(ended)
      else read(null, next)
    })
  }
}
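// usage sketch: map each item to an array and emit its elements one by one
// (the mapper's return value is drained with shift(), so it is consumed).
//
//   var letters = flatMap(sources.values(['ab', 'cd']), function (s) { return s.split('') })
//   sinks.drain(console.log, function (err) {})(letters) // logs 'a', 'b', 'c', 'd'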
