Files: 8a69c2e9c2cee69cd220830ceb1e29bf92d40ca0 / throughs.js
4876 bytesRaw
1 | var u = require('./util') |
2 | var sources = require('./sources') |
3 | var prop = u.prop |
4 | var id = u.id |
5 | var tester = u.tester |
6 | |
// map(read, map) -> read
// Wraps the upstream `read` so every item is passed through a transform
// before it reaches the caller's callback.
//   read - upstream function called as read(end, cb); answers cb(end, data)
//   map  - selector handed to util.prop(); when prop() returns a falsy value
//          the identity helper `id` is used instead
//          (NOTE(review): prop() lives in ./util and is not visible here)
// When upstream signals end/error the transform is skipped and the callback
// receives `null` as data.
var map = exports.map =
function (read, map) {
  var transform = prop(map) || id
  return function (abort, cb) {
    read(abort, function (end, data) {
      if (end) return cb(end, null)
      cb(end, transform(data))
    })
  }
}
17 | |
// asyncMap(read, map) -> read
// Like a map, but the transform is asynchronous: it is called as
// map(data, callback) and must answer with callback(err, mappedData).
//   read - upstream function called as read(end, cb); answers cb(end, data)
//   map  - async transform; when omitted, `read` is returned unchanged
// An abort from downstream is forwarded straight to `read`.
// When the transform reports an error, the upstream is aborted with that
// error before the error is handed downstream, so the source is not left
// open after a failed mapping.
var asyncMap = exports.asyncMap =
function (read, map) {
  if(!map) return read
  return function (end, cb) {
    if(end) return read(end, cb) //abort
    read(null, function (end, data) {
      if(end) return cb(end, data)
      map(data, function (err, mapped) {
        if(!err) return cb(err, mapped)
        // Tell the source to stop, then report the mapper's own error
        // (not whatever the source answers to the abort).
        read(err, function () {
          cb(err)
        })
      })
    })
  }
}
29 | |
// filter(read, test) -> read
// Passes on only the items for which `test` returns a truthy value.
//   read - upstream function called as read(end, cb); answers cb(end, data)
//   test - handed to util.tester() first (the original comment mentions
//          regexp support; tester() lives in ./util and is not visible here)
// End and error signals from upstream are always passed through.
//
// Rejected items cause another pull. When the source answers synchronously
// the next pull is done by looping rather than recursing, so a long run of
// rejected items cannot overflow the call stack. Only a source that answers
// later (asynchronously) re-enters `next`.
var filter = exports.filter =
function (read, test) {
  //regexp
  test = tester(test)
  return function next (end, cb) {
    var sync, loop = true
    while(loop) {
      loop = false
      sync = true
      read(end, function (end, data) {
        if(!end && !test(data)) {
          // still inside read(): ask the loop for one more round
          if(sync) loop = true
          // answered later: the loop is gone, so pull again directly
          else next(end, cb)
          return
        }
        cb(end, data)
      })
      sync = false
    }
  }
}
42 | |
// filterNot(read, test) -> read
// Complement of filter(): keeps the items for which `test` is falsy.
// `test` is normalised with util.tester() and its negation is handed to
// filter().
var filterNot = exports.filterNot =
function (read, test) {
  var accept = tester(test)
  function reject (item) {
    return !accept(item)
  }
  return filter(read, reject)
}
50 | |
// through(read, op, onEnd) -> read
// Pass-through that lets the caller observe the stream without altering it.
//   read  - upstream function called as read(end, cb); answers cb(end, data)
//   op    - optional; called with each data item as it passes
//   onEnd - optional; called at most once, when the stream is aborted from
//           downstream or when upstream signals end/error. A plain `true`
//           end is reported as null, anything else is passed as is.
var through = exports.through =
function (read, op, onEnd) {
  var finished = false

  function finish (reason) {
    if(finished) return
    if(!onEnd) return
    finished = true
    onEnd(reason === true ? null : reason)
  }

  return function (abort, cb) {
    if(abort) finish(abort)
    return read(abort, function (end, data) {
      if(end) finish(end)
      else if(op) op(data)
      cb(end, data)
    })
  }
}
69 | |
// take(read, test) -> read
// Passes items through until `test` says stop, then aborts the upstream.
//   read - upstream function called as read(end, cb); answers cb(end, data)
//   test - either a number (how many items to let through) or a function
//          called with each item; a falsy result ends the stream and the
//          item that produced it is dropped. A result of exactly 1 marks
//          the current item as the last one: it is delivered and the next
//          read aborts the upstream.
// A count of zero or less lets nothing through: the first read aborts the
// upstream without pulling (and discarding) any item from it.
var take = exports.take =
function (read, test) {
  var ended = false
  // true once the final permitted item has been handed downstream
  var last = false
  if('number' === typeof test) {
    var n = test
    last = n <= 0
    // counts down n, n-1, ... 1; the 1 flags the last permitted item
    test = function () {
      return n --
    }
  }

  return function (end, cb) {
    if(ended) return cb(ended)
    if(last) end = true
    if(ended = end) return read(ended, cb)

    read(null, function (end, data) {
      if(ended = ended || end) return cb(ended)
      var more = test(data)
      if(!more) {
        ended = true
        // stop the source; downstream sees our end state, not the source's
        read(true, function (end, data) {
          cb(ended, data)
        })
        return
      }
      last = 1 === more
      cb(null, data)
    })
  }
}
99 | |
// unique(read, field, invert) -> read
// Filters the stream by whether an item's key has been seen before.
//   read   - upstream function called as read(end, cb); answers cb(end, data)
//   field  - handed to util.prop() to obtain a key getter; the identity
//            helper `id` is used when prop() returns a falsy value
//            (NOTE(review): prop() lives in ./util and is not visible here)
//   invert - when truthy, pass only items whose key HAS been seen before
// Keys are used as object property names, so they are compared as strings.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // No prototype: keys such as "constructor" or "toString" must not look
  // as if they had been seen before their first occurrence.
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
110 | |
// nonUnique(read, field) -> read
// Shorthand for unique() with its `invert` flag switched on: only items
// whose key has already been seen are passed on.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
114 | |
// group(read, size) -> read
// Collects upstream items into arrays of `size` items (5 when `size` is
// falsy) and passes each array downstream as one item.
//   read - upstream function called as read(end, cb); answers cb(end, data)
// When upstream ends with a partly filled array, that array is delivered
// first and the end state is reported on the following read.
var group = exports.group =
function (read, size) {
  var ended
  var batch = []
  size = size || 5

  // hand the collected items downstream and start an empty batch
  function flush (cb) {
    var full = batch
    batch = []
    return cb(null, full)
  }

  return function (end, cb) {
    // abort/error from downstream: remember it and forward it upstream
    if(end) {
      ended = end
      return read(ended, cb)
    }
    // upstream already finished on an earlier read
    if(ended) return cb(ended)

    read(null, function collect (end, data) {
      ended = ended || end
      if(ended) {
        if(batch.length) return flush(cb)
        return cb(ended)
      }
      batch.push(data)
      if(batch.length >= size) return flush(cb)
      return read(null, collect)
    })
  }
}
143 | |
// flatten(read) -> read
// Turns a stream of streams into one stream of their items, in order.
//   read - upstream function called as read(end, cb); each item it yields
//          must be either a read function (an inner stream) or an array,
//          which is wrapped with sources.values()
//          (NOTE(review): sources.values comes from ./sources and is not
//          visible here; presumably it makes a stream from the array)
// Throws Error('expected stream of streams') for any other item.
//
// Abort: a truthy `abort` first aborts the inner stream currently being
// read (if any) and then the outer stream; `cb` receives the outer
// stream's answer.
// Inner end (`true`) moves on to the next inner stream. Any other truthy
// value from an inner stream is an error: the outer stream is aborted with
// it and the error is passed downstream.
var flatten = exports.flatten = function (read) {
  var _read
  return function (abort, cb) {
    if(abort) {
      if(!_read) return read(abort, cb)
      var inner = _read
      _read = null
      // stop the current inner stream first, then the stream of streams
      return inner(abort, function () {
        read(abort, cb)
      })
    }

    if(_read) nextChunk()
    else nextStream()

    function nextChunk () {
      _read(null, function (end, data) {
        if(end === true) {
          _read = null
          return nextStream()
        }
        if(end) {
          _read = null
          // inner stream failed: shut down the outer stream, report the error
          return read(end, function () {
            cb(end)
          })
        }
        cb(null, data)
      })
    }
    function nextStream () {
      read(null, function (end, stream) {
        if(end)
          return cb(end)
        if(Array.isArray(stream))
          stream = sources.values(stream)
        else if('function' != typeof stream)
          throw new Error('expected stream of streams')

        _read = stream
        nextChunk()
      })
    }
  }
}
188 | |
var nextTick = process.nextTick

// highWaterMark(read, highWaterMark) -> read
// Reads ahead from upstream and keeps up to `highWaterMark` items (10 when
// falsy) buffered, so downstream reads can be answered from the buffer.
//   read - upstream function called as read(end, cb); answers cb(end, data)
// Reading ahead starts on the next tick, or on the first downstream read,
// whichever comes first.
//
// Buffered items are always delivered before the upstream's end/error is
// reported. A downstream abort discards the buffer, answers any waiting
// readers with the abort value and is forwarded upstream, unless upstream
// has already ended.
// Items that are null/undefined are not buffered (behaviour kept from the
// original implementation).
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  // set once downstream aborted; late upstream answers are then ignored
  var aborted = false
  highWaterMark = highWaterMark || 10

  // answer waiting readers: buffered data first, the end state only once
  // the buffer is empty
  function readAhead () {
    while(waiting.length) {
      if(buffer.length) waiting.shift()(null, buffer.shift())
      else if(ended) waiting.shift()(ended, null)
      else return
    }
  }

  // pull one more item unless finished, already pulling, or buffer is full
  function next () {
    if(ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      if(aborted) return
      ended = ended || end
      if(data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if(end) {
      buffer.length = 0
      // upstream already finished (or was aborted before): nothing to stop
      if(ended) {
        readAhead()
        return cb(ended)
      }
      aborted = true
      ended = end
      readAhead()
      return read(end, cb)
    }

    waiting.push(cb)

    next(); readAhead()
  }
}
223 | |
224 | |
225 | |
226 |
Built with git-ssb-web