git ssb

2+

Dominic / pull-stream



Tree: a99aae179ec016d2d93b179e1a4b316be4343dc0

Files: a99aae179ec016d2d93b179e1a4b316be4343dc0 / throughs.js

4153 bytesRaw
1function prop (map) {
2 if('string' == typeof map) {
3 var key = map
4 return function (data) { return data[key] }
5 }
6 return map
7}
8
9function id (item) {
10 return item
11}
12
13var k = 0
14var map = exports.map =
15function (read, map) {
16 var _k = k++
17 map = prop(map) || id
18 return function (end, cb) {
19 read(end, function (end, data) {
20 var data = !end ? map(data) : null
21 cb(end, data)
22 })
23 }
24}
25
26var asyncMap = exports.asyncMap =
27function (read, map) {
28 if(!map) return read
29 return function (end, cb) {
30 if(end) return read(end, cb) //abort
31 read(null, function (end, data) {
32 if(end) return cb(end, data)
33 map(data, cb)
34 })
35 }
36}
37
38var filter = exports.filter =
39function (read, test) {
40 //regexp
41 if('object' === typeof test
42 && 'function' === typeof test.test)
43 test = test.test.bind(test)
44 test = prop(test) || id
45 return function next (end, cb) {
46 read(end, function (end, data) {
47 if(!end && !test(data))
48 return next(end, cb)
49 cb(end, data)
50 })
51 }
52}
53
54var through = exports.through =
55function (read, op) {
56 return function (end, cb) {
57 return read(end, function (end, data) {
58 if(!end) op && op(data)
59 cb(end, data)
60 })
61 }
62}
63
64var take = exports.take =
65function (read, test) {
66 var ended = false
67 if('number' === typeof test) {
68 var n = test; test = function () {
69 return n-- > 0
70 }
71 }
72 return function (end, cb) {
73 if(end) {
74 if(!ended) return ended = end, read(end, cb)
75 cb(ended)
76 }
77 return read(null, function (end, data) {
78 if(ended) return
79 if(end) return cb(ended = end)
80 //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!!
81 if(!test(data)) {
82 ended = true
83 nextTick(function () {
84 read(true, function (){})
85 })
86 return cb(true)
87 }
88 return cb(null, data)
89 })
90 }
91}
92
93var unique = exports.unique = function (read, field, invert) {
94 field = prop(field) || id
95 var seen = {}
96 return filter(read, function (data) {
97 var key = field(data)
98 if(seen[key]) return !!invert //false, by default
99 else seen[key] = true
100 return !invert //true by default
101 })
102}
103
104var nonUnique = exports.nonUnique = function (read, field) {
105 return unique(read, field, true)
106}
107
108var group = exports.group =
109function (read, size) {
110 var ended; size = size || 5
111 var queue = []
112
113 return function (end, cb) {
114 //this means that the upstream is sending an error.
115 if(end) return read(ended = end, cb)
116 //this means that we read an end before.
117 if(ended) return cb(ended)
118
119 read(null, function next(end, data) {
120 if(ended = ended || end) {
121 if(!queue.length)
122 return cb(ended)
123
124 var _queue = queue; queue = []
125 return cb(null, _queue)
126 }
127 queue.push(data)
128 if(queue.length < size)
129 return read(null, next)
130
131 var _queue = queue; queue = []
132 cb(null, _queue)
133 })
134 }
135}
136
137var flatten = exports.flatten = function (read) {
138 var chunk
139 return function (end, cb) {
140 //this means that the upstream is sending an error.
141 if(end) return read(ended = end, cb)
142
143 if(chunk && chunk.length)
144 return cb(null, chunk.shift())
145
146 read(null, function (err, data) {
147 if(err) return cb(err)
148 chunk = data
149
150 if(chunk && chunk.length)
151 return cb(null, chunk.shift())
152 })
153 }
154}
155
156var nextTick = process.nextTick
157
158var highWaterMark = exports.highWaterMark =
159function (read, highWaterMark) {
160 var buffer = [], waiting = [], ended, reading = false
161 highWaterMark = highWaterMark || 10
162
163 function readAhead () {
164 while(waiting.length && (buffer.length || ended))
165 waiting.shift()(ended, ended ? null : buffer.shift())
166 }
167
168 function next () {
169 if(ended || reading || buffer.length >= highWaterMark)
170 return
171 reading = true
172 return read(ended, function (end, data) {
173 reading = false
174 ended = ended || end
175 if(data != null) buffer.push(data)
176
177 next(); readAhead()
178 })
179 }
180
181 nextTick(next)
182
183 return function (end, cb) {
184 ended = ended || end
185 waiting.push(cb)
186
187 next(); readAhead()
188 }
189}
190
191

Built with git-ssb-web