git ssb

2+

Dominic / pull-stream



Tree: 9cbec93bf2fcddb09f5c4154a97fdcc4112a7328

Files: 9cbec93bf2fcddb09f5c4154a97fdcc4112a7328 / throughs.js

4988 bytesRaw
1'use strict';
2
3function id (item) { return item }
4
5function prop (key) {
6 return (
7 'string' == typeof key
8 ? function (data) { return data[key] }
9 : 'object' === typeof key && 'function' === typeof key.exec //regexp
10 ? function (data) { var v = map.exec(data); return v && v[0] }
11 : key
12 )
13}
14
15function tester (test) {
16 return (
17 'object' === typeof test && 'function' === typeof test.test //regexp
18 ? function (data) { return test.test(data) }
19 : prop (test) || id
20 )
21}
22
23var sources = require('./sources')
24var sinks = require('./sinks')
25
26var map = exports.map =
27function (map) {
28 if(!map) return id
29 map = prop(map)
30 return function (read) {
31 return function (abort, cb) {
32 read(abort, function (end, data) {
33 try {
34 data = !end ? map(data) : null
35 } catch (err) {
36 return read(err, function () {
37 return cb(err)
38 })
39 }
40 cb(end, data)
41 })
42 }
43 }
44}
45
46var asyncMap = exports.asyncMap =
47function (map) {
48 if(!map) return id //when read is passed, pass it on.
49 return function (read) {
50 return function (end, cb) {
51 if(end) return read(end, cb) //abort
52 read(null, function (end, data) {
53 if(end) return cb(end, data)
54 map(data, cb)
55 })
56 }
57 }
58}
59
60var filter = exports.filter =
61function (test) {
62 //regexp
63 test = tester(test)
64 return function (read) {
65 return function next (end, cb) {
66 var sync, loop = true
67 while(loop) {
68 loop = false
69 sync = true
70 read(end, function (end, data) {
71 if(!end && !test(data))
72 return sync ? loop = true : next(end, cb)
73 cb(end, data)
74 })
75 sync = false
76 }
77 }
78 }
79}
80
81var filterNot = exports.filterNot =
82function (test) {
83 test = tester(test)
84 return filter(function (data) { return !test(data) })
85}
86
87//a pass through stream that doesn't change the value.
88var through = exports.through =
89function (op, onEnd) {
90 var a = false
91
92 function once (abort) {
93 if(a || !onEnd) return
94 a = true
95 onEnd(abort === true ? null : abort)
96 }
97
98 return function (read) {
99 return function (end, cb) {
100 if(end) once(end)
101 return read(end, function (end, data) {
102 if(!end) op && op(data)
103 else once(end)
104 cb(end, data)
105 })
106 }
107 }
108}
109
110//read a number of items and then stop.
111var take = exports.take =
112function (test, opts) {
113 opts = opts || {}
114 var last = opts.last || false // whether the first item for which !test(item) should still pass
115 var ended = false
116 if('number' === typeof test) {
117 last = true
118 var n = test; test = function () {
119 return --n
120 }
121 }
122
123 return function (read) {
124
125 function terminate (cb) {
126 read(true, function (err) {
127 last = false; cb(err || true)
128 })
129 }
130
131 return function (end, cb) {
132 if(ended) last ? terminate(cb) : cb(ended)
133 else if(ended = end) read(ended, cb)
134 else
135 read(null, function (end, data) {
136 if(ended = ended || end) {
137 //last ? terminate(cb) :
138 cb(ended)
139 }
140 else if(!test(data)) {
141 ended = true
142 last ? cb(null, data) : terminate(cb)
143 }
144 else
145 cb(null, data)
146 })
147 }
148 }
149}
150
151//drop items you have already seen.
152var unique = exports.unique = function (field, invert) {
153 field = prop(field) || id
154 var seen = {}
155 return filter(function (data) {
156 var key = field(data)
157 if(seen[key]) return !!invert //false, by default
158 else seen[key] = true
159 return !invert //true by default
160 })
161}
162
163//passes an item through when you see it for the second time.
164var nonUnique = exports.nonUnique = function (field) {
165 return unique(field, true)
166}
167
168//convert a stream of arrays or streams into just a stream.
169var flatten = exports.flatten = function () {
170 return function (read) {
171 var _read
172 return function (abort, cb) {
173 if (abort) { //abort the current stream, and then stream of streams.
174 _read ? _read(abort, function(err) {
175 read(err || abort, cb)
176 }) : read(abort, cb)
177 }
178 else if(_read) nextChunk()
179 else nextStream()
180
181 function nextChunk () {
182 _read(null, function (err, data) {
183 if (err === true) nextStream()
184 else if (err) {
185 read(true, function(abortErr) {
186 // TODO: what do we do with the abortErr?
187 cb(err)
188 })
189 }
190 else cb(null, data)
191 })
192 }
193 function nextStream () {
194 _read = null
195 read(null, function (end, stream) {
196 if(end)
197 return cb(end)
198 if(Array.isArray(stream) || stream && 'object' === typeof stream)
199 stream = sources.values(stream)
200 else if('function' != typeof stream)
201 throw new Error('expected stream of streams')
202 _read = stream
203 nextChunk()
204 })
205 }
206 }
207 }
208}
209
210

Built with git-ssb-web