git ssb

2+

Dominic / pull-stream



Tree: 578b08f48f151f6e93c4c7c0f1eccd59dea1cbd4

Files: 578b08f48f151f6e93c4c7c0f1eccd59dea1cbd4 / throughs.js

6275 bytesRaw
1var u = require('pull-core')
2var sources = require('./sources')
3var sinks = require('./sinks')
4
5var prop = u.prop
6var id = u.id
7var tester = u.tester
8
// Through stream that transforms each item with a synchronous mapper.
//
// read - upstream source, called as read(abort, cb)
// map  - mapper; it is first passed through `prop`, and `id` is used when
//        that yields a falsy value
//
// Returns a source function (abort, cb). When the mapper throws, upstream is
// aborted with the thrown error and the same error is handed to `cb`.
var map = exports.map =
function (read, map) {
  map = prop(map) || id

  function transform (abort, cb) {
    function onData (end, data) {
      var mapped = null
      if(!end) {
        try {
          mapped = map(data)
        } catch (err) {
          // tell upstream to stop, then report the failure downstream
          return read(err, function () {
            return cb(err)
          })
        }
      }
      cb(end, mapped)
    }

    read(abort, onData)
  }

  return transform
}
25
// Through stream that transforms each item with an asynchronous mapper.
//
// read - upstream source, called as read(abort, cb)
// map  - function (data, cb) that must call cb(err, mapped) exactly once;
//        when omitted, `read` is returned untouched
//
// Returns a source function (end, cb). If the mapper reports an error,
// upstream is aborted with that error before it is passed downstream, the
// same way the synchronous `map` treats a throwing mapper.
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    read(null, function (end, data) {
      if(end) return cb(end, data)
      map(data, function (err, mapped) {
        if(!err) return cb(null, mapped)
        // do not leave upstream open once the mapper has failed
        read(err, function () {
          cb(err)
        })
      })
    })
  }
}
37
// Through stream that runs an asynchronous mapper on several items at once.
//
// read  - upstream source, called as read(abort, cb)
// map   - function (data, cb) that must call cb(err, mapped); when omitted,
//         `read` is returned untouched
// width - upper bound on mapper calls in flight; upstream is only read
//         ahead while fewer than `width` calls are pending
//
// Results are emitted in the order the mapper calls complete, which is not
// necessarily the order the items were read in.
//
// A mapper error ends the stream: upstream is aborted, results that are
// already queued are still delivered, and then the error is passed
// downstream.
var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read
  var ended = false, reading = false, inFlight = 0, queue = [], _cb

  // hand one queued result, or the end state, to the waiting reader (if any)
  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length) return cb(null, queue.shift())
    if(ended && !inFlight) return cb(ended)
    _cb = cb // nothing to deliver yet, keep waiting
  }

  // record a mapper failure and stop upstream if it is still open
  function fail (err) {
    var upstreamOpen = !ended
    // an error takes priority over a plain end, the first error wins
    if(upstreamOpen || ended === true) ended = err
    if(upstreamOpen) read(err, function () {})
  }

  function pull () {
    // never read past the end, and never issue two reads at once
    if(ended || reading) return
    reading = true
    read(null, function (end, data) {
      reading = false
      if(end || ended) {
        ended = ended || end
        return drain()
      }
      inFlight++
      map(data, function (err, mapped) {
        inFlight--
        if(err) fail(err)
        else queue.push(mapped)
        drain()
      })

      if(inFlight < width) pull()
    })
  }

  return function (end, cb) {
    if(end) {
      ended = ended || end
      return read(end, cb) //abort
    }
    //continue to read while there are fewer than `width` maps in flight
    _cb = cb
    pull()
    drain()
  }
}
84
// Through stream that only lets items through for which `test` is truthy.
//
// read - upstream source, called as read(abort, cb)
// test - predicate description; it is normalised with `tester` first
//
// Returns a source function (end, cb).
//
// Rejected items are skipped with a loop rather than recursion whenever
// upstream answers synchronously, so a long run of rejected items cannot
// overflow the call stack.
var filter = exports.filter =
function (read, test) {
  //regexp
  test = tester(test)
  return function next (end, cb) {
    var sync, loop = true
    while(loop) {
      loop = false
      sync = true
      read(end, function (end, data) {
        if(!end && !test(data)) {
          // answered inside read(): let the while loop fetch the next item
          if(sync) loop = true
          // answered later: the loop is gone, so ask again explicitly
          else next(end, cb)
          return
        }
        cb(end, data)
      })
      sync = false
    }
  }
}
97
// Inverse of `filter`: lets through only the items that `test` rejects.
//
// read - upstream source, called as read(abort, cb)
// test - predicate description; it is normalised with `tester` first
var filterNot = exports.filterNot =
function (read, test) {
  test = tester(test)

  function rejected (item) {
    return !test(item)
  }

  return filter(read, rejected)
}
105
// Pass-through stream that observes items without changing them.
//
// read  - upstream source, called as read(abort, cb)
// op    - optional function called with every item that passes through
// onEnd - optional function called at most once, when the stream is aborted
//         from downstream or ends from upstream; it receives null when the
//         end value is exactly `true`, otherwise the end value itself
var through = exports.through =
function (read, op, onEnd) {
  var notified = false

  function notifyEnd (reason) {
    if(notified || !onEnd) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (end, cb) {
    if(end) notifyEnd(end)
    return read(end, function (end, data) {
      if(end) notifyEnd(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
124
// Through stream that passes items along until a limit is reached, then
// aborts upstream and ends.
//
// read - upstream source, called as read(abort, cb)
// test - either a number (how many items to let through) or a predicate;
//        with a predicate the stream ends at the first item for which it
//        returns a falsy value, and that item is not emitted
//
// With a numeric limit the count is checked BEFORE reading, so upstream is
// never asked for an item that would only be thrown away. The predicate form
// still has to read an item in order to test it.
var take = exports.take =
function (read, test) {
  var ended = false
  var counting = 'number' === typeof test
  var n = counting ? test : 0

  // abort upstream and report the end downstream
  function finish (cb) {
    ended = true
    read(true, function (end, data) {
      cb(ended, data)
    })
  }

  return function (end, cb) {
    if(ended) return cb(ended)
    if(ended = end) return read(ended, cb)
    // limit used up (0, or NaN): stop without consuming another item
    if(counting && !n) return finish(cb)

    read(null, function (end, data) {
      if(ended = ended || end) return cb(ended)
      if(counting) {
        n--
        return cb(null, data)
      }
      if(!test(data)) return finish(cb)
      cb(null, data)
    })
  }
}
151
// Through stream that drops items whose key has been seen before.
//
// read   - upstream source, called as read(abort, cb)
// field  - key selector; it is passed through `prop`, and `id` is used when
//          that yields a falsy value
// invert - when truthy the result is inverted: only items whose key HAS been
//          seen before are let through
//
// Keys are used as property names, so they are compared as strings.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // prototype-less dictionary: a plain {} would report inherited names such
  // as "constructor" or "toString" as already seen
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
162
// Counterpart of `unique`: delegates to it with the `invert` flag set, so
// only items whose key was already seen are let through.
//
// read  - upstream source, called as read(abort, cb)
// field - key selector, forwarded unchanged to `unique`
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
166
// Through stream that collects items into arrays of `size` elements.
//
// read - upstream source, called as read(abort, cb)
// size - number of items per emitted array; 5 when falsy
//
// When upstream ends, whatever has been collected so far is emitted as a
// final, shorter array; the end itself is reported on the following read.
var group = exports.group =
function (read, size) {
  var chunkSize = size || 5
  var finished
  var pending = []

  // hand out the collected items and start a fresh batch
  function takePending () {
    var chunk = pending
    pending = []
    return chunk
  }

  return function (abort, cb) {
    // the reader asked us to stop: remember it and pass it upstream
    if(abort) {
      finished = abort
      return read(abort, cb)
    }
    // an end was already seen on an earlier read
    if(finished) return cb(finished)

    read(null, function collectItem (end, data) {
      finished = finished || end
      if(finished)
        return pending.length ? cb(null, takePending()) : cb(finished)

      pending.push(data)
      if(pending.length < chunkSize)
        return read(null, collectItem)

      cb(null, takePending())
    })
  }
}
195
// Through stream that turns a stream of streams into one flat stream.
//
// read - upstream source whose items are either source functions or arrays;
//        arrays are wrapped with sources.values
//
// Returns a source function (abort, cb) that emits every item of the first
// inner stream, then every item of the second, and so on.
//
// An abort from downstream is forwarded to the current inner stream (if
// any) and then to the outer stream. An error from an inner stream aborts
// the outer stream and is passed downstream, rather than being treated as
// a normal end of that inner stream.
//
// Throws an Error if an upstream item is neither a function nor an array.
var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) return nextStream()
        if(end) {
          // the inner stream failed: shut the outer one and report it
          _read = null
          return read(end, function () {
            cb(end)
          })
        }
        cb(null, data)
      })
    }

    function nextStream () {
      // forget the exhausted inner stream so it is never read again
      _read = null
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream))
          stream = sources.values(stream)
        else if('function' !== typeof stream)
          throw new Error('expected stream of streams')

        _read = stream
        nextChunk()
      })
    }

    if(abort) {
      var inner = _read
      _read = null
      if(!inner) return read(abort, cb)
      // close the inner stream first, then the stream of streams
      return inner(abort, function (err) {
        read(err || abort, cb)
      })
    }

    if(_read) nextChunk()
    else nextStream()
  }
}
223
// Through stream that emits `head` once before anything from upstream.
//
// read - upstream source, called as read(abort, cb)
// head - value to emit first; null is used internally as the "already
//        emitted" marker, so a null head is never emitted
//
// An abort that arrives before `head` was emitted goes straight upstream
// and leaves `head` in place.
var prepend =
exports.prepend =
function (read, head) {

  return function (abort, cb) {
    var headPending = head !== null

    if(headPending && abort)
      return read(abort, cb)

    if(!headPending) {
      read(abort, cb)
      return
    }

    var first = head
    head = null
    cb(null, first)
  }

}
241
242//var drainIf = exports.drainIf = function (op, done) {
243// sinks.drain(
244//}
245
// Through stream that folds the whole upstream into one value and emits it.
//
// read    - upstream source, called as read(abort, cb)
// reduce  - function (accumulator, item) returning the new accumulator
// initial - starting accumulator
//
// The first read drains upstream through sinks.drain and calls back with
// the final accumulator; every later read calls back with the end state.
// NOTE(review): this relies on sinks.drain(op, done) returning a function
// that takes the source, and on `done` receiving a falsy first argument
// when the stream ended without error - confirm against ./sinks.
var _reduce = exports._reduce = function (read, reduce, initial) {
  // must be declared here: it carries the end state between reads, and
  // reading an undeclared name would throw a ReferenceError
  var ended
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    var sink = sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err, data) {
      ended = err || true
      if(!err) cb(null, initial)
      else cb(ended)
    })
    sink(read)
  }
}
261
262var nextTick = process.nextTick
263
// Through stream that reads ahead of its consumer and buffers the items.
//
// read          - upstream source, called as read(abort, cb)
// highWaterMark - number of buffered items at which reading ahead pauses;
//                 10 when falsy
//
// Reading ahead starts on the next tick, before the first downstream read.
// Items that are null or undefined are not buffered.
//
// Every waiting reader is answered once upstream has ended and the buffer
// is empty, including a reader that was already waiting when the end
// arrived. An abort from downstream is forwarded upstream (after any read
// that is still pending has answered) so the source can release resources.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, ending, reading = false
  var aborted = false
  highWaterMark = highWaterMark || 10

  // answer as many waiting readers as the current state allows
  function readAhead () {
    while(waiting.length) {
      // the buffer has drained, so the upstream end becomes visible
      if(!ended && !buffer.length && ending) ended = ending
      if(ended) waiting.shift()(ended, null)
      else if(buffer.length) waiting.shift()(null, buffer.shift())
      else break
    }
    if(!ended && !buffer.length && ending) ended = ending
  }

  // forward a downstream abort exactly once, and only while upstream is
  // still open and has no read pending
  function abortUpstream () {
    if(aborted || reading || ending || !ended) return
    aborted = true
    read(ended, function () {})
  }

  function next () {
    if(ended || ending || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      ending = ending || end
      if(data != null) buffer.push(data)

      abortUpstream()
      next(); readAhead()
    })
  }

  process.nextTick(next)

  return function (end, cb) {
    ended = ended || end
    waiting.push(cb)

    abortUpstream()
    next(); readAhead()
  }
}
298
299
300
301

Built with git-ssb-web