git ssb

2+

Dominic / pull-stream



Commit 81f79662ff89630e177adfbe2880c52d5a5a97b4

initial

Dominic Tarr committed on 3/22/2013, 7:09:03 AM

Files changed

index.jsadded
index.jsView
@@ -1,0 +1,200 @@
1+
2+var readArray = exports.readArray = function (array) {
3+ var i = 0
4+ return function (reader) {
5+ reader(function (end, cb) {
6+ if(end)
7+ return cb && cb(end)
8+
9+ cb(i >= array.length || null, array[i++])
10+ })
11+ }
12+}
13+
14+var count = function () {
15+ var i = 0
16+ return function (reader) {
17+ return reader(function (end, cb) {
18+ if(end) return cb && cb(end)
19+ cb(null, i++)
20+ })
21+ }
22+}
23+
24+var destack = function (n) {
25+ var i = 0; n = n || 10, waiting = [], queued = false, ended = false
26+ return function (readable) {
27+ return function (reader) {
28+ return reader(function (end, cb) {
29+ ended = ended || end
30+ if(i ++ < n) {
31+ return readable(end, cb)
32+ } else {
33+ process.nextTick(function () {
34+ i = 0
35+ readable(end, cb)
36+ })
37+ }
38+ })
39+ }
40+ }
41+}
42+
43+var map = function (map) {
44+ map = map || function (e) {return e}
45+ return function (readable) {
46+ return function (reader) {
47+ return reader(function (end, cb) {
48+ readable(end, function (end, data) {
49+ cb(end, map(data))
50+ })
51+ })
52+ }
53+ }
54+}
55+
56+var drain = exports.drain = function (op) {
57+ return function (readable) {
58+ readable(null, function read (err, data) {
59+ if(err) return
60+ op && op(data)
61+ readable(null, read)
62+ })
63+ //write-only
64+ }
65+}
66+
67+var through = function () {
68+ return function (readable) {
69+ return function (reader) {
70+ return reader(function (end, cb) {
71+ return reader(end, cb)
72+ })
73+ }
74+ }
75+}
76+
77+function pipeable (_reader) {
78+ return function () {
79+ var args = [].slice.call(arguments)
80+ return function (readable) {
81+ args.unshift(readable)
82+ return function (reader) {
83+ return reader(_reader.apply(null, args))
84+ }
85+ }
86+ }
87+}
88+
89+var dumb = pipeable(function (readable, op) {
90+ return function (end, cb) {
91+ return readable(end, function (end, data) {
92+ op && op(data)
93+ cb(end, data)
94+ })
95+ }
96+})
97+
98+//var smart = pipeable(dumb)
99+
100+var take = pipeable(function (readable, test) {
101+ var ended = false
102+ if('number' === typeof test) {
103+ var n = test; test = function () {
104+ return n-- > 0
105+ }
106+ }
107+ return function (end, cb) {
108+ if(end) {
109+ if(!ended) return ended = end, readable(end, cb)
110+ cb(ended)
111+ }
112+ return readable(null, function (end, data) {
113+ if(end || !test(data)) return readable(end || true, cb)
114+ return cb(null, data)
115+ })
116+ }
117+})
118+
119+var nextTick = process.nextTick
120+var highWaterMark = pipeable(function (readable, highWaterMark) {
121+ var buffer = [], waiting = [], ended, reading = false
122+ highWaterMark = highWaterMark || 10
123+
124+ function read () {
125+ while(waiting.length && (buffer.length || ended))
126+ waiting.shift()(ended, ended ? null : buffer.shift())
127+ }
128+
129+ function next () {
130+ if(ended || reading || buffer.length >= highWaterMark)
131+ return
132+ reading = true
133+ return readable(ended, function (end, data) {
134+ reading = false
135+ ended = ended || end
136+ if(data != null) buffer.push(data)
137+
138+ next(); read()
139+ })
140+ }
141+
142+ process.nextTick(next)
143+
144+ return function (end, cb) {
145+ ended = ended || end
146+ waiting.push(cb)
147+
148+ next(); read()
149+ }
150+})
151+
152+function writeArray(cb) {
153+ var array = []
154+ return function (readable) {
155+ ;(function next () {
156+ return readable(null, function (end, data) {
157+ if(end) return cb(end == true ? null : end, array)
158+ array.push(data)
159+ next()
160+ })
161+ })()
162+ return function () { throw new Error('write-only') }
163+ }
164+}
165+
166+/*
167+var asyncMapSerial = pipeable(function (readable, map) {
168+ var reading = false
169+ return function (end, cb) {
170+ if(reading) throw new Error('one at a time, please!')
171+ reading == true
172+ return readable(end, function (end, data) {
173+ map(data, function (err, data) {
174+ reading = false
175+
176+
177+ })
178+ })
179+ }
180+})
181+*/
182+
183+if(!module.parent)
184+
185+ count()
186+ (destack ())
187+ (take(20))
188+ (highWaterMark(2))
189+ (writeArray(console.log))
190+// (drain(console.log))
191+
192+/* (function (readable) {
193+ return readable(null, function next (e, d) {
194+ if (e) return
195+ return readable(e, next)
196+ })
197+ })*/
198+//*/
199+// drain (destack (count()), console.log)
200+

Built with git-ssb-web