Files: 139ae4f839a516ef981ea28e0e41fb6f8cc9702e / throughs.js
3717 bytesRaw
/**
 * Normalise a "mapper" argument.
 *
 * If `map` is a string it is treated as a property name and a getter
 * function `(data) => data[map]` is returned. Any other value (a function,
 * undefined, null, ...) is returned untouched so the caller can fall back
 * to a default with `prop(x) || fallback`.
 *
 * @param {string|Function|*} map property name or mapper
 * @returns {Function|*} getter for the property, or `map` itself
 */
function prop (map) {
  if('string' === typeof map) {
    var key = map
    return function (data) { return data[key] }
  }
  return map
}
8 | |
/**
 * Identity function: hands back its argument unchanged.
 * Used in this file as the default mapper / predicate via `prop(x) || id`.
 *
 * @param {*} item any value
 * @returns {*} the same value
 */
function id (item) {
  return item
}
12 | |
/**
 * Through-stream that transforms every item with `map`.
 *
 * `map` may be a function, a property name (string, resolved with `prop`),
 * or falsy, in which case items pass through unchanged.
 * When upstream signals end/error the callback receives `(end, null)`
 * and `map` is not invoked.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function|string} [map] mapper or property name
 * @returns {Function} a new `read(end, cb)` function
 */
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (end, cb) {
    read(end, function (end, data) {
      cb(end, !end ? map(data) : null)
    })
  }
}
25 | |
/**
 * Through-stream whose mapper is asynchronous.
 *
 * `map(data, cb)` is called for every item and is given the downstream
 * callback directly, so whatever it passes to `cb` is what downstream sees.
 * With no mapper the original `read` is returned as-is.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function} [map] async mapper `(data, cb)`
 * @returns {Function} a `read(end, cb)` function
 */
var asyncMap = exports.asyncMap =
function (read, map) {
  // nothing to apply: no need to wrap the source at all
  if(!map) return read

  return function (abort, cb) {
    if(!abort) {
      read(null, function (end, data) {
        if(!end) {
          map(data, cb)
          return
        }
        // end or error from upstream goes straight through
        return cb(end, data)
      })
      return
    }
    // downstream abort is forwarded untouched
    return read(abort, cb)
  }
}
37 | |
/**
 * Through-stream that only lets items through for which `test` is truthy.
 *
 * `test` may be:
 *  - a function `(data) => boolean`
 *  - a RegExp-like object (anything with a `.test()` method); note that a
 *    regular expression with the `g` or `y` flag keeps state between calls
 *  - a property name (string), resolved with `prop`
 *  - falsy (including null), in which case the item's own truthiness is used
 *
 * Rejected items are skipped with a loop rather than recursion when upstream
 * answers synchronously, so a long run of rejected items cannot overflow
 * the call stack.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function|RegExp|string} [test] predicate
 * @returns {Function} a new `read(end, cb)` function
 */
var filter = exports.filter =
function (read, test) {
  // regexp (typeof null is also 'object', so rule that out first)
  if(test !== null
    && 'object' === typeof test
    && 'function' === typeof test.test)
    test = test.test.bind(test)
  test = prop(test) || id

  return function next (end, cb) {
    var sync, loop = true
    while(loop) {
      loop = false
      sync = true
      read(end, function (ended, data) {
        if(!ended && !test(data)) {
          // item rejected: pull another one
          if(sync) {
            // still inside read(): let the while loop re-read
            end = ended
            loop = true
          }
          else next(ended, cb) // upstream answered later: safe to re-enter
          return
        }
        cb(ended, data)
      })
      sync = false
    }
  }
}
53 | |
/**
 * Pass-through stream with an optional observer.
 *
 * Every item is handed to `op` (when one is given) before being forwarded
 * downstream unchanged; `op` is not called for the end/error signal.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function} [op] side-effect function `(data)`
 * @returns {Function} a new `read(end, cb)` function
 */
var through = exports.through =
function (read, op) {
  return function (abort, cb) {
    function observe (end, data) {
      var live = !end
      if(live && op) op(data)
      cb(end, data)
    }
    return read(abort, observe)
  }
}
63 | |
/**
 * Through-stream that forwards items while `test(data)` is truthy and then
 * ends the stream.
 *
 * If `test` is a number `n`, the first `n` items are forwarded. The item
 * that makes `test` fail is read from upstream but not forwarded; upstream
 * is then aborted on the next tick and downstream is told `true` (end).
 *
 * Once the stream has ended (by test, by upstream, or by a downstream
 * abort) every later call is answered with the stored end value and
 * upstream is not read again.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function|number} test predicate `(data)` or item count
 * @returns {Function} a new `read(end, cb)` function
 */
var take = exports.take =
function (read, test) {
  var ended = false
  if('number' === typeof test) {
    var n = test
    test = function () {
      return n-- > 0
    }
  }
  return function (end, cb) {
    if(end) {
      // downstream abort: forward it upstream only the first time
      if(ended) return cb(ended)
      ended = end
      return read(end, cb)
    }
    // already finished: answer from the stored state, don't touch upstream
    if(ended) return cb(ended)

    return read(null, function (end, data) {
      // the stream was aborted while this read was in flight;
      // the pending callback still has to be answered
      if(ended) return cb(ended)
      if(end) return cb(ended = end)
      if(!test(data)) {
        ended = true
        // abort upstream outside of its own callback
        nextTick(function () {
          read(true, function (){})
        })
        return cb(true)
      }
      return cb(null, data)
    })
  }
}
92 | |
/**
 * Through-stream that drops items whose key has been seen before.
 *
 * The key is the item itself, or `field(item)` / `item[field]` when `field`
 * is a function / property name. Keys are stored as object property names,
 * so they are compared after string conversion (1 and "1" are the same key).
 *
 * With `invert` truthy the logic is flipped: only items whose key HAS been
 * seen before are let through.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function|string} [field] key extractor or property name
 * @param {boolean} [invert] pass repeats instead of first occurrences
 * @returns {Function} a new `read(end, cb)` function
 */
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // prototype-less map: keys such as "toString" or "constructor" must not
  // look "already seen" because of properties inherited from Object.prototype
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
103 | |
/**
 * Counterpart of `unique`: delegates to `unique` with its `invert` flag set,
 * so only items whose key was already seen are passed on.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {Function|string} [field] key extractor or property name
 * @returns {Function} a new `read(end, cb)` function
 */
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
107 | |
/**
 * Through-stream that collects items into arrays of `size` elements
 * (default 5) and emits each array as one item.
 *
 * When upstream ends, whatever has been collected so far is emitted as a
 * final, shorter array; the end signal itself is delivered on the
 * following read.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {number} [size] items per group, defaults to 5
 * @returns {Function} a new `read(end, cb)` function
 */
var group = exports.group =
function (read, size) {
  var ended
  var pending = []
  size = size || 5

  // hand the collected batch downstream and start a fresh one
  function flush (cb) {
    var batch = pending
    pending = []
    return cb(null, batch)
  }

  return function (end, cb) {
    // downstream is aborting: remember it and pass it on upstream
    if(end) return read(ended = end, cb)
    // upstream already ended during an earlier read
    if(ended) return cb(ended)

    read(null, function gather (end, data) {
      ended = ended || end
      if(ended)
        return pending.length ? flush(cb) : cb(ended)

      pending.push(data)
      // keep pulling until the batch is full
      if(pending.length < size)
        return read(null, gather)

      flush(cb)
    })
  }
}
136 | |
var nextTick = process.nextTick

/**
 * Through-stream that reads ahead from upstream and keeps up to
 * `highWaterMark` items (default 10) buffered for downstream.
 *
 * Reading ahead starts on the next tick after creation and continues
 * whenever downstream reads while the buffer is below the mark.
 * Items with value null/undefined are not buffered.
 *
 * Items already buffered are always delivered before the end/error signal
 * from upstream. If downstream aborts, the buffer is discarded and the
 * abort is forwarded upstream once.
 *
 * @param {Function} read upstream source `read(end, cb)`
 * @param {number} [highWaterMark] maximum buffered items, defaults to 10
 * @returns {Function} a new `read(end, cb)` function
 */
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  highWaterMark = highWaterMark || 10

  // answer queued downstream reads: buffered data first, then the end signal
  function readAhead () {
    while(waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if(buffer.length) cb(null, buffer.shift())
      else cb(ended)
    }
  }

  // pull one more item from upstream unless ended, busy, or full
  function next () {
    if(ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      ended = ended || end
      // ignore data that arrives with / after the end (e.g. abort in flight)
      if(!ended && data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if(end) {
      // downstream abort: drop what was buffered so the abort is immediate
      buffer.length = 0
      if(!ended) {
        ended = end
        // let upstream release its resources
        read(end, function () {})
      }
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
171 | |
172 |
Built with git-ssb-web