git ssb

2+

Dominic / pull-stream



Tree: 17996d82d3e483a8721b9aa2b40cd1fed89e869a

Files: 17996d82d3e483a8721b9aa2b40cd1fed89e869a / sources.js

3277 bytesRaw
1
/**
 * Create a source that reads the items of an array one at a time.
 *
 * @param {Array} array Items to emit, in index order.
 * @returns {function(*, function)} read(end, cb): when `end` is truthy the
 *   callback (if given) is called with `end`; otherwise the callback gets
 *   `(null, item)` for each item and `(true, undefined)` once the array
 *   is exhausted.
 */
var readArray = exports.readArray = function (array) {
  var index = 0
  return function (end, cb) {
    if (end) {
      return cb && cb(end)
    }

    // `true` marks the end of the array, `null` means "here is a value".
    var finished = index >= array.length ? true : null
    var item = array[index]
    index += 1
    cb(finished, item)
  }
}
11
/**
 * Create a source that emits the integers 0, 1, 2, ... in order.
 *
 * @param {number} [max] How many values to emit before ending. When it is
 *   omitted (or otherwise falsy, except 0) the source never ends by itself.
 * @returns {function(*, function)} read(end, cb): when `end` is truthy the
 *   callback (if given) is called with `end`; otherwise the callback gets
 *   `(null, n)` or `(true)` once `max` values have been emitted.
 */
var count = function (max) {
  var i = 0
  // An explicit 0 is a real limit ("emit nothing"); only a missing limit
  // falls back to Infinity.
  max = max === 0 ? 0 : (max || Infinity)
  return function (end, cb) {
    if (end) return cb && cb(end)
    if (max <= i) return cb(true)
    cb(null, i++)
  }
}
21
/**
 * Create a source that never ends on its own: every read calls a
 * generator function and emits whatever it returns.
 *
 * @param {function} [generate] Called with no arguments for each read.
 *   Defaults to `Math.random`.
 * @returns {function(*, function)} read(end, cb): when `end` is truthy the
 *   callback (if given) is called with `end`; otherwise the callback gets
 *   `(null, generate())`.
 */
var infinite = exports.infinite = function (generate) {
  var produce = generate || Math.random
  return function (end, cb) {
    if (!end) {
      return cb(null, produce())
    }
    return cb && cb(end)
  }
}
/**
 * Create a placeholder source whose real source is supplied later.
 *
 * The returned `read(end, cb)` remembers a single read made before the
 * real source is known and replays it once `read.resolve(source)` is
 * called. After that every read is forwarded straight to the real source.
 *
 * Extra methods on the returned function:
 *  - `read.resolve(source)` installs the real source; throws
 *    `Error('already resolved')` when called a second time.
 *  - `read.abort(err)` resolves with a source that answers every read
 *    with `err || true`.
 *
 * @returns {function(*, function)} the deferred read function.
 * @throws {Error} from `read` when it is called a second time before the
 *   first (still pending) read has been answered.
 */
var defer = exports.defer = function () {
  var _read, _cb, _end

  var read = function (end, cb) {
    if (_read) return _read(end, cb)
    if (_cb) throw new Error(
      'do not read twice ' +
      'without waiting for callback'
    )
    // Not resolved yet: park this read until resolve() is called.
    _end = end
    _cb = cb
  }

  read.resolve = function (source) {
    if (_read) throw new Error('already resolved')
    _read = source
    // Replay the read that arrived before the source was known.
    if (_cb) _read(_end, _cb)
  }

  read.abort = function (err) {
    read.resolve(function (_, cb) {
      cb(err || true)
    })
  }

  return read
}
/**
 * Create a source that is fed imperatively by a producer.
 *
 * The returned `read(end, cb)` is the source. Extra methods:
 *  - `read.push(data, cb)` queues `data`; `cb` (optional) is called when
 *    that queue slot is handed to a reader.
 *  - `read.end(end, cb)` / `read.end(cb)` marks the stream as ended
 *    (`end` defaults to `true`); `cb` (optional) is called when the end
 *    is handed to a reader.
 *
 * Items that were pushed before `end()` are still delivered, in order,
 * before the end signal. A truthy `end` passed to `read` (a consumer
 * abort) is answered with the end state right away instead.
 *
 * @returns {function(*, function)} the read function with `push`/`end`.
 */
var pushable = exports.pushable = function () {
  var buffer = [], cbs = [], waiting = [], ended, aborted

  // Pair waiting readers with buffered items (or with the end state).
  function drain () {
    while (waiting.length && (buffer.length || ended)) {
      // A producer-side end() must not overtake data that is still
      // buffered; only a consumer abort skips what is left.
      var flushing = buffer.length > 0 && !aborted
      var state = flushing ? null : ended
      var data = buffer.shift()
      var cb = cbs.shift()
      var reader = waiting.shift()

      // A reader may be missing when the consumer aborted without a callback.
      reader && reader(state, data)
      cb && cb(state)
    }
  }

  function read (end, cb) {
    aborted = aborted || end
    ended = ended || end
    waiting.push(cb)
    drain()
  }

  read.push = function (data, cb) {
    buffer.push(data)
    cbs.push(cb)
    drain()
  }

  read.end = function (end, cb) {
    if ('function' === typeof end) {
      cb = end
      end = true
    }
    ended = ended || end || true
    cbs.push(cb)
    drain()
  }

  return read
}
87
/**
 * Create a source that walks a tree depth first.
 *
 * `createStream(node)` must return a source of that node's children.
 * Every emitted child gets its own stream, which is put at the FRONT of
 * the queue, so a node's descendants are read before its siblings.
 * `start` itself is not emitted.
 *
 * @param {*} start Root node; only passed to `createStream`.
 * @param {function(*): function} createStream Maps a node to a source of
 *   its children.
 * @returns {function(*, function)} read(end, cb): emits `(end, node)` with
 *   a falsy `end` for each visited node, then `(true)` when the queue is
 *   empty. A truthy `end` aborts every queued stream and answers `cb`
 *   (if given) with `end` once all of them have called back.
 */
var depthFirst = exports.depthFirst =
function (start, createStream) {
  var reads = []

  reads.unshift(createStream(start))

  return function next (end, cb) {
    if (end) {
      // The consumer is aborting: stop the whole traversal, not only the
      // stream that happens to be at the head of the queue.
      var aborting = reads
      var pending = aborting.length
      reads = []
      if (!pending) return cb && cb(end)
      aborting.forEach(function (read) {
        read(end, function () {
          pending -= 1
          if (pending === 0 && cb) cb(end)
        })
      })
      return
    }

    if (!reads.length)
      return cb(true)

    reads[0](end, function (ended, data) {
      if (ended) {
        // If this stream has ended, go on with the next one in the queue.
        reads.shift()
        return next(null, cb)
      }
      reads.unshift(createStream(data))
      cb(ended, data)
    })
  }
}
/**
 * Create a source that walks a tree width first.
 *
 * Width first is just like depth first, except that the stream created
 * for each emitted node is put at the END of the queue, so all children
 * of a node are read before any of their own children.
 * `start` itself is not emitted.
 *
 * @param {*} start Root node; only passed to `createStream`.
 * @param {function(*): function} createStream Maps a node to a source of
 *   its children.
 * @returns {function(*, function)} read(end, cb): emits `(end, node)` with
 *   a falsy `end` for each visited node, then `(true)` when the queue is
 *   empty. A truthy `end` aborts every queued stream and answers `cb`
 *   (if given) with `end` once all of them have called back.
 */
var widthFirst = exports.widthFirst =
function (start, createStream) {
  var reads = []

  reads.push(createStream(start))

  return function next (end, cb) {
    if (end) {
      // The consumer is aborting: stop the whole traversal, not only the
      // stream that happens to be at the head of the queue.
      var aborting = reads
      var pending = aborting.length
      reads = []
      if (!pending) return cb && cb(end)
      aborting.forEach(function (read) {
        read(end, function () {
          pending -= 1
          if (pending === 0 && cb) cb(end)
        })
      })
      return
    }

    if (!reads.length)
      return cb(true)

    reads[0](end, function (ended, data) {
      if (ended) {
        // The head stream is finished; continue with the next one.
        reads.shift()
        return next(null, cb)
      }
      reads.push(createStream(data))
      cb(ended, data)
    })
  }
}
129
/**
 * Create a source that walks a tree and emits a node only after the
 * stream of its children has ended, i.e. leaves come out first.
 *
 * Original author's note: this came out different to the first (strm)
 * attempt at leafFirst, but it's still a valid topological sort.
 * `start` itself is not emitted.
 *
 * @param {*} start Root node; only passed to `createStream`.
 * @param {function(*): function} createStream Maps a node to a source of
 *   its children.
 * @returns {function(*, function)} read(end, cb): emits `(null, node)` for
 *   each node, then `(true)` once everything has been emitted (and on any
 *   later read). A truthy `end` aborts every queued stream, drops the
 *   nodes still waiting to be emitted and answers `cb` (if given) with
 *   `end` once all streams have called back.
 */
var leafFirst = exports.leafFirst =
function (start, createStream) {
  var reads = []
  var output = []
  reads.push(createStream(start))

  return function next (end, cb) {
    if (end) {
      // The consumer is aborting: stop the whole traversal, not only the
      // stream that happens to be at the head of the queue.
      var aborting = reads
      var pending = aborting.length
      reads = []
      output = []
      if (!pending) return cb && cb(end)
      aborting.forEach(function (read) {
        read(end, function () {
          pending -= 1
          if (pending === 0 && cb) cb(end)
        })
      })
      return
    }

    // Nothing left to read: keep answering "ended" instead of failing
    // on a missing head stream.
    if (!reads.length)
      return cb(true)

    reads[0](end, function (ended, data) {
      if (ended) {
        // The head stream is done, so the node it belongs to (the most
        // recently stacked one) can be emitted now.
        reads.shift()
        if (!output.length)
          return cb(true)
        return cb(null, output.shift())
      }
      // Descend first; the node itself is emitted after its children.
      reads.unshift(createStream(data))
      output.unshift(data)
      next(null, cb)
    })
  }
}
153
154

Built with git-ssb-web