git ssb

2+

Dominic / pull-stream



Tree: 17996d82d3e483a8721b9aa2b40cd1fed89e869a

Files: 17996d82d3e483a8721b9aa2b40cd1fed89e869a / throughs.js

1967 bytesRaw
1var k = 0
2var map = exports.map =
3function (read, map) {
4 var _k = k++
5 map = map || function (e) {return e}
6 return function (end, cb) {
7 read(end, function (end, data) {
8 cb(end, !end ? map(data) : null)
9 })
10 }
11}
12
13var filter = exports.filter =
14function (read, test) {
15 //regexp
16 if('object' === typeof test
17 && 'function' === typeof test.test)
18 test = test.test.bind(test)
19
20 return function next (end, cb) {
21 read(end, function (end, data) {
22 if(!end && !test(data))
23 return next(end, cb)
24 cb(end, data)
25 })
26 }
27}
28
29var through = exports.through =
30function (read, op) {
31 return function (end, cb) {
32 return read(end, function (end, data) {
33 if(!end) op && op(data)
34 cb(end, data)
35 })
36 }
37}
38
39var take = exports.take =
40function (read, test) {
41 var ended = false
42 if('number' === typeof test) {
43 var n = test; test = function () {
44 return n-- > 0
45 }
46 }
47 return function (end, cb) {
48 if(end) {
49 if(!ended) return ended = end, readable(end, cb)
50 cb(ended)
51 }
52 return read(null, function (end, data) {
53 if(end || !test(data)) return read(end || true, cb)
54 return cb(null, data)
55 })
56 }
57}
58
59var nextTick = process.nextTick
60
61var highWaterMark = exports.highWaterMark =
62function (read, highWaterMark) {
63 var buffer = [], waiting = [], ended, reading = false
64 highWaterMark = highWaterMark || 10
65
66 function readAhead () {
67 while(waiting.length && (buffer.length || ended))
68 waiting.shift()(ended, ended ? null : buffer.shift())
69 }
70
71 function next () {
72 if(ended || reading || buffer.length >= highWaterMark)
73 return
74 reading = true
75 return read(ended, function (end, data) {
76 reading = false
77 ended = ended || end
78 if(data != null) buffer.push(data)
79
80 next(); readAhead()
81 })
82 }
83
84 nextTick(next)
85
86 return function (end, cb) {
87 ended = ended || end
88 waiting.push(cb)
89
90 next(); readAhead()
91 }
92}
93
94

Built with git-ssb-web