git ssb

2+

Dominic / pull-stream



Tree: c5aa38a8e6f8e9147f38f32bcb908dc94f1bd651

Files: c5aa38a8e6f8e9147f38f32bcb908dc94f1bd651 / sources.js

3105 bytesRaw
1
// Build a reader over `array`. Each call with a falsy `end` passes the next
// element to `cb`; the first argument is `null` while elements remain and
// `true` (with an `undefined` value) once the cursor has run past the end.
// A call with a truthy `end` never touches the array: it echoes `end` back
// to `cb` when a callback was supplied.
var readArray = exports.readArray = function (array) {
  var index = 0

  return function (abort, cb) {
    if (!abort) {
      var exhausted = index >= array.length
      var value = array[index]
      // the cursor moves before the callback runs, so a re-entrant read
      // from inside `cb` sees the following element
      index += 1
      cb(exhausted ? true : null, value)
      return
    }
    return cb && cb(abort)
  }
}
11
// Create a reader that yields 0, 1, 2, ... without ever ending by itself.
// Each reader owns a private counter. A call with a truthy `end` leaves the
// counter alone and echoes `end` to `cb` when a callback was supplied.
var count = function () {
  var upcoming = 0

  return function (abort, cb) {
    if (abort) {
      return cb && cb(abort)
    }
    var value = upcoming
    // advance before invoking the callback, matching post-increment order
    upcoming += 1
    cb(null, value)
  }
}
19
// Create a never-ending reader whose values come from calling `generate`
// once per read; `Math.random` is used when `generate` is falsy.
// The reader returns whatever `cb` returns. A call with a truthy `end`
// does not invoke the generator and echoes `end` to `cb` if one was given.
var infinite = exports.infinite =
function (generate) {
  var produce = generate ? generate : Math.random

  return function (abort, cb) {
    return abort ? (cb && cb(abort)) : cb(null, produce())
  }
}
// Create a placeholder reader that can be handed out before the real
// reader exists.
//
//   read(end, cb)        - forwarded to the real reader once resolved;
//                          before that a single call is parked. Throws if a
//                          second call arrives while one is already parked.
//   read.resolve(source) - installs the real reader and replays the parked
//                          call, if any. Throws 'already resolved' when
//                          called a second time.
//   read.abort(err)      - resolves with a stub that answers every call
//                          with `err`, or `true` when `err` is falsy.
var defer = exports.defer = function () {
  var _read, _cb, _end

  var read = function (end, cb) {
    if (_cb && !_read) {
      // the two halves of this message used to be concatenated without a
      // separating space
      throw new Error('do not read twice without waiting for callback')
    }
    if (_read) return _read(end, cb)
    // not resolved yet: park the request until resolve() is called
    _end = end
    _cb = cb
  }

  read.resolve = function (source) {
    if (_read) throw new Error('already resolved')
    _read = source
    if (_cb) _read(_end, _cb)
  }

  read.abort = function (err) {
    read.resolve(function (_, cb) {
      // like the other sources in this file, tolerate a call that passes
      // no callback
      cb && cb(err || true)
    })
  }

  return read
}
51
// Create a reader that is fed imperatively.
//
//   read(end, cb)       - queue a request; it is answered as soon as data
//                         (or the end state) is available. A truthy `end`
//                         is a reader abort: the stream is marked ended and
//                         buffered items are no longer delivered as data.
//   read.push(data, cb) - buffer an item; `cb`, if given, is called when
//                         the item is taken off the buffer.
//   read.end(end, cb)   - mark the stream ended (`end` defaults to true,
//                         and may be omitted in favour of `cb`).
//
// Items pushed before end() are delivered to readers first, and only then
// is the end state reported. Previously the end flag was attached to the
// buffered items themselves, so readers treated the stream as finished and
// the queued data was lost.
var PushBuffer = exports.PushBuffer = function () {
  var buffer = [], cbs = [], waiting = [], ended, aborted

  function drain () {
    while (waiting.length && (buffer.length || ended)) {
      // data queued ahead of a producer-side end is still owed to the
      // reader; after a reader abort it is discarded instead
      var flushing = ended && !aborted && buffer.length > 0
      var state = flushing ? null : ended
      var data = buffer.shift()
      var cb = cbs.shift()

      waiting.shift()(state, data)
      cb && cb(state)
    }
  }

  function read (end, cb) {
    if (end) aborted = true
    ended = ended || end
    waiting.push(cb)
    drain()
  }

  read.push = function (data, cb) {
    // cbs is kept index-aligned with buffer, so a missing callback is
    // stored as an empty slot rather than skipped
    buffer.push(data)
    cbs.push(cb)
    drain()
  }

  read.end = function (end, cb) {
    if ('function' === typeof end) {
      cb = end
      end = true
    }
    ended = ended || end || true
    cbs.push(cb)
    drain()
  }

  return read
}
86
// Traverse a tree depth first.
//
// `createStream(node)` must return a reader over the children of `node`.
// The returned reader emits every descendant of `start` (not `start`
// itself): each emitted node's own child stream is placed at the FRONT of
// the queue, so it is exhausted before its siblings are visited.
//
// Termination:
//   - a child stream ending with exactly `true` is normal exhaustion and
//     the traversal moves on to the next queued stream;
//   - any other truthy end value is treated as an error: the remaining
//     streams are aborted with it and it is passed downstream;
//   - a truthy `end` from the consumer aborts every queued stream.
// Previously all of these were treated as "this stream is done", which
// swallowed errors and kept delivering data after the consumer had aborted.
var depthFirst = exports.depthFirst =
function (start, createStream) {
  var reads = [], ended

  reads.unshift(createStream(start))

  // abort everything still queued and remember why, so later reads repeat it
  function terminate (reason, cb) {
    ended = reason
    while (reads.length) reads.shift()(reason, function () {})
    return cb && cb(reason)
  }

  return function next (end, cb) {
    if (ended) return cb && cb(ended)
    if (end) return terminate(end, cb)
    if (!reads.length) return cb(true)

    reads[0](end, function (end, data) {
      if (end === true) {
        // this stream has ended, go to the next queue
        reads.shift()
        return next(null, cb)
      }
      if (end) {
        // the failing stream is already finished: drop it, abort the rest
        reads.shift()
        return terminate(end, cb)
      }
      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}
// Traverse a tree width (breadth) first.
//
// Width first is just like depth first, except that each emitted node's
// child stream is pushed onto the END of the queue, so every stream that
// was queued earlier is exhausted before it is read.
//
// `createStream(node)` must return a reader over the children of `node`.
// The returned reader emits every descendant of `start` (not `start`
// itself).
//
// Termination:
//   - a child stream ending with exactly `true` is normal exhaustion and
//     the traversal moves on to the next queued stream;
//   - any other truthy end value is treated as an error: the remaining
//     streams are aborted with it and it is passed downstream;
//   - a truthy `end` from the consumer aborts every queued stream.
// Previously all of these were treated as "this stream is done", which
// swallowed errors and kept delivering data after the consumer had aborted.
var widthFirst = exports.widthFirst =
function (start, createStream) {
  var reads = [], ended

  reads.push(createStream(start))

  // abort everything still queued and remember why, so later reads repeat it
  function terminate (reason, cb) {
    ended = reason
    while (reads.length) reads.shift()(reason, function () {})
    return cb && cb(reason)
  }

  return function next (end, cb) {
    if (ended) return cb && cb(ended)
    if (end) return terminate(end, cb)
    if (!reads.length) return cb(true)

    reads[0](end, function (end, data) {
      if (end === true) {
        // this stream has ended, go to the next queue
        reads.shift()
        return next(null, cb)
      }
      if (end) {
        // the failing stream is already finished: drop it, abort the rest
        reads.shift()
        return terminate(end, cb)
      }
      reads.push(createStream(data))
      cb(end, data)
    })
  }
}
128
129
// Traverse a tree leaf first (post-order): a node is emitted only after
// the stream of its own children has been exhausted.
//
// `createStream(node)` must return a reader over the children of `node`.
// The returned reader emits every descendant of `start` (not `start`
// itself). Nodes that have been discovered but not yet emitted wait in
// `output`; each time the stream at the head of `reads` is exhausted, the
// node at the head of `output` is released.
//
// Termination:
//   - a child stream ending with exactly `true` is normal exhaustion;
//   - any other truthy end value is treated as an error: the remaining
//     streams are aborted with it and it is passed downstream;
//   - a truthy `end` from the consumer aborts every queued stream and
//     drops the nodes still waiting in `output`;
//   - reading again after the traversal has finished reports `true` again.
// Previously an abort or error was handled like exhaustion (releasing
// another node to the consumer), and a read after the end threw a
// TypeError because the queue was empty.
var leafFirst = exports.leafFirst =
function (start, createStream) {
  var reads = [], output = [], ended

  reads.push(createStream(start))

  // abort everything still queued and remember why, so later reads repeat it
  function terminate (reason, cb) {
    ended = reason
    output.length = 0
    while (reads.length) reads.shift()(reason, function () {})
    return cb && cb(reason)
  }

  return function next (end, cb) {
    if (ended) return cb && cb(ended)
    if (end) return terminate(end, cb)
    if (!reads.length) return cb(true)

    reads[0](end, function (end, data) {
      if (end === true) {
        reads.shift()
        if (!output.length) return cb(true)
        return cb(null, output.shift())
      }
      if (end) {
        // the failing stream is already finished: drop it, abort the rest
        reads.shift()
        return terminate(end, cb)
      }
      // descend first; `data` is held back until its children are done
      reads.unshift(createStream(data))
      output.unshift(data)
      next(null, cb)
    })
  }
}
150
151

Built with git-ssb-web