git ssb

2+

Dominic / pull-stream



Tree: 139ae4f839a516ef981ea28e0e41fb6f8cc9702e

Files: 139ae4f839a516ef981ea28e0e41fb6f8cc9702e / throughs.js

3717 bytes · Raw
// Normalises a "mapper" argument.
// A string is treated as a property name and turned into a getter for that
// property; any other value (function, undefined, ...) is returned as-is.
function prop (map) {
  if (typeof map !== 'string') return map
  return function (data) {
    return data[map]
  }
}
8
// Identity function: hands its argument back unchanged.
// Used as the fallback mapper/predicate when none is supplied.
function id (item) { return item }
12
// map(read, map): through stream that transforms every item.
//   read - source, called as read(end, cb)
//   map  - mapper function, or a property name to pluck from each item;
//          when falsy, items pass through unchanged.
// Returns a readable with the same (end, cb) signature.
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (end, cb) {
    read(end, function (end, data) {
      // on end/error the payload is always replaced with null
      cb(end, end ? null : map(data))
    })
  }
}
25
// asyncMap(read, map): like map, but the mapper is asynchronous.
//   map(data, cb) receives each item and reports its result through cb,
//   which is the very callback the reader was invoked with.
// When no mapper is given the source itself is returned.
var asyncMap = exports.asyncMap =
function (read, map) {
  if (!map) return read

  return function (end, cb) {
    // an end/abort request goes straight through to the source
    if (end) return read(end, cb)

    function onItem (ended, data) {
      if (!ended) {
        map(data, cb)
        return
      }
      return cb(ended, data)
    }

    read(null, onItem)
  }
}
37
// filter(read, test): through stream that only lets matching items pass.
//   test - predicate function, an object exposing .test() (e.g. a RegExp),
//          or a property name; when falsy, items are kept if truthy.
// End/error signals are always passed on.
var filter = exports.filter =
function (read, test) {
  // regexp (or anything with a .test method).
  // `test &&` guards against null, whose typeof is also 'object'.
  if (test && 'object' === typeof test
    && 'function' === typeof test.test)
    test = test.test.bind(test)
  test = prop(test) || id
  return function next (end, cb) {
    // A rejected item means "read again". When the source answers
    // synchronously we loop instead of recursing, so a long run of rejected
    // items cannot overflow the stack.
    var sync, loop = true
    while (loop) {
      loop = false
      sync = true
      read(end, function (err, data) {
        if (!err && !test(data)) {
          if (!sync) return next(err, cb)
          end = err
          loop = true
          return
        }
        cb(err, data)
      })
      sync = false
    }
  }
}
53
// through(read, op): passes every item on unchanged, calling the optional
// `op(data)` on each item first. `op` is not called for end/error signals.
var through = exports.through =
function (read, op) {
  return function (end, cb) {
    function onItem (ended, data) {
      if (!ended && op) op(data)
      cb(ended, data)
    }
    return read(end, onItem)
  }
}
63
// take(read, test): passes items on while `test(data)` is truthy, then ends.
//   test - predicate, or a number n meaning "the first n items".
// When the predicate first fails the item is discarded, downstream is told
// the stream has ended, and upstream is aborted on the next tick.
var take = exports.take =
function (read, test) {
  var ended = false
  if ('number' === typeof test) {
    var n = test
    test = function () {
      return n-- > 0
    }
  }
  return function (end, cb) {
    if (end) {
      // Already finished: acknowledge only. (Previously this fell through
      // and issued another upstream read after calling cb.)
      if (ended) return cb(ended)
      ended = end
      return read(end, cb)
    }
    // Already finished: answer directly instead of reading upstream again
    // (previously upstream was read and cb was never called).
    if (ended) return cb(ended)
    return read(null, function (end, data) {
      // finished while this read was in flight: the result is dropped
      if (ended) return
      if (end) return cb(ended = end)
      if (!test(data)) {
        ended = true
        // abort upstream out-of-band; its answer is not needed
        nextTick(function () {
          read(true, function () {})
        })
        return cb(true)
      }
      return cb(null, data)
    })
  }
}
92
// unique(read, field, invert): lets an item pass only the first time its key
// is seen.
//   field  - function or property name producing the key; defaults to the
//            item itself. Keys are compared as object property names, i.e.
//            after string coercion.
//   invert - when truthy, the logic is flipped: only items whose key has
//            already been seen pass.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // Prototype-less map, so keys like 'constructor' or 'toString' are not
  // mistaken for already-seen entries inherited from Object.prototype.
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if (seen[key]) return !!invert // false, by default
    seen[key] = true
    return !invert // true, by default
  })
}
103
// nonUnique(read, field): the inverse of unique. Delegates to unique with
// its `invert` flag switched on.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
107
// group(read, size): collects items into arrays of `size` (default 5) and
// passes each array on as a single item. When the source ends, a partially
// filled array is emitted first and the end signal on the following read.
var group = exports.group =
function (read, size) {
  var ended
  var queue = []
  size = size || 5

  // hand the current batch to cb and start a fresh one
  function flush (cb) {
    var batch = queue
    queue = []
    return cb(null, batch)
  }

  return function (end, cb) {
    // the caller passed an end/abort signal: remember it and forward it
    if (end) return read(ended = end, cb)
    // the end was already seen on an earlier read
    if (ended) return cb(ended)

    read(null, function next (end, data) {
      ended = ended || end
      if (ended) return queue.length ? flush(cb) : cb(ended)

      queue.push(data)
      // batch not full yet: keep pulling from the source
      if (queue.length < size) return read(null, next)

      flush(cb)
    })
  }
}
136
// Defers a callback to a later tick. Uses process.nextTick when available
// and falls back to setTimeout where there is no `process` global, so that
// loading this module does not throw a ReferenceError in such environments.
var nextTick =
  (typeof process !== 'undefined' && typeof process.nextTick === 'function')
    ? process.nextTick
    : function (fn) { setTimeout(fn, 0) }
138
// highWaterMark(read, highWaterMark): reads ahead from the source and keeps
// up to `highWaterMark` items (default 10) buffered for the consumer.
// Reading ahead starts on the next tick after creation.
//  - Items already buffered are delivered before an upstream end/error.
//  - A downstream abort discards the buffer and is forwarded upstream.
//  - null/undefined items are not buffered.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = []
  var ended                // end/error from upstream, or abort from downstream
  var reading = false      // true while an upstream read is in flight
  var upstreamDone = false // upstream has ended, or has been sent the abort
  highWaterMark = highWaterMark || 10

  // answer waiting callbacks: buffered data first, then the end signal
  function readAhead () {
    while (waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if (buffer.length) cb(null, buffer.shift())
      else cb(ended, null)
    }
  }

  // forward a downstream abort, at most once and never while a read is
  // in flight (that read's callback forwards it instead)
  function abortUpstream () {
    if (upstreamDone || reading) return
    upstreamDone = true
    read(ended, function () {})
  }

  function next () {
    if (ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      if (end) upstreamDone = true
      if (ended) {
        // aborted downstream while this read was in flight: drop the result
        abortUpstream()
        return readAhead()
      }
      ended = end
      if (!end && data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if (end) {
      // abort: whatever is buffered is no longer wanted
      buffer.length = 0
      if (!ended) {
        ended = end
        abortUpstream()
      }
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
171
172

Built with git-ssb-web