git ssb

2+

Dominic / pull-stream



Tree: 8a69c2e9c2cee69cd220830ceb1e29bf92d40ca0

Files: 8a69c2e9c2cee69cd220830ceb1e29bf92d40ca0 / throughs.js

4876 bytesRaw
1var u = require('./util')
2var sources = require('./sources')
3var prop = u.prop
4var id = u.id
5var tester = u.tester
6
7var map = exports.map =
8function (read, map) {
9 map = prop(map) || id
10 return function (end, cb) {
11 read(end, function (end, data) {
12 var data = !end ? map(data) : null
13 cb(end, data)
14 })
15 }
16}
17
18var asyncMap = exports.asyncMap =
19function (read, map) {
20 if(!map) return read
21 return function (end, cb) {
22 if(end) return read(end, cb) //abort
23 read(null, function (end, data) {
24 if(end) return cb(end, data)
25 map(data, cb)
26 })
27 }
28}
29
30var filter = exports.filter =
31function (read, test) {
32 //regexp
33 test = tester(test)
34 return function next (end, cb) {
35 read(end, function (end, data) {
36 if(!end && !test(data))
37 return next(end, cb)
38 cb(end, data)
39 })
40 }
41}
42
43var filterNot = exports.filterNot =
44function (read, test) {
45 test = tester(test)
46 return filter(read, function (e) {
47 return !test(e)
48 })
49}
50
51var through = exports.through =
52function (read, op, onEnd) {
53 var a = false
54 function once (abort) {
55 if(a || !onEnd) return
56 a = true
57 onEnd(abort === true ? null : abort)
58 }
59
60 return function (end, cb) {
61 if(end) once(end)
62 return read(end, function (end, data) {
63 if(!end) op && op(data)
64 else once(end)
65 cb(end, data)
66 })
67 }
68}
69
70var take = exports.take =
71function (read, test) {
72 var ended = false, more
73 if('number' === typeof test) {
74 var n = test; test = function () {
75 return n --
76 }
77 }
78 // else
79// test = tester(test)
80
81 return function (end, cb) {
82 if(ended) return cb(ended)
83 if(1 === more) end = true
84 if(ended = end) return read(ended, cb)
85
86 read(null, function (end, data) {
87 if(ended = ended || end) return cb(ended)
88 if(!(more = test(data))) {
89 ended = true
90 read(true, function (end, data) {
91 cb(ended, data)
92 })
93 }
94 else
95 cb(null, data)
96 })
97 }
98}
99
100var unique = exports.unique = function (read, field, invert) {
101 field = prop(field) || id
102 var seen = {}
103 return filter(read, function (data) {
104 var key = field(data)
105 if(seen[key]) return !!invert //false, by default
106 else seen[key] = true
107 return !invert //true by default
108 })
109}
110
111var nonUnique = exports.nonUnique = function (read, field) {
112 return unique(read, field, true)
113}
114
115var group = exports.group =
116function (read, size) {
117 var ended; size = size || 5
118 var queue = []
119
120 return function (end, cb) {
121 //this means that the upstream is sending an error.
122 if(end) return read(ended = end, cb)
123 //this means that we read an end before.
124 if(ended) return cb(ended)
125
126 read(null, function next(end, data) {
127 if(ended = ended || end) {
128 if(!queue.length)
129 return cb(ended)
130
131 var _queue = queue; queue = []
132 return cb(null, _queue)
133 }
134 queue.push(data)
135 if(queue.length < size)
136 return read(null, next)
137
138 var _queue = queue; queue = []
139 cb(null, _queue)
140 })
141 }
142}
143
144var flatten = exports.flatten = function (read) {
145 var _read
146 return function (abort, cb) {
147 if(_read) nextChunk()
148 else nextStream()
149
150 function nextChunk () {
151 _read(null, function (end, data) {
152 if(end) nextStream()
153 else cb(null, data)
154 })
155 }
156 function nextStream () {
157 read(null, function (end, stream) {
158 if(end)
159 return cb(end)
160 if(Array.isArray(stream))
161 stream = sources.values(stream)
162 else if('function' != typeof stream)
163 throw new Error('expected stream of streams')
164
165 _read = stream
166 nextChunk()
167 })
168 }
169 }
170
171/* var chunk
172 return function (end, cb) {
173 //this means that the upstream is sending an error.
174 if(end) return read(ended = end, cb)
175
176 if(chunk && chunk.length)
177 return cb(null, chunk.shift())
178
179 read(null, function (err, data) {
180 if(err) return cb(err)
181 chunk = data
182
183 if(chunk && chunk.length)
184 return cb(null, chunk.shift())
185 })
186 }*/
187}
188
189var nextTick = process.nextTick
190
191var highWaterMark = exports.highWaterMark =
192function (read, highWaterMark) {
193 var buffer = [], waiting = [], ended, reading = false
194 highWaterMark = highWaterMark || 10
195
196 function readAhead () {
197 while(waiting.length && (buffer.length || ended))
198 waiting.shift()(ended, ended ? null : buffer.shift())
199 }
200
201 function next () {
202 if(ended || reading || buffer.length >= highWaterMark)
203 return
204 reading = true
205 return read(ended, function (end, data) {
206 reading = false
207 ended = ended || end
208 if(data != null) buffer.push(data)
209
210 next(); readAhead()
211 })
212 }
213
214 nextTick(next)
215
216 return function (end, cb) {
217 ended = ended || end
218 waiting.push(cb)
219
220 next(); readAhead()
221 }
222}
223
224
225
226

Built with git-ssb-web