Files: 4d8da84532dda783798716e49c5335f3daaffdf1 / index.js
3169 bytesRaw
1 | |
2 | var sources = require('./sources') |
3 | var sinks = require('./sinks') |
4 | var throughs = require('./throughs') |
5 | |
// Publish every raw stream constructor under its own name, wrapped so that the
// result is pipeable. Groups are processed in a fixed order (sources, then
// throughs, then sinks), so on a name clash the later group wins.
var streamGroups = [
  [sources, pipeableSource],
  [throughs, pipeable],
  [sinks, pipeableSink]
]

streamGroups.forEach(function (group) {
  var constructors = group[0]
  var wrap = group[1]
  for (var name in constructors) {
    exports[name] = wrap(constructors[name])
  }
})

// The wrappers themselves are exported last so callers can adapt their own
// stream constructors.
exports.pipeableSource = pipeableSource
exports.pipeable = pipeable
exports.pipeableSink = pipeableSink
18 | |
/**
 * Give a read function a chainable `.pipe(reader)` method.
 *
 * Anything that is not a function is returned untouched. An existing truthy
 * `pipe` property is kept as-is.
 *
 * @param {*} read - candidate read function
 * @returns {*} the same `read` value that was passed in
 */
function addPipe (read) {
  if (typeof read !== 'function') {
    return read
  }

  var existingPipe = read.pipe
  read.pipe = existingPipe ? existingPipe : function (reader) {
    if (typeof reader !== 'function') {
      throw new Error('must pipe to reader')
    }
    // Whatever the reader returns is made pipeable too, so chains can go on.
    var downstream = reader(read)
    return addPipe(downstream)
  }

  return read
}
31 | |
/**
 * Wrap a source constructor so the value it creates gets a `.pipe` method.
 *
 * @param {Function} createRead - called with the wrapper's arguments
 * @returns {Function} wrapper returning `addPipe(createRead(...))`
 */
function pipeableSource (createRead) {
  return function () {
    // Forward the call's arguments unchanged to the underlying constructor.
    var read = createRead.apply(null, arguments)
    return addPipe(read)
  }
}
38 | |
/**
 * Wrap a through-stream constructor so the stage it creates can be composed
 * with further stages before it is connected to an upstream read function.
 *
 * The returned wrapper captures its call arguments and yields a `reader`:
 *   - `reader(read)` calls `createRead(read, ...args)` and then feeds the
 *     result through every stage registered with `reader.pipe`, in order.
 *   - `reader.pipe(next)` registers `next` and returns `reader` for chaining.
 *
 * @param {Function} createRead - called as `createRead(read, ...args)`
 * @returns {Function} wrapper producing a composable `reader`
 */
function pipeable (createRead) {
  return function () {
    var args = [].slice.call(arguments)
    var piped = []

    function reader (read) {
      // Build a fresh argument list on every call. Unshifting onto the
      // captured array would leak earlier `read` values into later calls.
      var stream = createRead.apply(null, [read].concat(args))
      // Walk the stages without consuming them so the reader stays reusable.
      for (var i = 0; i < piped.length; i++) {
        stream = piped[i](stream)
      }
      return stream
    }

    reader.pipe = function (next) {
      piped.push(next)
      return reader
    }

    return reader
  }
}
58 | |
/**
 * Wrap a sink constructor so it can be configured first and connected to a
 * read function later.
 *
 * @param {Function} createReader - called as `createReader(read, ...args)`
 * @returns {Function} wrapper that captures its arguments and returns a
 *   function taking `read` and returning `createReader(read, ...args)`
 */
function pipeableSink (createReader) {
  return function () {
    var args = [].slice.call(arguments)
    return function (read) {
      // Build a fresh argument list on every call. Unshifting onto the
      // captured array would pass stale `read` values on a second use.
      return createReader.apply(null, [read].concat(args))
    }
  }
}
68 | |
/**
 * Limit synchronous call depth when reading from `readable`.
 *
 * Up to `n` consecutive reads are passed straight through. The next read is
 * deferred with `process.nextTick`, which also resets the counter. A deferred
 * read returns `undefined` to its caller, since the real call happens later.
 *
 * The counter is shared by every stream built from one `destack(n)` call.
 *
 * @param {number} [n=10] - direct reads allowed before deferring; any falsy
 *   value falls back to 10
 * @returns {Function} `function (readable)` returning `function (reader)`,
 *   which returns whatever `reader` returns when given the wrapped read
 */
var destack = function (n) {
  // Every binding is declared explicitly so nothing leaks onto the global
  // object, and the parameter is not reassigned.
  var limit = n || 10
  var depth = 0

  return function (readable) {
    return function (reader) {
      return reader(function (end, cb) {
        if (depth++ < limit) {
          return readable(end, cb)
        }
        process.nextTick(function () {
          depth = 0
          readable(end, cb)
        })
      })
    }
  }
}
87 | |
88 | |
/**
 * Sink that collects every item read from a stream into an array.
 *
 * Reads repeatedly until the stream signals `end`, then calls
 * `cb(err, array)`. `err` is `null` when `end` is loosely equal to `true`,
 * otherwise it is the `end` value itself.
 *
 * @param {Function} cb - called once with `(err, array)` when the stream ends
 * @returns {Function} `function (readable)` that starts reading immediately
 *   and returns a function that always throws `Error('write-only')`
 */
function writeArray (cb) {
  var array = []

  return function (readable) {
    function onItem (end, data) {
      if (end) {
        var err = end == true ? null : end
        return cb(err, array)
      }
      array.push(data)
      pull()
    }

    function pull () {
      return readable(null, onItem)
    }

    pull()

    return function () {
      throw new Error('write-only')
    }
  }
}
102 | |
103 | /* |
104 | var compose = function () { |
105 | var streams = [].slice.call(arguments) |
106 | return function (readable) { |
107 | return function (reader) { |
108 | while(streams.length) |
109 | readable = streams.shift()(readable) |
110 | |
111 | return reader(readable) |
112 | } |
113 | } |
114 | } |
115 | |
116 | var duplex = function (_reader, _readable) { |
117 | return function (readable) { |
118 | return function (reader) { |
119 | _reader(readable); |
120 | reader(_readable); |
121 | } |
122 | } |
123 | } |
124 | */ |
125 | |
// Manual smoke test: runs only when this file is executed directly rather
// than required by another module.
//
// NOTE(review): `count`, `take` and `highWaterMark` are not declared anywhere
// in the visible source (stream constructors are only attached to `exports`),
// so confirm that this demo still runs before relying on it.
//
// Alternatives left disabled during development:
//   - a compose(map(e => e * 1000), map(e => Math.round(e / 3))) stage
//   - drain(console.log) as the sink
//   - a hand-written sink that keeps calling readable until it ends
//   - drain(destack(count()), console.log)
if (!module.parent) {
  var counted = count()
  var destacked = counted(destack())
  var limited = destacked(take(20))
  var buffered = limited(highWaterMark(2))
  buffered(writeArray(console.log))
}
150 | |
151 |
Built with git-ssb-web