git ssb

2+

Dominic / pull-stream



Tree: 545734ec6384c1b2b71f539ea4acfe56af123b1e

Files: 545734ec6384c1b2b71f539ea4acfe56af123b1e / throughs.js

4192 bytesRaw
1function prop (map) {
2 if('string' == typeof map) {
3 var key = map
4 return function (data) { return data[key] }
5 }
6 return map
7}
8
9function id (item) {
10 return item
11}
12
13var k = 0
14var map = exports.map =
15function (read, map) {
16 var _k = k++
17 map = prop(map) || id
18 return function (end, cb) {
19 read(end, function (end, data) {
20 var data = !end ? map(data) : null
21 cb(end, data)
22 })
23 }
24}
25
26var asyncMap = exports.asyncMap =
27function (read, map) {
28 if(!map) return read
29 return function (end, cb) {
30 if(end) return read(end, cb) //abort
31 read(null, function (end, data) {
32 if(end) return cb(end, data)
33 map(data, cb)
34 })
35 }
36}
37
38var filter = exports.filter =
39function (read, test) {
40 //regexp
41 if('object' === typeof test
42 && 'function' === typeof test.test)
43 test = test.test.bind(test)
44 test = prop(test) || id
45 return function next (end, cb) {
46 read(end, function (end, data) {
47 if(!end && !test(data))
48 return next(end, cb)
49 cb(end, data)
50 })
51 }
52}
53
54var through = exports.through =
55function (read, op, onEnd) {
56 var a = false
57 function once (abort) {
58 if(a || !onEnd) return
59 a = true
60 onEnd(abort === true ? null : abort)
61 }
62
63 return function (end, cb) {
64 return read(end, function (end, data) {
65 if(!end) op && op(data)
66 else once(end)
67 cb(end, data)
68 })
69 }
70}
71
72var take = exports.take =
73function (read, test) {
74 var ended = false
75 if('number' === typeof test) {
76 var n = test; test = function () {
77 return n --> 0
78 }
79 }
80 return function (end, cb) {
81 if(ended = ended || end)
82 return read(ended, cb)
83
84 read(null, function (end, data) {
85 if(ended = ended || end) return cb(ended)
86 //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!!
87 if(!test(data)) {
88 ended = true
89 read(true, cb)
90 }
91 else
92 cb(null, data)
93 })
94 }
95}
96
97var unique = exports.unique = function (read, field, invert) {
98 field = prop(field) || id
99 var seen = {}
100 return filter(read, function (data) {
101 var key = field(data)
102 if(seen[key]) return !!invert //false, by default
103 else seen[key] = true
104 return !invert //true by default
105 })
106}
107
108var nonUnique = exports.nonUnique = function (read, field) {
109 return unique(read, field, true)
110}
111
112var group = exports.group =
113function (read, size) {
114 var ended; size = size || 5
115 var queue = []
116
117 return function (end, cb) {
118 //this means that the upstream is sending an error.
119 if(end) return read(ended = end, cb)
120 //this means that we read an end before.
121 if(ended) return cb(ended)
122
123 read(null, function next(end, data) {
124 if(ended = ended || end) {
125 if(!queue.length)
126 return cb(ended)
127
128 var _queue = queue; queue = []
129 return cb(null, _queue)
130 }
131 queue.push(data)
132 if(queue.length < size)
133 return read(null, next)
134
135 var _queue = queue; queue = []
136 cb(null, _queue)
137 })
138 }
139}
140
141var flatten = exports.flatten = function (read) {
142 var chunk
143 return function (end, cb) {
144 //this means that the upstream is sending an error.
145 if(end) return read(ended = end, cb)
146
147 if(chunk && chunk.length)
148 return cb(null, chunk.shift())
149
150 read(null, function (err, data) {
151 if(err) return cb(err)
152 chunk = data
153
154 if(chunk && chunk.length)
155 return cb(null, chunk.shift())
156 })
157 }
158}
159
160var nextTick = process.nextTick
161
162var highWaterMark = exports.highWaterMark =
163function (read, highWaterMark) {
164 var buffer = [], waiting = [], ended, reading = false
165 highWaterMark = highWaterMark || 10
166
167 function readAhead () {
168 while(waiting.length && (buffer.length || ended))
169 waiting.shift()(ended, ended ? null : buffer.shift())
170 }
171
172 function next () {
173 if(ended || reading || buffer.length >= highWaterMark)
174 return
175 reading = true
176 return read(ended, function (end, data) {
177 reading = false
178 ended = ended || end
179 if(data != null) buffer.push(data)
180
181 next(); readAhead()
182 })
183 }
184
185 nextTick(next)
186
187 return function (end, cb) {
188 ended = ended || end
189 waiting.push(cb)
190
191 next(); readAhead()
192 }
193}
194
195
196
197

Built with git-ssb-web