Files: 964094904bbed175a3bc8fa4490d7c39b57b2ca9 / throughs.js
5600 bytesRaw
1 | ; |
2 | |
// Identity: hands back its argument untouched.
// Serves as the fallback when no mapper / test function is supplied.
function id (item) {
  return item
}
4 | |
// Turn a "key" into an accessor function.
//
//   string  -> function (data) that returns data[key]
//   regexp  -> function (data) that returns the first match of key in data,
//              or null when there is no match (anything with an `exec`
//              method is treated as a regexp)
//   other   -> returned unchanged (typically already a function)
//
// null / undefined are returned as-is, so callers can write `prop(x) || id`.
function prop (key) {
  // typeof null is 'object'; without this guard `key.exec` below would throw.
  if(key == null) return key

  if('string' == typeof key)
    return function (data) { return data[key] }

  if('object' === typeof key && 'function' === typeof key.exec) //regexp
    return function (data) {
      // must run the regexp that was passed in, not the module-level `map`
      var match = key.exec(data)
      return match && match[0]
    }

  return key
}
14 | |
// Build a predicate from `test`.
// An object with a `test` method (i.e. a regexp) becomes a function that runs
// that method against each item; anything else is handed to prop(), and when
// prop() yields nothing usable the identity function is used instead.
function tester (test) {
  var looksLikeRegExp =
    'object' === typeof test && 'function' === typeof test.test //regexp

  if(looksLikeRegExp)
    return function (data) { return test.test(data) }

  return prop(test) || id
}
22 | |
23 | var sources = require('./sources') |
24 | var sinks = require('./sinks') |
25 | |
// Through stream that applies a synchronous transform to every item.
// `map` may be anything prop() accepts; with a falsy `map` the identity
// through is returned. If the transform throws, the source is aborted with
// the thrown error and that same error is passed downstream.
var map = exports.map =

function (map) {
  if(!map) return id
  var transform = prop(map)

  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        var mapped = null
        if(!end) {
          try {
            mapped = transform(data)
          } catch (err) {
            // tell the source to stop, then report the failure downstream
            return read(err, function () {
              return cb(err)
            })
          }
        }
        cb(end, mapped)
      })
    }
  }
}
46 | |
// Through stream that applies an asynchronous transform to every item.
//
// `map` is called as map(data, done), where done(err, mapped) reports the
// result. `map` is first passed through prop(); with a falsy `map` the
// identity through is returned.
//
// Abort handling:
//  - once an abort has been recorded, every later read answers with it.
//  - an abort arriving while the mapper is idle is forwarded to the source.
//  - an abort arriving while the mapper is busy is forwarded to the source,
//    but the abort callback is held back until the mapper has finished.
//  - if the mapper reports an error, the source is aborted with that error.
var asyncMap = exports.asyncMap =
function async (map) {
  if(!map) return id
  map = prop(map)
  var busy = false, abortCb, aborted
  return function (read) {
    return function next (abort, cb) {
      if(aborted) return cb(aborted)
      if(abort) {
        aborted = abort
        if(!busy) read(abort, cb)
        else read(abort, function () {
          //if we are still busy, wait for the mapper to complete.
          if(busy) abortCb = cb
          else cb(abort)
        })
      }
      else
        read(null, function (end, data) {
          if(end) cb(end)
          else if(aborted) cb(aborted)
          else {
            busy = true
            map(data, function (err, data) {
              busy = false
              if(aborted) {
                cb(aborted)
                // abortCb is only set when the source acknowledged the abort
                // while we were still busy. If the mapper finished first, the
                // source's abort callback will answer the aborter itself.
                if(abortCb) abortCb(aborted)
              }
              else if(err) next (err, cb)
              else cb(null, data)
            })
          }
        })
    }
  }
}
84 | |
// Through stream that only lets items through for which `test` is truthy.
// `test` is normalised with tester(), so a regexp is accepted as well.
//
// Rejected items are skipped by reading again. When the source answers
// synchronously this is done by looping rather than recursing, so a long run
// of rejected items does not grow the stack; when it answers later, the read
// is simply re-issued from the callback.
var filter = exports.filter =
function (test) {
  var accept = tester(test)
  return function (read) {
    return function next (end, cb) {
      var sync
      var again
      do {
        again = false
        sync = true
        read(end, function (ended, data) {
          if(ended || accept(data)) {
            cb(ended, data)
            return
          }
          // rejected: still inside read() -> let the loop go round again
          if(sync) return again = true
          // rejected after read() returned -> start a fresh read
          return next(ended, cb)
        })
        sync = false
      } while(again)
    }
  }
}
105 | |
// Inverse of filter: lets through only the items for which `test` is falsy.
// `test` is normalised with tester() before being negated.
var filterNot = exports.filterNot =
function (test) {
  var accept = tester(test)
  function rejected (data) {
    return !accept(data)
  }
  return filter(rejected)
}
111 | |
// Pass-through stream for observing data without altering it.
//   op(data)   - optional, called for every item that flows past.
//   onEnd(err) - optional, called at most once, either when the stream is
//                aborted from downstream or when the source ends. A plain
//                `true` end is reported as null; anything else is passed on
//                as the error.
var through = exports.through =
function (op, onEnd) {
  var finished = false

  function notifyEnd (reason) {
    if(finished || !onEnd) return
    finished = true
    onEnd(reason === true ? null : reason)
  }

  return function (read) {
    return function (end, cb) {
      if(end) notifyEnd(end)
      return read(end, function (ended, data) {
        if(ended) notifyEnd(ended)
        else if(op) op(data)
        cb(ended, data)
      })
    }
  }
}
134 | |
//read a number of items and then stop.
//
// `test` is either a count or a predicate:
//   number   - pass that many items, then end. A count of zero or less passes
//              nothing: the first read aborts the source and ends the stream.
//   function - pass items while test(item) is truthy. With opts.last set, the
//              first item that fails the test is still passed before ending.
//
// When take decides to stop, it aborts the source (read(true, ...)) and then
// reports the abort's error, or `true`, downstream.
var take = exports.take =
function (test, opts) {
  opts = opts || {}
  var last = opts.last || false // whether the first item for which !test(item) should still pass
  var ended = false
  if('number' === typeof test) {
    last = true
    var n = test
    // With n <= 0 the countdown below would go negative and stay truthy
    // forever, so start out already ended instead. `last` is true here, so
    // the first read goes through terminate() and aborts the source.
    if(n <= 0) ended = true
    test = function () {
      return --n
    }
  }

  return function (read) {

    // abort the source, then end downstream
    function terminate (cb) {
      read(true, function (err) {
        last = false; cb(err || true)
      })
    }

    return function (end, cb) {
      // already ended: abort the source once (while `last` is still set),
      // afterwards just keep answering with the end state.
      if(ended) last ? terminate(cb) : cb(ended)
      // abort from downstream: remember it and forward it to the source
      else if(ended = end) read(ended, cb)
      else
        read(null, function (end, data) {
          if(ended = ended || end) {
            cb(ended)
          }
          else if(!test(data)) {
            ended = true
            last ? cb(null, data) : terminate(cb)
          }
          else
            cb(null, data)
        })
    }
  }
}
175 | |
//drop items you have already seen.
//
// `field` selects the key that identifies an item (anything prop() accepts;
// defaults to the item itself). Keys are used as property names, so they are
// compared as strings. With `invert` truthy the result is flipped: the first
// sighting of a key is dropped and every later sighting passes.
var unique = exports.unique = function (field, invert) {
  field = prop(field) || id
  // No prototype: with a plain {} keys such as 'constructor' or 'toString'
  // would be found on Object.prototype and look "already seen".
  var seen = Object.create(null)
  return filter(function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
187 | |
//the inverse of unique: an item is dropped the first time its key shows up
//and passed through on every later sighting of that key.
var nonUnique = exports.nonUnique = function (field) {
  var invert = true
  return unique(field, invert)
}
192 | |
//convert a stream of arrays or streams into just a stream.
//
// Each item read from the outer stream becomes a sub-stream:
//   array / object     -> wrapped with sources.values
//   function           -> used directly as a source
//   anything else      -> wrapped with sources.once
// Sub-streams are drained one after another. A sub-stream ending with `true`
// moves on to the next one; any other end value aborts the outer stream and
// is passed downstream as the error.
var flatten = exports.flatten = function () {
  return function (read) {
    var inner // the sub-stream currently being drained, if any

    return function (abort, cb) {

      // pull one item from the current sub-stream
      function nextChunk () {
        inner(null, function (err, data) {
          if(err === true) nextStream()
          else if(err) {
            read(true, function (abortErr) {
              // TODO: what do we do with the abortErr?
              cb(err)
            })
          }
          else cb(null, data)
        })
      }

      // fetch the next sub-stream from the outer stream and start on it
      function nextStream () {
        inner = null
        read(null, function (end, stream) {
          if(end)
            return cb(end)
          var isObject = stream && 'object' === typeof stream
          if(Array.isArray(stream) || isObject)
            stream = sources.values(stream)
          else if('function' != typeof stream)
            stream = sources.once(stream)
          inner = stream
          nextChunk()
        })
      }

      if(abort) {
        //abort the current stream, and then stream of streams.
        if(inner)
          inner(abort, function (err) {
            read(err || abort, cb)
          })
        else
          read(abort, cb)
      }
      else if(inner) nextChunk()
      else nextStream()
    }
  }
}
234 | |
235 |
Built with git-ssb-web