git ssb

2+

Dominic / pull-stream



Tree: 77e8971d09fb121162d025a7851aca84b3d5562f

Files: 77e8971d09fb121162d025a7851aca84b3d5562f / throughs.js

5937 bytesRaw
1'use strict';
2
//identity: hands back exactly what it was given.
function id (value) { return value }
4
//turn `key` into an accessor function.
//  string  -> function (data) returning data[key]
//  regexp  -> function (data) returning the first match of key in data,
//             or null when there is no match
//  other   -> returned unchanged (a function, undefined, null, ...)
function prop (key) {
  if('string' === typeof key)
    return function (data) { return data[key] }

  //`key &&` keeps null out of this branch: typeof null is 'object' too,
  //and reading null.exec would throw.
  if(key && 'object' === typeof key && 'function' === typeof key.exec) //regexp
    return function (data) {
      //run the regexp that was passed in (this used to call `map.exec`,
      //which is not the regexp).
      var match = key.exec(data)
      return match && match[0]
    }

  return key
}
14
//turn `test` into a predicate function.
//an object with a test() method (e.g. a regexp) is wrapped so that the
//predicate calls test.test(data); anything else goes through prop(),
//falling back to id when prop() returns something falsy.
function tester (test) {
  var hasTestMethod =
    'object' === typeof test && 'function' === typeof test.test //regexp

  if(hasTestMethod)
    return function (data) { return test.test(data) }

  return prop(test) || id
}
22
23var sources = require('./sources')
24var sinks = require('./sinks')
25
//transform every item with `map` (converted to a function by prop()).
//without a mapper this is a pass-through: id is returned.
//if the mapper throws, the source is aborted with that error and the
//same error is then handed to the reader's callback.
var map = exports.map =

function (map) {
  if(!map) return id
  var mapper = prop(map)

  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        var mapped = null
        if(!end) {
          try {
            mapped = mapper(data)
          } catch (err) {
            return read(err, function () {
              return cb(err)
            })
          }
        }
        cb(end, mapped)
      })
    }
  }
}
46
//like map, but the mapper is asynchronous: map(data, function (err, mapped)).
//without a mapper this is a pass-through: id is returned.
//
//state:
//  busy    - true while the mapper is working on an item
//  aborted - the abort/error value, once the reader has aborted or the
//            mapper has reported an error
//  abortCb - callback of an abort that the source acknowledged while the
//            mapper was still busy; it is called when the mapper finishes
var asyncMap = exports.asyncMap =
function async (map) {
  if(!map) return id
  map = prop(map)
  var busy = false, abortCb, aborted
  return function (read) {
    return function next (abort, cb) {
      if(aborted) return cb(aborted)
      if(abort) {
        aborted = abort
        if(!busy) read(abort, cb)
        else read(abort, function () {
          //if we are still busy, wait for the mapper to complete.
          if(busy) abortCb = cb
          else cb(abort)
        })
      }
      else
        read(null, function (end, data) {
          //call back exactly once on end (this used to call cb a second
          //time whenever abortCb was set).
          if(end) cb(end)
          //an abort arrived while this read was in flight:
          //do not start the mapper on data nobody wants any more.
          else if(aborted) cb(aborted)
          else {
            busy = true
            map(data, function (err, data) {
              busy = false
              if(aborted) {
                cb(aborted)
                //abortCb is only set once the source has acknowledged the
                //abort while we were busy; if that has not happened yet,
                //the acknowledgement handler above calls the abort
                //callback itself, so there is nothing to call here.
                if(abortCb) abortCb(aborted)
              }
              //mapper failed: abort the source with the error.
              else if(err) next (err, cb)
              else cb(null, data)
            })
          }
        })
    }
  }
}
86
87
//minimal asyncMap: no busy/abort bookkeeping, the mapper is handed the
//reader's callback directly.
//NOTE(review): the `var asyncMap = exports.asyncMap = ...` assignment earlier
//in this file overwrites this hoisted declaration when the module runs, so
//this variant does not appear to be the one that is exported - confirm
//before relying on it.
function asyncMap (map) {
  //no mapper: id hands the read function straight back.
  if(!map) return id

  return function (read) {
    return function (end, cb) {
      //abort: forward it to the source untouched.
      if(end) return read(end, cb)

      function onItem (ended, data) {
        if(ended) return cb(ended, data)
        map(data, cb)
      }

      read(null, onItem)
    }
  }
}
100
//only let through the items for which `test` (made into a predicate by
//tester(), so a regexp works too) is truthy.
var filter = exports.filter =
function (test) {
  var accept = tester(test)

  return function (read) {
    return function next (end, cb) {
      //inSyncCall is true only while read() itself is on the stack, which
      //tells the callback whether the source answered synchronously.
      var inSyncCall, again

      do {
        again = false
        inSyncCall = true
        read(end, function (end, data) {
          if(end || accept(data)) return cb(end, data)

          //item rejected: fetch another one.
          //a synchronous source is handled by looping rather than by
          //recursing; once read() has returned the loop is over, so an
          //asynchronous answer has to re-enter next() instead.
          if(inSyncCall) again = true
          else next(end, cb)
        })
        inSyncCall = false
      } while(again)
    }
  }
}
121
//the inverse of filter: only let through the items `test` does NOT match.
var filterNot = exports.filterNot =
function (test) {
  var matches = tester(test)

  function rejects (data) {
    return !matches(data)
  }

  return filter(rejects)
}
127
//a pass-through stream: every item is forwarded unchanged.
//op (optional) is called with each item on its way through.
//onEnd (optional) is called at most once, when the reader aborts or the
//source ends: with null for a plain `true` end, otherwise with the
//abort/error value.
var through = exports.through =
function (op, onEnd) {
  var notified = false

  function notifyEnd (reason) {
    if(notified || !onEnd) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (read) {
    return function (end, cb) {
      if(end) notifyEnd(end)
      return read(end, function (end, data) {
        if(end) notifyEnd(end)
        else if(op) op(data)
        cb(end, data)
      })
    }
  }
}
150
//read a number of items and then stop.
//test is either
//  a number   - let that many items through, then end; 0 or less lets
//               nothing through
//  a function - let items through while test(item) is truthy
//opts.last: whether the first item for which !test(item) should still
//pass (always on for a numeric test, where that item is the n-th one).
//once the limit is reached the source is aborted with `true`.
var take = exports.take =
function (test, opts) {
  opts = opts || {}
  var last = opts.last || false
  var ended = false
  if('number' === typeof test) {
    last = true
    var n = test
    //nothing to take: start out ended, so that the first read goes
    //straight to terminate() (last is true) and aborts the source.
    //the bare `--n` counter never hit zero for these counts and let
    //every item through.
    if(n <= 0) ended = true
    test = function () {
      //`> 0` rather than a bare `--n`, so that a non-integer count stops
      //too instead of stepping over zero.
      return --n > 0
    }
  }

  return function (read) {

    //abort the source, then end the reader. clearing `last` makes any
    //later read answer with `ended` directly instead of aborting again.
    function terminate (cb) {
      read(true, function (err) {
        last = false; cb(err || true)
      })
    }

    return function (end, cb) {
      if(ended) last ? terminate(cb) : cb(ended)
      else if(ended = end) read(ended, cb)
      else
        read(null, function (end, data) {
          if(ended = ended || end) {
            cb(ended)
          }
          else if(!test(data)) {
            ended = true
            //with `last` the failing item is still delivered and the
            //source is aborted on the next read.
            last ? cb(null, data) : terminate(cb)
          }
          else
            cb(null, data)
        })
    }
  }
}
191
//drop items you have already seen.
//field selects the value items are compared by (converted by prop();
//the item itself when omitted). values are compared as property keys,
//i.e. after conversion to a string.
//invert flips the result: the first occurrence is dropped and every
//repeat passes.
var unique = exports.unique = function (field, invert) {
  field = prop(field) || id
  //no prototype, so keys such as 'constructor' or 'toString' do not look
  //as if they had been seen before.
  var seen = Object.create(null)
  return filter(function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
203
//the inverse of unique: an item is dropped the first time its field
//value shows up and passed through on every repeat.
var nonUnique = exports.nonUnique = function (field) {
  var invert = true
  return unique(field, invert)
}
208
//convert a stream of arrays or streams into just a stream.
//each item of the outer stream becomes an inner stream: objects (arrays
//included) via sources.values, functions are used as they are, anything
//else via sources.once. the inner streams are read one after another.
var flatten = exports.flatten = function () {
  return function (read) {
    //the inner stream currently being read, if any.
    var inner = null

    return function (abort, cb) {
      //read the next item of the current inner stream.
      function pullItem () {
        inner(null, function (err, data) {
          if(!err) cb(null, data)
          //inner stream is finished: carry on with the next one.
          else if(err === true) pullInner()
          else {
            //inner stream failed: abort the outer stream, report the error.
            read(true, function (abortErr) {
              // TODO: what do we do with the abortErr?
              cb(err)
            })
          }
        })
      }

      //fetch the next inner stream from the outer stream and start on it.
      function pullInner () {
        inner = null
        read(null, function (end, stream) {
          if(end)
            return cb(end)
          //arrays are objects as well, so this one check covers both.
          if(stream && 'object' === typeof stream)
            stream = sources.values(stream)
          else if('function' !== typeof stream)
            stream = sources.once(stream)
          inner = stream
          pullItem()
        })
      }

      if(abort) {
        //abort the current stream, and then stream of streams.
        if(inner)
          inner(abort, function (err) {
            read(err || abort, cb)
          })
        else
          read(abort, cb)
      }
      else if(inner) pullItem()
      else pullInner()
    }
  }
}
250
251
252
253
254
255
256
257

Built with git-ssb-web