Files: 9cbec93bf2fcddb09f5c4154a97fdcc4112a7328 / throughs.js
4988 bytesRaw
1 | ; |
2 | |
3 | function id (item) { return item } |
4 | |
5 | function prop (key) { |
6 | return ( |
7 | 'string' == typeof key |
8 | ? function (data) { return data[key] } |
9 | : 'object' === typeof key && 'function' === typeof key.exec //regexp |
10 | ? function (data) { var v = map.exec(data); return v && v[0] } |
11 | : key |
12 | ) |
13 | } |
14 | |
15 | function tester (test) { |
16 | return ( |
17 | 'object' === typeof test && 'function' === typeof test.test //regexp |
18 | ? function (data) { return test.test(data) } |
19 | : prop (test) || id |
20 | ) |
21 | } |
22 | |
23 | var sources = require('./sources') |
24 | var sinks = require('./sinks') |
25 | |
26 | var map = exports.map = |
27 | function (map) { |
28 | if(!map) return id |
29 | map = prop(map) |
30 | return function (read) { |
31 | return function (abort, cb) { |
32 | read(abort, function (end, data) { |
33 | try { |
34 | data = !end ? map(data) : null |
35 | } catch (err) { |
36 | return read(err, function () { |
37 | return cb(err) |
38 | }) |
39 | } |
40 | cb(end, data) |
41 | }) |
42 | } |
43 | } |
44 | } |
45 | |
46 | var asyncMap = exports.asyncMap = |
47 | function (map) { |
48 | if(!map) return id //when read is passed, pass it on. |
49 | return function (read) { |
50 | return function (end, cb) { |
51 | if(end) return read(end, cb) //abort |
52 | read(null, function (end, data) { |
53 | if(end) return cb(end, data) |
54 | map(data, cb) |
55 | }) |
56 | } |
57 | } |
58 | } |
59 | |
60 | var filter = exports.filter = |
61 | function (test) { |
62 | //regexp |
63 | test = tester(test) |
64 | return function (read) { |
65 | return function next (end, cb) { |
66 | var sync, loop = true |
67 | while(loop) { |
68 | loop = false |
69 | sync = true |
70 | read(end, function (end, data) { |
71 | if(!end && !test(data)) |
72 | return sync ? loop = true : next(end, cb) |
73 | cb(end, data) |
74 | }) |
75 | sync = false |
76 | } |
77 | } |
78 | } |
79 | } |
80 | |
81 | var filterNot = exports.filterNot = |
82 | function (test) { |
83 | test = tester(test) |
84 | return filter(function (data) { return !test(data) }) |
85 | } |
86 | |
87 | //a pass through stream that doesn't change the value. |
88 | var through = exports.through = |
89 | function (op, onEnd) { |
90 | var a = false |
91 | |
92 | function once (abort) { |
93 | if(a || !onEnd) return |
94 | a = true |
95 | onEnd(abort === true ? null : abort) |
96 | } |
97 | |
98 | return function (read) { |
99 | return function (end, cb) { |
100 | if(end) once(end) |
101 | return read(end, function (end, data) { |
102 | if(!end) op && op(data) |
103 | else once(end) |
104 | cb(end, data) |
105 | }) |
106 | } |
107 | } |
108 | } |
109 | |
110 | //read a number of items and then stop. |
111 | var take = exports.take = |
112 | function (test, opts) { |
113 | opts = opts || {} |
114 | var last = opts.last || false // whether the first item for which !test(item) should still pass |
115 | var ended = false |
116 | if('number' === typeof test) { |
117 | last = true |
118 | var n = test; test = function () { |
119 | return --n |
120 | } |
121 | } |
122 | |
123 | return function (read) { |
124 | |
125 | function terminate (cb) { |
126 | read(true, function (err) { |
127 | last = false; cb(err || true) |
128 | }) |
129 | } |
130 | |
131 | return function (end, cb) { |
132 | if(ended) last ? terminate(cb) : cb(ended) |
133 | else if(ended = end) read(ended, cb) |
134 | else |
135 | read(null, function (end, data) { |
136 | if(ended = ended || end) { |
137 | //last ? terminate(cb) : |
138 | cb(ended) |
139 | } |
140 | else if(!test(data)) { |
141 | ended = true |
142 | last ? cb(null, data) : terminate(cb) |
143 | } |
144 | else |
145 | cb(null, data) |
146 | }) |
147 | } |
148 | } |
149 | } |
150 | |
151 | //drop items you have already seen. |
152 | var unique = exports.unique = function (field, invert) { |
153 | field = prop(field) || id |
154 | var seen = {} |
155 | return filter(function (data) { |
156 | var key = field(data) |
157 | if(seen[key]) return !!invert //false, by default |
158 | else seen[key] = true |
159 | return !invert //true by default |
160 | }) |
161 | } |
162 | |
163 | //passes an item through when you see it for the second time. |
164 | var nonUnique = exports.nonUnique = function (field) { |
165 | return unique(field, true) |
166 | } |
167 | |
168 | //convert a stream of arrays or streams into just a stream. |
169 | var flatten = exports.flatten = function () { |
170 | return function (read) { |
171 | var _read |
172 | return function (abort, cb) { |
173 | if (abort) { //abort the current stream, and then stream of streams. |
174 | _read ? _read(abort, function(err) { |
175 | read(err || abort, cb) |
176 | }) : read(abort, cb) |
177 | } |
178 | else if(_read) nextChunk() |
179 | else nextStream() |
180 | |
181 | function nextChunk () { |
182 | _read(null, function (err, data) { |
183 | if (err === true) nextStream() |
184 | else if (err) { |
185 | read(true, function(abortErr) { |
186 | // TODO: what do we do with the abortErr? |
187 | cb(err) |
188 | }) |
189 | } |
190 | else cb(null, data) |
191 | }) |
192 | } |
193 | function nextStream () { |
194 | _read = null |
195 | read(null, function (end, stream) { |
196 | if(end) |
197 | return cb(end) |
198 | if(Array.isArray(stream) || stream && 'object' === typeof stream) |
199 | stream = sources.values(stream) |
200 | else if('function' != typeof stream) |
201 | throw new Error('expected stream of streams') |
202 | _read = stream |
203 | nextChunk() |
204 | }) |
205 | } |
206 | } |
207 | } |
208 | } |
209 | |
210 |
Built with git-ssb-web