git ssb

2+

Dominic / pull-stream



Tree: 9624740f4ee88cb0bc96ed4cc1a0c295f6cef895

Files: 9624740f4ee88cb0bc96ed4cc1a0c295f6cef895 / throughs.js

5600 bytesRaw
1'use strict';
2
//identity: hands its argument straight back.
//used in this file as the fallback when no mapper or test is supplied.
function id (item) {
  return item
}
4
//turn a "key" into an accessor function.
//  string  -> function reading that property from each item.
//  regexp  -> (any object with an `exec` method) function returning the
//             first match found in the item, or the falsy `exec` result
//             when nothing matches.
//  other   -> returned unchanged (a function, undefined, null, ...).
function prop (key) {
  if ('string' === typeof key)
    return function (data) { return data[key] }

  //`key &&` keeps null out of this branch: typeof null is also 'object'.
  if (key && 'object' === typeof key && 'function' === typeof key.exec) //regexp
    //run the pattern that was passed in, not the module-level `map` export.
    return function (data) { var v = key.exec(data); return v && v[0] }

  return key
}
14
//turn a "test" into a predicate function.
//  null/undefined -> `id` (the item's own truthiness decides).
//  regexp         -> (any object with a `test` method) function running
//                    that pattern against each item.
//  other          -> whatever `prop` makes of it, or `id` if that is falsy.
function tester (test) {
  //null used to crash on `test.test` while undefined fell back to `id`;
  //treat both the same way.
  if (test == null) return id

  if ('object' === typeof test && 'function' === typeof test.test) //regexp
    return function (data) { return test.test(data) }

  return prop(test) || id
}
22
23var sources = require('./sources')
24var sinks = require('./sinks')
25
//through stream that replaces every item with `map(item)`.
//`map` is first normalised by `prop`, so a string key or a regexp is
//accepted as well as a function. with no mapper at all `id` is returned,
//which hands the source reader back unchanged.
//if the mapper throws, the source is aborted with the thrown error and the
//same error is then passed downstream.
var map = exports.map =

function (map) {
  if (!map) return id
  var transform = prop(map)

  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        //on end/error there is nothing to map; data is reported as null.
        var mapped = null
        if (!end) {
          try {
            mapped = transform(data)
          } catch (err) {
            return read(err, function () {
              return cb(err)
            })
          }
        }
        cb(end, mapped)
      })
    }
  }
}
46
//through stream that transforms each item with an asynchronous mapper,
//called as `map(data, function (err, mapped) {...})`.
//with no mapper `id` is returned, which hands the source reader back
//unchanged.
//
//state kept between calls:
//  busy    - a mapper call is in flight.
//  aborted - the abort value received from downstream, once there is one.
//  abortCb - the downstream abort callback, parked when the source has
//            answered the abort but the mapper has not finished yet.
var asyncMap = exports.asyncMap =
function async (map) {
  if(!map) return id
  map = prop(map)
  var busy = false, abortCb, aborted
  return function (read) {
    return function next (abort, cb) {
      if(aborted) return cb(aborted)
      if(abort) {
        aborted = abort
        if(!busy) read(abort, cb)
        else read(abort, function () {
          //if we are still busy, wait for the mapper to complete.
          if(busy) abortCb = cb
          else cb(abort)
        })
      }
      else
        read(null, function (end, data) {
          if(end) cb(end)
          else if(aborted) cb(aborted)
          else {
            busy = true
            map(data, function (err, data) {
              busy = false
              if(aborted) {
                cb(aborted)
                //abortCb is only set when the source answered the abort
                //while we were busy. if the source has not answered yet,
                //its callback above will see busy === false and call the
                //abort callback itself, so there is nothing to call here.
                if(abortCb) abortCb(aborted)
              }
              //a mapper error aborts the source with that error.
              else if(err) next (err, cb)
              else cb(null, data)
            })
          }
        })
    }
  }
}
84
//through stream that only lets items passing `test` go downstream.
//`test` is normalised by `tester`, so a function, a regexp or a property
//name can be given.
//rejected items are skipped by reading again: with a loop when the source
//answered synchronously (so the stack does not grow), or by calling the
//reader again when it answered later.
var filter = exports.filter =
function (test) {
  var accept = tester(test)

  return function (read) {
    return function next (end, cb) {
      var answeredInline //true only while the read() call below is on the stack
      var readAgain

      do {
        readAgain = false
        answeredInline = true
        read(end, function (ended, item) {
          var rejected = !ended && !accept(item)
          if (rejected) {
            if (answeredInline) return readAgain = true
            return next(ended, cb)
          }
          cb(ended, item)
        })
        answeredInline = false
      } while (readAgain)
    }
  }
}
105
//inverse of `filter`: lets through exactly the items `test` rejects.
//`test` is normalised by `tester` before being negated.
var filterNot = exports.filterNot =
function (test) {
  var matches = tester(test)

  function rejects (data) {
    return !matches(data)
  }

  return filter(rejects)
}
111
//pass-through stream: items go downstream untouched.
//  op(data)    - optional, called for every item that passes.
//  onEnd(err)  - optional, called at most once, when the stream ends or is
//                aborted. a plain end (true) is reported as null; any other
//                end value is passed on as is.
var through = exports.through =
function (op, onEnd) {
  var notified = false

  function notifyEnd (reason) {
    if (!onEnd || notified) return
    notified = true
    onEnd(reason === true ? null : reason)
  }

  return function (read) {
    return function (end, cb) {
      //downstream is aborting: report it before asking the source.
      if (end) notifyEnd(end)

      return read(end, function (ended, data) {
        if (ended) notifyEnd(ended)
        else if (op) op(data)
        cb(ended, data)
      })
    }
  }
}
134
//read a number of items and then stop.
//  test - a number: pass that many items, then end.
//         a function: pass items while test(item) is truthy.
//  opts.last - with a function test, whether the first item that fails the
//         test is still passed downstream before ending.
//once the limit is reached the source is aborted with `true`.
var take = exports.take =
function (test, opts) {
  opts = opts || {}
  var last = opts.last || false // whether the first item for which !test(item) should still pass
  var ended = false
  if('number' === typeof test) {
    //a count is handled as "pass items until the countdown hits zero, and
    //pass that final item too", hence last = true.
    last = true
    var n = test
    //with nothing to take the countdown would start below zero and never
    //become falsy. start in the "limit reached" state instead, so the first
    //read aborts the source and ends without emitting anything.
    if(n <= 0) ended = true
    test = function () {
      return --n
    }
  }

  return function (read) {

    //abort the source, then tell downstream the stream is over.
    //clearing `last` makes later reads answer directly with `ended`
    //instead of aborting the source again.
    function terminate (cb) {
      read(true, function (err) {
        last = false; cb(err || true)
      })
    }

    return function (end, cb) {
      if(ended) last ? terminate(cb) : cb(ended)
      else if(ended = end) read(ended, cb)
      else
        read(null, function (end, data) {
          if(ended = ended || end) {
            cb(ended)
          }
          else if(!test(data)) {
            ended = true
            last ? cb(null, data) : terminate(cb)
          }
          else
            cb(null, data)
        })
    }
  }
}
175
//drop items you have already seen.
//  field  - how to derive the comparison key from an item (anything `prop`
//           accepts); defaults to the item itself.
//  invert - when truthy, do the opposite: drop the first occurrence of each
//           key and pass every repeat.
//keys are used as object property names, so they are compared as strings.
var unique = exports.unique = function (field, invert) {
  field = prop(field) || id
  //no prototype: with a plain {} keys such as 'constructor' or 'toString'
  //would look "already seen" because of inherited properties.
  var seen = Object.create(null)
  return filter(function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
187
//the inverse of `unique`: the first occurrence of each key is dropped and
//every later occurrence is passed through.
var nonUnique = exports.nonUnique = function (field) {
  var invert = true
  return unique(field, invert)
}
192
//convert a stream of arrays or streams into just a stream.
//each item read from the outer source becomes a sub-stream:
//  arrays and other objects are wrapped with sources.values,
//  functions are taken to be source streams already,
//  anything else is wrapped with sources.once.
//(sources.values / sources.once live in ./sources, not shown here.)
//the sub-stream is drained item by item; when it ends normally the next
//one is pulled from the outer source.
var flatten = exports.flatten = function () {
  return function (read) {
    var inner //reader of the sub-stream currently being drained, if any

    return function (abort, cb) {
      if (abort) {
        //abort the current sub-stream first, then the stream of streams.
        if (inner) {
          inner(abort, function (err) {
            read(err || abort, cb)
          })
        }
        else read(abort, cb)
      }
      else if (inner) pullItem()
      else pullStream()

      //read one item from the current sub-stream.
      function pullItem () {
        inner(null, function (err, data) {
          if (!err) {
            cb(null, data)
            return
          }
          if (err === true) {
            //sub-stream finished normally: move on to the next one.
            pullStream()
            return
          }
          //sub-stream failed: abort the outer source, then report the error.
          read(true, function (abortErr) {
            // TODO: what do we do with the abortErr?
            cb(err)
          })
        })
      }

      //fetch the next sub-stream from the outer source and start draining it.
      function pullStream () {
        inner = null
        read(null, function (end, item) {
          if (end) return cb(end)

          var isCollection = Array.isArray(item) || (item && 'object' === typeof item)
          if (isCollection) inner = sources.values(item)
          else if ('function' !== typeof item) inner = sources.once(item)
          else inner = item

          pullItem()
        })
      }
    }
  }
}
234
235

Built with git-ssb-web