Files: 77e8971d09fb121162d025a7851aca84b3d5562f / throughs.js
5937 bytesRaw
1 | ; |
2 | |
3 | function id (item) { return item } |
4 | |
5 | function prop (key) { |
6 | return ( |
7 | 'string' == typeof key |
8 | ? function (data) { return data[key] } |
9 | : 'object' === typeof key && 'function' === typeof key.exec //regexp |
10 | ? function (data) { var v = map.exec(data); return v && v[0] } |
11 | : key |
12 | ) |
13 | } |
14 | |
15 | function tester (test) { |
16 | return ( |
17 | 'object' === typeof test && 'function' === typeof test.test //regexp |
18 | ? function (data) { return test.test(data) } |
19 | : prop (test) || id |
20 | ) |
21 | } |
22 | |
23 | var sources = require('./sources') |
24 | var sinks = require('./sinks') |
25 | |
26 | var map = exports.map = |
27 | |
28 | function (map) { |
29 | if(!map) return id |
30 | map = prop(map) |
31 | return function (read) { |
32 | return function (abort, cb) { |
33 | read(abort, function (end, data) { |
34 | try { |
35 | data = !end ? map(data) : null |
36 | } catch (err) { |
37 | return read(err, function () { |
38 | return cb(err) |
39 | }) |
40 | } |
41 | cb(end, data) |
42 | }) |
43 | } |
44 | } |
45 | } |
46 | |
47 | var asyncMap = exports.asyncMap = |
48 | function async (map) { |
49 | if(!map) return id |
50 | map = prop(map) |
51 | var busy = false, abortCb, aborted |
52 | return function (read) { |
53 | return function next (abort, cb) { |
54 | if(aborted) return cb(aborted) |
55 | if(abort) { |
56 | aborted = abort |
57 | if(!busy) read(abort, cb) |
58 | else read(abort, function () { |
59 | //if we are still busy, wait for the mapper to complete. |
60 | if(busy) abortCb = cb |
61 | else cb(abort) |
62 | }) |
63 | } |
64 | else |
65 | read(null, function (end, data) { |
66 | if(end) { |
67 | cb(end) |
68 | if(abortCb) cb(end, data) |
69 | } |
70 | else { |
71 | busy = true |
72 | map(data, function (err, data) { |
73 | busy = false |
74 | if(aborted) { |
75 | cb(aborted) |
76 | abortCb(aborted) |
77 | } |
78 | else if(err) next (err, cb) |
79 | else cb(null, data) |
80 | }) |
81 | } |
82 | }) |
83 | } |
84 | } |
85 | } |
86 | |
87 | |
88 | function asyncMap (map) { |
89 | if(!map) return id //when read is passed, pass it on. |
90 | return function (read) { |
91 | return function (end, cb) { |
92 | if(end) return read(end, cb) //abort |
93 | read(null, function (end, data) { |
94 | if(end) return cb(end, data) |
95 | map(data, cb) |
96 | }) |
97 | } |
98 | } |
99 | } |
100 | |
101 | var filter = exports.filter = |
102 | function (test) { |
103 | //regexp |
104 | test = tester(test) |
105 | return function (read) { |
106 | return function next (end, cb) { |
107 | var sync, loop = true |
108 | while(loop) { |
109 | loop = false |
110 | sync = true |
111 | read(end, function (end, data) { |
112 | if(!end && !test(data)) |
113 | return sync ? loop = true : next(end, cb) |
114 | cb(end, data) |
115 | }) |
116 | sync = false |
117 | } |
118 | } |
119 | } |
120 | } |
121 | |
122 | var filterNot = exports.filterNot = |
123 | function (test) { |
124 | test = tester(test) |
125 | return filter(function (data) { return !test(data) }) |
126 | } |
127 | |
128 | //a pass through stream that doesn't change the value. |
129 | var through = exports.through = |
130 | function (op, onEnd) { |
131 | var a = false |
132 | |
133 | function once (abort) { |
134 | if(a || !onEnd) return |
135 | a = true |
136 | onEnd(abort === true ? null : abort) |
137 | } |
138 | |
139 | return function (read) { |
140 | return function (end, cb) { |
141 | if(end) once(end) |
142 | return read(end, function (end, data) { |
143 | if(!end) op && op(data) |
144 | else once(end) |
145 | cb(end, data) |
146 | }) |
147 | } |
148 | } |
149 | } |
150 | |
151 | //read a number of items and then stop. |
152 | var take = exports.take = |
153 | function (test, opts) { |
154 | opts = opts || {} |
155 | var last = opts.last || false // whether the first item for which !test(item) should still pass |
156 | var ended = false |
157 | if('number' === typeof test) { |
158 | last = true |
159 | var n = test; test = function () { |
160 | return --n |
161 | } |
162 | } |
163 | |
164 | return function (read) { |
165 | |
166 | function terminate (cb) { |
167 | read(true, function (err) { |
168 | last = false; cb(err || true) |
169 | }) |
170 | } |
171 | |
172 | return function (end, cb) { |
173 | if(ended) last ? terminate(cb) : cb(ended) |
174 | else if(ended = end) read(ended, cb) |
175 | else |
176 | read(null, function (end, data) { |
177 | if(ended = ended || end) { |
178 | //last ? terminate(cb) : |
179 | cb(ended) |
180 | } |
181 | else if(!test(data)) { |
182 | ended = true |
183 | last ? cb(null, data) : terminate(cb) |
184 | } |
185 | else |
186 | cb(null, data) |
187 | }) |
188 | } |
189 | } |
190 | } |
191 | |
192 | //drop items you have already seen. |
193 | var unique = exports.unique = function (field, invert) { |
194 | field = prop(field) || id |
195 | var seen = {} |
196 | return filter(function (data) { |
197 | var key = field(data) |
198 | if(seen[key]) return !!invert //false, by default |
199 | else seen[key] = true |
200 | return !invert //true by default |
201 | }) |
202 | } |
203 | |
204 | //passes an item through when you see it for the second time. |
205 | var nonUnique = exports.nonUnique = function (field) { |
206 | return unique(field, true) |
207 | } |
208 | |
209 | //convert a stream of arrays or streams into just a stream. |
210 | var flatten = exports.flatten = function () { |
211 | return function (read) { |
212 | var _read |
213 | return function (abort, cb) { |
214 | if (abort) { //abort the current stream, and then stream of streams. |
215 | _read ? _read(abort, function(err) { |
216 | read(err || abort, cb) |
217 | }) : read(abort, cb) |
218 | } |
219 | else if(_read) nextChunk() |
220 | else nextStream() |
221 | |
222 | function nextChunk () { |
223 | _read(null, function (err, data) { |
224 | if (err === true) nextStream() |
225 | else if (err) { |
226 | read(true, function(abortErr) { |
227 | // TODO: what do we do with the abortErr? |
228 | cb(err) |
229 | }) |
230 | } |
231 | else cb(null, data) |
232 | }) |
233 | } |
234 | function nextStream () { |
235 | _read = null |
236 | read(null, function (end, stream) { |
237 | if(end) |
238 | return cb(end) |
239 | if(Array.isArray(stream) || stream && 'object' === typeof stream) |
240 | stream = sources.values(stream) |
241 | else if('function' != typeof stream) |
242 | stream = sources.once(stream) |
243 | _read = stream |
244 | nextChunk() |
245 | }) |
246 | } |
247 | } |
248 | } |
249 | } |
250 | |
251 | |
252 | |
253 | |
254 | |
255 | |
256 | |
257 |
Built with git-ssb-web