git ssb

2+

Dominic / pull-stream



Tree: 4d8da84532dda783798716e49c5335f3daaffdf1

Files: 4d8da84532dda783798716e49c5335f3daaffdf1 / sources.js

2124 bytesRaw
1
2var readArray = exports.readArray = function (array) {
3 var i = 0
4 return function (end, cb) {
5 if(end)
6 return cb && cb(end)
7
8 cb(i >= array.length || null, array[i++])
9 }
10}
11
12var count = function () {
13 var i = 0
14 return function (end, cb) {
15 if(end) return cb && cb(end)
16 cb(null, i++)
17 }
18}
19
20var infinite = exports.infinite =
21function (generate) {
22 generate = generate || Math.random
23 return function (end, cb) {
24 if(end) return cb && cb(end)
25 return cb(null, generate())
26 }
27}
28var defer = exports.defer = function () {
29 var _read, _cb, _end
30
31 var read = function (end, cb) {
32 if(_cb && !_read) throw new Error(
33 'do not read twice' +
34 'without waiting for callback'
35 )
36 if(_read) return _read(end, cb)
37 _end = end, _cb = cb
38 }
39 read.resolve = function (read) {
40 if(_read) throw new Error('already resolved')
41 _read = read
42 if(_cb) _read(_end, _cb)
43 }
44 read.abort = function(err) {
45 read.resolve(function (_, cb) {
46 cb(err || true)
47 })
48 }
49 return read
50}
51
52var PushBuffer = exports.PushBuffer = function () {
53 var buffer = [], cbs = [], waiting = [], ended
54
55 function drain() {
56 while(waiting.length && (buffer.length || ended)) {
57 var data = buffer.shift()
58 var cb = cbs.shift()
59
60 waiting.shift()(ended, data)
61 cb && cb(ended)
62 }
63 }
64
65 function read (end, cb) {
66 ended = ended || end
67 waiting.push(cb)
68 drain()
69 }
70
71 read.push = function (data, cb) {
72 buffer.push(data); cbs.push(cb)
73 drain()
74 }
75
76 read.end = function (end, cb) {
77 if('function' === typeof end)
78 cb = end, end = true
79 ended = ended || end || true; cbs.push(cb)
80 drain()
81 }
82
83 return read
84
85}
86
87var depthFirst = exports.depthFirst =
88function (start, createStream) {
89 var reads = []
90
91 reads.unshift(createStream(start))
92
93 return function next (end, cb) {
94 if(!reads.length)
95 return cb(true)
96 reads[0](end, function (end, data) {
97 if(end) {
98 reads.shift()
99 return next(end === true ? null : end, cb)
100 }
101 reads.unshift(createStream(data))
102 cb(end, data)
103 })
104 }
105}
106

Built with git-ssb-web