git ssb

2+

Dominic / pull-stream



Tree: aafb43a723f744ee19b4b8d5815b8fb95b9f4386

Files: aafb43a723f744ee19b4b8d5815b8fb95b9f4386 / throughs.js

2353 bytesRaw
1var k = 0
2var map = exports.map =
3function (read, map) {
4 var _k = k++
5 if('string' == typeof map) {
6 var key = map
7 map = function (data) { return data[key] }
8 }
9 map = map || function (e) {return e}
10 return function (end, cb) {
11 read(end, function (end, data) {
12 var data = !end ? map(data) : null
13 cb(end, data)
14 })
15 }
16}
17
18var filter = exports.filter =
19function (read, test) {
20 //regexp
21 if('object' === typeof test
22 && 'function' === typeof test.test)
23 test = test.test.bind(test)
24 test = test || function (data) {return !!data}
25 return function next (end, cb) {
26 read(end, function (end, data) {
27 if(!end && !test(data))
28 return next(end, cb)
29 cb(end, data)
30 })
31 }
32}
33
34var through = exports.through =
35function (read, op) {
36 return function (end, cb) {
37 return read(end, function (end, data) {
38 if(!end) op && op(data)
39 cb(end, data)
40 })
41 }
42}
43
44var take = exports.take =
45function (read, test) {
46 var ended = false
47 if('number' === typeof test) {
48 var n = test; test = function () {
49 return n-- > 0
50 }
51 }
52 return function (end, cb) {
53 if(end) {
54 if(!ended) return ended = end, readable(end, cb)
55 cb(ended)
56 }
57 return read(null, function (end, data) {
58 if(ended) return
59 if(end) return cb(ended = end)
60 //TODO, CHECK THAT END LOGIC IS CORRECT WITH TAKE!!!
61 if(!test(data)) {
62 ended = true
63 nextTick(function () {
64 read(true, function (){})
65 })
66 return cb(true)
67 }
68 return cb(null, data)
69 })
70 }
71}
72
73var nextTick = process.nextTick
74
75var highWaterMark = exports.highWaterMark =
76function (read, highWaterMark) {
77 var buffer = [], waiting = [], ended, reading = false
78 highWaterMark = highWaterMark || 10
79
80 function readAhead () {
81 while(waiting.length && (buffer.length || ended))
82 waiting.shift()(ended, ended ? null : buffer.shift())
83 }
84
85 function next () {
86 if(ended || reading || buffer.length >= highWaterMark)
87 return
88 reading = true
89 return read(ended, function (end, data) {
90 reading = false
91 ended = ended || end
92 if(data != null) buffer.push(data)
93
94 next(); readAhead()
95 })
96 }
97
98 nextTick(next)
99
100 return function (end, cb) {
101 ended = ended || end
102 waiting.push(cb)
103
104 next(); readAhead()
105 }
106}
107
108

Built with git-ssb-web