git ssb

2+

Dominic / pull-stream



Tree: e75b6ac2254345793473e239af19644d38aa30d1

Files: e75b6ac2254345793473e239af19644d38aa30d1 / throughs.js

1607 bytesRaw
/**
 * Create a mapping through-stream.
 *
 * Usage shape: map(fn)(readable)(reader). The `reader` is invoked with a
 * read function `(end, cb)` and whatever `reader` returns is returned.
 *
 * @param {Function} [map] - transform applied to every item; defaults to
 *   the identity function when falsy.
 * @returns {Function} function taking the upstream `readable` and returning
 *   a function that takes the `reader`.
 */
var map = function (map) {
  map = map || function (e) { return e }
  return function (readable) {
    return function (reader) {
      return reader(function (end, cb) {
        readable(end, function (end, data) {
          // On end/error there is no item to transform: forward the signal
          // untouched instead of running the mapper on a missing value.
          if (end) return cb(end)
          cb(null, map(data))
        })
      })
    }
  }
}
13
/**
 * Pass-through stream that optionally observes each item.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {Function} [op] - called with each item before it is forwarded;
 *   its return value is ignored.
 * @returns {Function} read function `(end, cb)` that forwards `end` to
 *   `read` and relays the upstream answer to `cb` unchanged.
 */
var dumb = function (read, op) {
  return function (end, cb) {
    return read(end, function (end, data) {
      // Only real items are shown to `op`; the end/error callback carries
      // no item, so the observer is skipped there.
      if (!end && op) op(data)
      cb(end, data)
    })
  }
}
22
/**
 * Limit a stream to its leading items.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {number|Function} test - either a count of items to let through,
 *   or a predicate called with each item; the stream is terminated at the
 *   first item for which the predicate returns a falsy value (that item is
 *   not emitted).
 * @returns {Function} read function `(end, cb)`.
 */
var take = function (read, test) {
  var ended = false
  if ('number' === typeof test) {
    var n = test
    test = function () {
      return n-- > 0
    }
  }
  return function (end, cb) {
    // Once terminated, keep answering with the recorded end state and
    // never touch upstream again.
    if (ended) return cb(ended)
    if (end) {
      // Consumer abort: remember it and hand it to upstream.
      ended = end
      return read(end, cb)
    }
    return read(null, function (end, data) {
      if (end) {
        ended = end
        return cb(end)
      }
      if (!test(data)) {
        // Limit reached: abort upstream, then report end to the consumer
        // (an upstream error during the abort takes precedence).
        ended = true
        return read(true, function (err) { cb(err || true) })
      }
      return cb(null, data)
    })
  }
}
41
42
var nextTick = process.nextTick

/**
 * Read-ahead buffer: eagerly pulls items from `read` until `highWaterMark`
 * items are buffered, and serves consumer reads from that buffer.
 *
 * @param {Function} read - upstream read function `(end, cb)`.
 * @param {number} [highWaterMark] - maximum number of buffered items;
 *   any falsy value (including 0) falls back to 10.
 * @returns {Function} read function `(end, cb)`.
 */
var highWaterMark = function (read, highWaterMark) {
  var buffer = []     // items fetched from upstream, not yet delivered
  var waiting = []    // consumer callbacks not yet answered
  var ended = false   // upstream end/error value, or the consumer's abort value
  var reading = false // true while an upstream read is in flight
  highWaterMark = highWaterMark || 10

  // Answer waiting consumers: buffered items first, the end signal only
  // once the buffer is empty, so items fetched before upstream ended are
  // still delivered.
  function readAhead () {
    while (waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if (buffer.length) cb(null, buffer.shift())
      else cb(ended, null)
    }
  }

  // Start one upstream read unless the stream is over, a read is already
  // in flight, or the buffer is full.
  function next () {
    if (ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      // The consumer aborted while this read was in flight: drop the item.
      if (ended) return readAhead()
      if (end) ended = end
      else if (data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  // Begin filling the buffer before the first consumer read arrives.
  nextTick(next)

  return function (end, cb) {
    if (end) {
      // Abort: buffered items are no longer wanted.
      buffer.length = 0
      if (!ended) {
        ended = end
        waiting.push(cb)
        // Forward the abort so upstream can release its resources.
        return read(end, function () { readAhead() })
      }
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
75
76

Built with git-ssb-web