git ssb

2+

Dominic / pull-stream



Tree: 0990f2724d1de3b5d3ac4000a6cb2680aab8c194

Files: 0990f2724d1de3b5d3ac4000a6cb2680aab8c194 / throughs.js

6076 bytesRaw
1var u = require('pull-core')
2var sources = require('./sources')
3var sinks = require('./sinks')
4
// Local aliases for helpers exported by pull-core (loaded above as `u`).
// prop: applied to a mapper/field argument; callers in this file fall back
// to `id` whenever it returns a falsy value.
5var prop = u.prop
// id: the fallback used when prop() yields nothing -- presumably the
// identity function; confirm against pull-core.
6var id = u.id
// tester: applied to a filter's `test` argument; its result is invoked as a
// predicate on every item.
7var tester = u.tester
8
// map: through that transforms each item with a synchronous mapper.
//   read - upstream source, invoked as read(end, cb)
//   map  - mapper spec; it is passed through `prop`, and `id` is used
//          instead when that yields a falsy value
// Returns a source. End/error answers from upstream are forwarded with a
// null payload and the mapper is not invoked for them.
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, item) {
      if(end) {
        cb(end, null)
        return
      }
      var mapped = map(item)
      cb(end, mapped)
    })
  }
}
19
// asyncMap: through that transforms each item with a callback-style mapper.
//   read - upstream source, invoked as read(end, cb)
//   map  - function (data, cb); its callback is the downstream callback, so
//          whatever (err, value) it reports is what downstream receives.
//          When omitted, `read` is returned untouched.
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (abort, cb) {
    // an abort goes straight to upstream, the mapper is never involved
    if(abort) return read(abort, cb)
    read(null, function (end, item) {
      if(!end) {
        map(item, cb)
        return
      }
      return cb(end, item)
    })
  }
}
31
// paraMap: like asyncMap, but keeps several mapper calls in flight at once.
//   read  - upstream source, invoked as read(end, cb)
//   map   - function (data, cb) reporting (err, value); when omitted `read`
//           is returned untouched
//   width - read-ahead continues while fewer than `width` mapper calls are
//           pending. When omitted the comparison is always false, so items
//           are processed one at a time.
// Results are delivered in the order the mapper calls complete, which is not
// necessarily the order the items were read.
//
// Changes from the previous revision:
//  - a mapper error now ends the stream with that error, instead of being
//    ignored while its (undefined) result was queued as data
//  - upstream is no longer read again after it has ended
//  - a new upstream read is never started while one is still pending
//  - the unreachable `return highWaterMark(...)` after the real return is gone
var paraMap = exports.paraMap =
function (read, map, width) {
  if(!map) return read
  var ended = false   // upstream end marker, or the first mapper error
  var queue = []      // mapped values waiting for a downstream read
  var n = 0           // mapper calls currently in flight
  var reading = false // true while an upstream read is pending
  var _cb = null      // downstream callback waiting for an answer

  // Answer the waiting downstream callback if there is something to say.
  function drain () {
    if(!_cb) return
    var cb = _cb
    _cb = null
    if(queue.length)
      return cb(null, queue.shift())
    // only report the end once every pending mapper call has finished
    if(ended && !n)
      return cb(ended)
    _cb = cb // nothing to deliver yet, keep waiting
  }

  // Read one item from upstream and hand it to the mapper.
  function pull () {
    if(ended || reading) return
    reading = true
    read(null, function (end, data) {
      reading = false
      // `ended` may have been set by a failing mapper while this read was
      // pending; the stream is already over, so the item is not mapped
      if(end || ended) {
        ended = ended || end
        return drain()
      }
      n++
      map(data, function (err, result) {
        n--
        if(err) {
          // keep the first real error; a plain end marker is replaced by it
          if(ended === false || ended === true) ended = err
        }
        else queue.push(result)
        drain()
      })

      //continue to read while there are less than `width` maps in flight
      if(n < width)
        pull()
    })
  }

  return function (end, cb) {
    if(end) return read(end, cb) //abort
    _cb = cb
    pull()
    drain()
  }
}
78
// filter: through that only lets items passing `test` reach downstream.
//   read - upstream source, invoked as read(end, cb)
//   test - predicate spec, converted with `tester` (a regexp is one of the
//          accepted forms, per the original note)
// End/error answers from upstream are always forwarded.
//
// Rejected items are skipped with a loop rather than by recursing: when
// upstream answers synchronously, a long run of rejected items used to add
// one stack frame per item and could overflow the stack.
var filter = exports.filter =
function (read, test) {
  //regexp
  test = tester(test)
  return function next (abort, cb) {
    var sync, loop = true
    while(loop) {
      loop = false
      sync = true
      read(abort, function (end, data) {
        if(!end && !test(data)) {
          if(sync) {
            // answered synchronously: ask again from the loop, passing on
            // the same (falsy) end value the recursive call used to pass
            abort = end
            loop = true
            return
          }
          // answered later: the stack has unwound, plain recursion is safe
          return next(end, cb)
        }
        cb(end, data)
      })
      sync = false
    }
  }
}
91
// filterNot: the complement of filter -- only items that FAIL `test` reach
// downstream.
//   read - upstream source
//   test - predicate spec, converted with `tester`
var filterNot = exports.filterNot =
function (read, test) {
  var matches = tester(test)
  function rejected (item) {
    return !matches(item)
  }
  return filter(read, rejected)
}
99
// through: pass-through that lets the caller observe the stream.
//   read  - upstream source, invoked as read(end, cb)
//   op    - optional, called with every item on its way downstream
//   onEnd - optional, called at most once, either when downstream aborts or
//           when upstream ends; it receives null for a plain `true` end and
//           the end value itself otherwise
var through = exports.through =
function (read, op, onEnd) {
  var notified = false

  // fire onEnd a single time, however many end signals arrive
  function finish (reason) {
    if(!onEnd || notified) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (abort, cb) {
    if(abort) finish(abort)
    return read(abort, function (end, data) {
      if(end) finish(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
118
// take: through that ends the stream early.
//   read - upstream source, invoked as read(end, cb)
//   test - either a number (how many items to let through) or a predicate;
//          with a predicate the stream ends at the first item for which it
//          returns a falsy value, and that item is not delivered
// When the limit is reached upstream is aborted with `true` and downstream
// is answered once the abort has been acknowledged.
//
// The numeric form used to be a counting predicate, which meant one extra
// item was pulled from upstream and thrown away before the stream ended.
// The count is now checked before reading, so exactly `test` items are
// consumed. As before, the stream ends once the counter is 0 (or NaN).
var take = exports.take =
function (read, test) {
  var ended = false
  var counting = 'number' === typeof test
  var remaining = counting ? test : 0

  // abort upstream, then report the end downstream
  function terminate (cb) {
    ended = true
    read(true, function (end, data) {
      cb(ended, data)
    })
  }

  return function (end, cb) {
    if(ended) return cb(ended)
    if(ended = end) return read(ended, cb)
    // limit already reached: finish without touching another item
    if(counting && !remaining) return terminate(cb)

    read(null, function (end, data) {
      if(ended = ended || end) return cb(ended)
      if(counting) {
        remaining --
        return cb(null, data)
      }
      if(!test(data)) terminate(cb)
      else cb(null, data)
    })
  }
}
145
// unique: through that drops items whose key has been seen before.
//   read   - upstream source
//   field  - key spec, passed through `prop`; `id` is used when that yields
//            a falsy value
//   invert - when truthy the logic flips: only items whose key HAS been seen
//            before are let through (see nonUnique)
// Keys are used as object property names, so values that stringify to the
// same text are treated as the same key.
//
// The lookup table has no prototype: with a plain {} keys such as
// "constructor" or "toString" were found on Object.prototype and wrongly
// treated as already seen.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
156
// nonUnique: only lets through items whose key was already seen earlier in
// the stream; implemented as unique() with its `invert` flag switched on.
//   read  - upstream source
//   field - key spec, handed to unique() unchanged
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
160
// group: through that collects items into arrays of `size` elements.
//   read - upstream source, invoked as read(end, cb)
//   size - items per array; a falsy value means 5
// When upstream ends, whatever has been collected so far is delivered as a
// final, shorter array and the end itself is reported on the next read.
var group = exports.group =
function (read, size) {
  var ended
  var batch = []
  size = size || 5

  // deliver the collected items and start an empty batch
  function flush (cb) {
    var full = batch
    batch = []
    return cb(null, full)
  }

  return function (abort, cb) {
    // downstream is aborting: remember it and pass it on
    if(abort) return read(ended = abort, cb)
    // upstream already ended on an earlier read
    if(ended) return cb(ended)

    read(null, function collect (end, data) {
      ended = ended || end
      if(ended)
        return batch.length ? flush(cb) : cb(ended)

      batch.push(data)
      // not enough yet, keep reading
      if(batch.length < size)
        return read(null, collect)

      flush(cb)
    })
  }
}
189
// flatten: turns a stream of streams into a single stream of their items.
//   read - upstream source whose items are either source functions or
//          arrays (arrays are wrapped with sources.values)
// Throws if an upstream item is neither an array nor a function.
//
// Changes from the previous revision:
//  - an abort from downstream used to be ignored (the next item was read as
//    if nothing had happened); it now aborts the current inner stream and
//    then the stream of streams
//  - an inner stream ending with an error used to be treated like a normal
//    end; the error now aborts the outer stream and is reported downstream
var flatten = exports.flatten = function (read) {
  var _read // the inner stream currently being consumed, if any
  return function (abort, cb) {
    if(abort) {
      if(!_read) return read(abort, cb)
      var inner = _read
      _read = null
      return inner(abort, function (err) {
        // prefer a real error reported by the inner stream over the
        // original abort value
        read(err && err !== true ? err : abort, cb)
      })
    }

    if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) nextStream()
        else if(end) {
          // inner stream failed: shut the outer stream down, then report
          _read = null
          read(true, function () {
            cb(end)
          })
        }
        else cb(null, data)
      })
    }
    function nextStream () {
      _read = null // never re-read an inner stream that has ended
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream))
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')

        _read = stream
        nextChunk()
      })
    }
  }
}
217
// prepend: through that emits `head` once before anything from upstream.
//   read - upstream source, invoked as read(end, cb)
//   head - the item to emit first; null means there is nothing to prepend
// If downstream aborts before `head` was delivered, the abort is passed
// straight to upstream.
var prepend =
exports.prepend =
function (read, head) {
  return function (abort, cb) {
    // head already delivered (or never given): plain pass-through
    if(head === null) {
      read(abort, cb)
      return
    }
    if(abort)
      return read(abort, cb)

    var first = head
    head = null
    cb(null, first)
  }
}
235
236//var drainIf = exports.drainIf = function (op, done) {
237// sinks.drain(
238//}
239
// _reduce: through that consumes all of upstream and emits one value.
//   read    - upstream source, invoked as read(end, cb)
//   reduce  - function (accumulator, item) returning the new accumulator
//   initial - starting accumulator
// The first read drains upstream through sinks.drain and answers with the
// final accumulator, or with the error the drain reported. Later reads are
// answered with the end state.
//
// `ended` used to be referenced without ever being declared, so the first
// non-abort read threw a ReferenceError; it is now a local.
var _reduce = exports._reduce = function (read, reduce, initial) {
  var ended
  return function (close, cb) {
    if(close) return read(close, cb)
    if(ended) return cb(ended)

    var sink = sinks.drain(function (item) {
      initial = reduce(initial, item)
    }, function (err) {
      ended = err || true
      if(!err) cb(null, initial)
      else cb(ended)
    })
    // sinks.drain returns the sink; applying it to `read` starts the drain
    sink(read)
  }
}
255
var nextTick = process.nextTick

// highWaterMark: through that reads ahead of downstream into a buffer.
//   read          - upstream source, invoked as read(end, cb)
//   highWaterMark - read-ahead stops while the buffer holds this many items;
//                   a falsy value means 10
// Reading starts on the next tick, before downstream has asked for anything.
//
// Changes from the previous revision:
//  - items still buffered when upstream ended were dropped (every waiting
//    reader was answered with the end straight away); the buffer is now
//    handed out first and the end reported only once it is empty
//  - null/undefined items were silently discarded; every item of a non-end
//    answer is now buffered
// An abort from downstream still discards the buffer and is answered with
// the end state.
// NOTE(review): as before, an abort is not forwarded to upstream.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  var aborted = false // set once downstream has asked to stop
  highWaterMark = highWaterMark || 10

  // answer waiting readers for as long as there is something to tell them
  function readAhead () {
    while(waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if(buffer.length) cb(null, buffer.shift())
      else cb(ended, null)
    }
  }

  // top the buffer up, one upstream read at a time
  function next () {
    if(ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      ended = ended || end
      // an item arriving after an abort is no longer wanted
      if(!end && !aborted) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if(end && !aborted) {
      aborted = true
      buffer.length = 0
    }
    ended = ended || end
    waiting.push(cb)

    next(); readAhead()
  }
}
290
291
292
293

Built with git-ssb-web