git ssb

2+

Dominic / pull-stream



Tree: 4d8da84532dda783798716e49c5335f3daaffdf1

Files: 4d8da84532dda783798716e49c5335f3daaffdf1 / index.js

3169 bytesRaw
1
// Stream constructors, grouped by role, loaded from sibling modules.
var sources = require('./sources')
var sinks = require('./sinks')
var throughs = require('./throughs')

// Re-export every constructor under its own name, wrapped so that it takes
// part in `.pipe()` chaining. The loops run in this order, so a name present
// in more than one group ends up bound to the wrapper assigned last.
for (var k in sources)
  exports[k] = pipeableSource(sources[k])

for (var k in throughs)
  exports[k] = pipeable(throughs[k])

for (var k in sinks)
  exports[k] = pipeableSink(sinks[k])

// Expose the wrappers themselves. These assignments come last and therefore
// win over any constructor that happens to share one of these names.
exports.pipeableSource = pipeableSource
exports.pipeable = pipeable
exports.pipeableSink = pipeableSink
18
/**
 * Give a read function a chainable `pipe` method.
 *
 * Values that are not functions are returned untouched, so the result of a
 * reader that yields nothing chainable passes straight through.
 *
 * @param {*} read - value to decorate; only functions are modified.
 * @returns {*} the same `read` value that was passed in.
 */
function addPipe (read) {
  if ('function' !== typeof read)
    return read

  // Keep a `pipe` that is already present instead of overwriting it.
  read.pipe = read.pipe || function (reader) {
    if ('function' !== typeof reader)
      throw new Error('must pipe to reader')
    // Hand this read function to the reader and make whatever it returns
    // chainable as well.
    return addPipe(reader(read))
  }

  return read
}
31
/**
 * Wrap a source constructor so that the value it returns gets a `pipe` method.
 *
 * @param {Function} createRead - constructor that is called with the wrapper's
 *   arguments and returns a read function.
 * @returns {Function} wrapper that forwards its arguments to `createRead`
 *   (with `this` set to null) and passes the result through `addPipe`.
 */
function pipeableSource (createRead) {
  return function () {
    return addPipe(createRead.apply(null, arguments))
  }
}
38
/**
 * Wrap a through constructor so that it can be used as a pipe target and can
 * itself be piped onward before it is connected to a read function.
 *
 * The wrapper collects its call arguments and returns a `reader`. Calling
 * `reader(read)` invokes `createRead(read, ...args)` and then feeds the result
 * through every stage registered with `reader.pipe(stage)`, in registration
 * order.
 *
 * @param {Function} createRead - through constructor; receives the upstream
 *   read function followed by the wrapper's arguments.
 * @returns {Function} wrapper producing a `reader` function with a `pipe`
 *   method; `pipe` returns the same `reader` so calls can be chained.
 */
function pipeable (createRead) {
  return function () {
    var args = [].slice.call(arguments)
    var piped = []

    function reader (read) {
      // Build a fresh argument list on every call. Prepending onto the shared
      // `args` array would leak earlier read functions into later calls.
      read = createRead.apply(null, [read].concat(args))
      // Walk the stage list without consuming it so the same reader can be
      // connected more than once and still apply every stage.
      for (var i = 0; i < piped.length; i++)
        read = piped[i](read)
      return read
    }

    // Piping from this reader composes: the stage is remembered and applied
    // once an upstream read function is supplied.
    reader.pipe = function (read) {
      piped.push(read)
      return reader
    }

    return reader
  }
}
58
/**
 * Wrap a sink constructor so that it can be passed to `pipe`.
 *
 * @param {Function} createReader - sink constructor; receives the upstream
 *   read function followed by the wrapper's arguments.
 * @returns {Function} wrapper that captures its arguments and returns a
 *   function of `read` which calls `createReader(read, ...args)` and returns
 *   its result.
 */
function pipeableSink (createReader) {
  return function () {
    var args = [].slice.call(arguments)
    return function (read) {
      // Fresh argument list per call: the captured `args` stay untouched so
      // the returned function gives the same result however often it is used.
      return createReader.apply(null, [read].concat(args))
    }
  }
}
68
/**
 * Build a stage that breaks up long runs of read calls by deferring one call
 * to `process.nextTick` after `n` consecutive direct calls.
 *
 * Usage shape: `destack(n)(readable)(reader)`. `reader` is called with a
 * wrapped read function and its return value is returned.
 *
 * The call counter lives in the `destack(n)` closure, so it is shared by every
 * stream built from the same `destack(n)` result.
 *
 * @param {number} [n=10] - number of reads passed straight through before one
 *   read is deferred; any falsy value selects the default.
 * @returns {Function} function of `readable` returning a function of `reader`.
 */
var destack = function (n) {
  // All state is declared locally; nothing is written to the global object.
  var i = 0
  n = n || 10
  return function (readable) {
    return function (reader) {
      return reader(function (end, cb) {
        if (i++ < n)
          return readable(end, cb)

        // Budget used up: restart the count and forward this read on a later
        // tick instead of directly from the current call.
        process.nextTick(function () {
          i = 0
          readable(end, cb)
        })
      })
    }
  }
}
87
88
/**
 * Build a sink that reads a stream to its end and collects every item.
 *
 * @param {Function} cb - called once as `cb(err, array)` when the stream
 *   ends; `err` is null when the end signal is loosely equal to `true`,
 *   otherwise it is the end value itself.
 * @returns {Function} function of `readable` that starts reading immediately
 *   and returns a function which throws `Error('write-only')` when called.
 */
function writeArray (cb) {
  var array = []
  return function (readable) {
    var sync = false
    var more = false

    // Read in a loop rather than recursing once per item, so that a source
    // which answers synchronously cannot overflow the call stack.
    function pull () {
      more = true
      while (more) {
        more = false
        sync = true
        readable(null, function (end, data) {
          if (end) return cb(end == true ? null : end, array)
          array.push(data)
          // Answered within the readable() call: let the loop continue.
          // Answered later: the loop has already exited, so restart it.
          if (sync) more = true
          else pull()
        })
        sync = false
      }
    }

    pull()
    return function () { throw new Error('write-only') }
  }
}
102
103/*
104var compose = function () {
105 var streams = [].slice.call(arguments)
106 return function (readable) {
107 return function (reader) {
108 while(streams.length)
109 readable = streams.shift()(readable)
110
111 return reader(readable)
112 }
113 }
114}
115
116var duplex = function (_reader, _readable) {
117 return function (readable) {
118 return function (reader) {
119 _reader(readable);
120 reader(_readable);
121 }
122 }
123}
124*/
125
// Demo guarded by `!module.parent` (presumably meaning the file was started
// directly rather than required — confirm).
// The `if` has no braces, and the lines below form a single chained call
// expression: count() is called and its result is called in turn with
// destack(), take(20), highWaterMark(2) and writeArray(console.log). The
// compose(map(...), map(...)) stage in the middle is commented out.
// NOTE(review): `count`, `take` and `highWaterMark` are not declared as local
// bindings anywhere in the visible file; the loops near the top only assign
// names onto `exports`. Confirm this demo still runs.
// NOTE(review): the leading digits on each line look like a line-number gutter
// fused in from the page this was copied from — confirm and strip.
126if(!module.parent)
127
128 count()
129 (destack ())
130 (take(20))
131 (highWaterMark(2))
132 /*(compose(map(function (e) {
133 return e * 1000
134 }),
135 map(function (e) {
136 return Math.round(e / 3)
137 }))
138 )*/
139 (writeArray(console.log))
140// (drain(console.log))
141
142/* (function (readable) {
143 return readable(null, function next (e, d) {
144 if (e) return
145 return readable(e, next)
146 })
147 })*/
148//*/
149// drain (destack (count()), console.log)
150
151

Built with git-ssb-web