git ssb

2+

Dominic / pull-stream



Tree: c4caefd7c6cd6984ee0223c391cd4ce43118cf29

Files: c4caefd7c6cd6984ee0223c391cd4ce43118cf29 / throughs.js

2368 bytes · Raw
// Counter bumped once per map() call. It is not read anywhere in the
// visible source (looks like a leftover debugging aid).
var k = 0

// map(read, map) -> read
//
// Through stream that transforms every item coming from `read`.
//
//   read - upstream source, called as read(end, cb); cb receives (end, data).
//   map  - how to transform each item:
//            * function: called with the item, its return value is emitted
//            * string:   treated as a property name, emits item[name]
//            * falsy:    items are passed through unchanged
//
// Returns a new read function with the same (end, cb) signature. When the
// upstream signals `end`, the mapper is not called and `null` is passed as
// the data.
var map = exports.map =
function (read, map) {
  k++
  if ('string' === typeof map) {
    // Capture the property name before `map` is replaced by the plucker.
    var key = map
    map = function (data) { return data[key] }
  }
  map = map || function (e) { return e }
  return function (end, cb) {
    read(end, function (end, data) {
      cb(end, !end ? map(data) : null)
    })
  }
}
15
// filter(read, test) -> read
//
// Through stream that only emits the items for which `test` is truthy.
//
//   read - upstream source, called as read(end, cb); cb receives (end, data).
//   test - how to decide whether an item is kept:
//            * function: called with the item
//            * object with a .test method (e.g. a RegExp): its bound .test
//            * falsy (including null): keeps truthy items
//
// Returns a new read function with the same (end, cb) signature.
var filter = exports.filter =
function (read, test) {
  // regexp (or anything else exposing a .test method).
  // The leading `test &&` matters: typeof null is 'object'.
  if (test
    && 'object' === typeof test
    && 'function' === typeof test.test)
    test = test.test.bind(test)
  test = test || function (data) { return !!data }
  return function next (end, cb) {
    // When upstream answers synchronously, rejected items are skipped by
    // looping instead of recursing, so a long run of rejected items cannot
    // overflow the stack. Asynchronous answers still re-enter via next().
    var sync, loop = true
    while (loop) {
      loop = false
      sync = true
      read(end, function (end, data) {
        if (!end && !test(data)) {
          if (sync) loop = true
          else next(end, cb)
          return
        }
        cb(end, data)
      })
      sync = false
    }
  }
}
31
// through(read, op) -> read
//
// Pass-through stream that lets `op` observe every item without changing it.
//
//   read - upstream source, called as read(end, cb); cb receives (end, data).
//   op   - optional function called with each item before it is handed on;
//          it is not called for the end signal.
//
// The returned read function forwards (end, data) untouched and returns
// whatever the upstream read call returns.
var through = exports.through =
function (read, op) {
  return function (abort, done) {
    function onAnswer (end, data) {
      if (!end && op) op(data)
      done(end, data)
    }
    return read(abort, onAnswer)
  }
}
41
// take(read, test) -> read
//
// Through stream that emits items until `test` rejects one, then ends.
//
//   read - upstream source, called as read(end, cb); cb receives (end, data).
//   test - function called once per item; the first falsy result ends the
//          stream (that item is not emitted). A number n is shorthand for
//          "accept the first n items".
//
// Returns a new read function with the same (end, cb) signature.
//
// End handling:
//   * once the stream has ended, every further call answers with the stored
//     end value and does not touch upstream again
//   * an abort from downstream is forwarded upstream, and upstream's answer
//     goes to the caller's callback
//   * when `test` rejects an item, downstream is ended right away and
//     upstream is aborted on a later tick
var take = exports.take =
function (read, test) {
  var ended = false
  if ('number' === typeof test) {
    var n = test
    test = function () {
      return n-- > 0
    }
  }
  return function (end, cb) {
    if (ended) return cb(ended)
    if (end) {
      ended = end
      return read(end, cb)
    }
    return read(null, function (end, data) {
      // The stream was ended while this read was in flight; drop the answer.
      if (ended) return
      if (end) return cb(ended = end)
      // `test` must run exactly once per item: the numeric form counts down.
      if (!test(data)) {
        ended = true
        nextTick(function () {
          read(true, function () {})
        })
        return cb(true)
      }
      return cb(null, data)
    })
  }
}
71
// Local alias for process.nextTick, used to defer work to a later tick.
var nextTick = process.nextTick

// highWaterMark(read, highWaterMark) -> read
//
// Through stream that reads ahead from `read` and keeps up to
// `highWaterMark` items buffered for the consumer.
//
//   read          - upstream source, called as read(end, cb); cb receives
//                   (end, data).
//   highWaterMark - maximum number of buffered items; falsy values
//                   (including 0) fall back to 10.
//
// Returns a new read function with the same (end, cb) signature.
//
// Behaviour:
//   * reading ahead starts on a later tick, before the consumer asks
//   * only one upstream read is in flight at a time
//   * items already buffered are delivered before the upstream end signal
//   * an abort from the consumer discards the buffer, is forwarded upstream
//     once, and is answered immediately
//   * null/undefined data from upstream is never buffered
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  highWaterMark = highWaterMark || 10

  // Answer waiting callbacks: buffered data first, then the end signal.
  function readAhead () {
    while (waiting.length && (buffer.length || ended)) {
      if (buffer.length) waiting.shift()(null, buffer.shift())
      else waiting.shift()(ended, null)
    }
  }

  // Pull one more item from upstream unless ended, busy or full.
  function next () {
    if (ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      // `ended` can only have been set here by a consumer abort issued while
      // this read was in flight; its answer is no longer wanted.
      if (ended) return
      ended = end
      if (data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if (end) {
      // An aborting consumer does not want what is still buffered.
      buffer.length = 0
      if (!ended) {
        ended = end
        read(end, function () {})
      }
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
106
107

Built with git-ssb-web