git ssb

2+

Dominic / pull-stream



Tree: 34424ec7200a18fdb8f79642df2598fd98f5c1ca

Files: 34424ec7200a18fdb8f79642df2598fd98f5c1ca / sources.js

3533 bytesRaw
1
// keys(object): a source over the own enumerable property names of
// `object`, as reported by Object.keys. The names are handed to `values`,
// which does the actual streaming.
var keys = exports.keys = function (object) {
  var names = Object.keys(object)
  return values(names)
}
6
// Shared abort path for the sources in this file.
// Calls `cb` with the abort value first, then - if an `onAbort` handler was
// supplied - notifies it: a plain `true` abort is reported as `null`, any
// other abort value is passed through unchanged. Always returns undefined.
function abortCb(cb, abort, onAbort) {
  cb(abort)
  if(onAbort) {
    if(abort === true)
      onAbort(null)
    else
      onAbort(abort)
  }
}
12
// once(value, onAbort): a source that emits `value` a single time and then
// reports the end of the stream (cb(true)) on every later read.
// A null/undefined `value` means there is nothing to emit, so the very
// first read already ends. An abort is delegated to abortCb.
var once = exports.once =
function (value, onAbort) {
  return function (abort, cb) {
    if(abort)
      return abortCb(cb, abort, onAbort)

    if(value == null) {
      cb(true)
      return
    }

    // clear the stored value before calling back so a re-entrant read
    // from inside `cb` sees the stream as finished
    var pending = value
    value = null
    cb(null, pending)
  }
}
25
// values(array, onAbort) - also exported as readArray.
// A source that emits the elements of `array` in index order.
//  - a falsy `array` gives a source that ends on the first read
//  - a non-array is replaced by the list of its own enumerable property
//    values (in Object.keys order)
// An abort is delegated to abortCb.
var values = exports.values = exports.readArray =
function (array, onAbort) {
  if(!array)
    return function (abort, cb) {
      if(abort) return abortCb(cb, abort, onAbort)
      return cb(true)
    }

  var items = array
  if(!Array.isArray(items)) {
    var names = Object.keys(array)
    items = []
    for(var n = 0; n < names.length; n++)
      items.push(array[names[n]])
  }

  var index = 0
  return function (abort, cb) {
    if(abort)
      return abortCb(cb, abort, onAbort)
    // past the last element the first argument becomes `true` (end);
    // before that it is `null` and the second argument carries the item
    var finished = index >= items.length
    var item = items[index++]
    cb(finished || null, item)
  }
}
44
45
// count(max): a source that emits the integers 0, 1, 2, ... one per read.
// `max` is an inclusive upper bound: after `max` has been emitted the next
// read ends the stream with cb(true). When `max` is omitted (or is any
// falsy value other than 0) the counter never ends.
// On abort the abort value is passed to `cb` if a callback was given.
var count = exports.count =
function (max) {
  var i = 0
  // Only fall back to Infinity when no usable bound was given. A plain
  // `max || Infinity` would also turn an explicit 0 into "count forever",
  // although count(0) should emit just 0 and then end.
  if(max !== 0 && !max) max = Infinity
  return function (end, cb) {
    if(end) return cb && cb(end)
    if(i > max)
      return cb(true)
    cb(null, i++)
  }
}
56
// infinite(generate): a source that never ends on its own. Every read
// calls `generate()` and emits whatever it returns; Math.random is used
// when no generator is supplied.
// On abort the abort value is passed to `cb` if a callback was given.
var infinite = exports.infinite =
function (generate) {
  if(!generate)
    generate = Math.random
  return function (end, cb) {
    if(!end)
      return cb(null, generate())
    return cb && cb(end)
  }
}
65
// defer(): a placeholder source whose real source is supplied later.
//
// Reads made before resolution are queued: each callback is stored and the
// most recent `end` argument is remembered. Once resolved, every read is
// forwarded straight to the real source.
//
//   read.resolve(source) - install the real source and replay the queued
//                          callbacks against it, all with the remembered
//                          `end` value. Throws if already resolved or if
//                          `source` is falsy.
//   read.abort(err)      - resolve with a source that always calls back
//                          with `err`, or `true` when no error is given.
var defer = exports.defer = function () {
  var source, lastEnd
  var waiting = []

  function read (end, cb) {
    if(source) {
      source(end, cb)
      return
    }
    lastEnd = end
    waiting.push(cb)
  }

  read.resolve = function (stream) {
    if(source) throw new Error('already resolved')
    source = stream
    if(!source) throw new Error('no read cannot resolve!' + source)
    // re-check the queue on every pass rather than iterating a snapshot
    while(waiting.length > 0) {
      var queued = waiting.shift()
      source(lastEnd, queued)
    }
  }

  read.abort = function (err) {
    var reason = err || true
    read.resolve(function (_, cb) {
      cb(reason)
    })
  }

  return read
}
90
// empty(): a source with no items. Every call - whether a read or an
// abort - is answered with cb(true).
var empty = exports.empty = function () {
  return function (_abort, done) {
    done(true)
  }
}
96
// error(err): a source that answers every call - read or abort alike -
// by passing `err` as the first callback argument.
var error = exports.error = function (err) {
  return function (_abort, done) {
    done(err)
  }
}
102
// depthFirst(start, createStream): a source that walks a tree depth first.
//
// `start` is emitted first. For every item emitted, createStream(item) is
// called and the returned source is put at the FRONT of the queue, so an
// item's descendants are emitted before its remaining siblings.
//
// Termination:
//  - when a queued stream ends normally (end === true) it is dropped and
//    reading continues with the next stream in the queue
//  - when a queued stream ends with an error, or the consumer aborts, every
//    stream still in the queue is aborted with that value, the value is
//    passed to `cb`, and all later reads answer with the same value
var depthFirst = exports.depthFirst =
function (start, createStream) {
  var reads = []
  var ended // the abort/error value once the traversal has been torn down

  reads.unshift(once(start))

  // Tear the traversal down: remember why, and abort every queued stream.
  // The streams are being discarded, so their replies are ignored.
  function abortAll (reason) {
    ended = reason
    while(reads.length)
      reads.shift()(reason, function () {})
  }

  return function next (abort, cb) {
    if(ended)
      return cb(ended)
    if(abort) {
      // without this the abort would only end the head stream and the
      // traversal would carry on emitting data from the rest of the queue
      abortAll(abort)
      return cb(abort)
    }
    if(!reads.length)
      return cb(true)
    reads[0](abort, function (end, data) {
      // torn down while this read was in flight
      if(ended)
        return cb(ended)
      if(end) {
        reads.shift()
        if(end !== true) {
          // a real error: propagate it instead of treating it as a
          // normal end of this sub-stream
          abortAll(end)
          return cb(end)
        }
        //if this stream has ended, go to the next queue
        return next(null, cb)
      }
      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}
// widthFirst(start, createStream): a source that walks a tree breadth
// first. It works like a depth-first walk, except that the stream created
// for each emitted item is pushed onto the END of the queue, so all items
// of one level are emitted before any item of the next level.
//
// Termination:
//  - when a queued stream ends normally (end === true) it is dropped and
//    reading continues with the next stream in the queue
//  - when a queued stream ends with an error, or the consumer aborts, every
//    stream still in the queue is aborted with that value, the value is
//    passed to `cb`, and all later reads answer with the same value
var widthFirst = exports.widthFirst =
function (start, createStream) {
  var reads = []
  var ended // the abort/error value once the traversal has been torn down

  reads.push(once(start))

  // Tear the traversal down: remember why, and abort every queued stream.
  // The streams are being discarded, so their replies are ignored.
  function abortAll (reason) {
    ended = reason
    while(reads.length)
      reads.shift()(reason, function () {})
  }

  return function next (abort, cb) {
    if(ended)
      return cb(ended)
    if(abort) {
      // without this the abort would only end the head stream and the
      // traversal would carry on emitting data from the rest of the queue
      abortAll(abort)
      return cb(abort)
    }
    if(!reads.length)
      return cb(true)
    reads[0](abort, function (end, data) {
      // torn down while this read was in flight
      if(ended)
        return cb(ended)
      if(end) {
        reads.shift()
        if(end !== true) {
          // a real error: propagate it instead of treating it as a
          // normal end of this sub-stream
          abortAll(end)
          return cb(end)
        }
        return next(null, cb)
      }
      reads.push(createStream(data))
      cb(end, data)
    })
  }
}
144
//this came out different to the first (strm)
//attempt at leafFirst, but it's still a valid
//topological sort.
//
// leafFirst(start, createStream): a source that emits every item only
// after all of its descendants have been emitted.
//
// Each item read from the head stream is held back on `output` while the
// stream created for it is read first; when a stream ends, the most
// recently held item is released.
//
// Termination:
//  - once every stream has ended, reads answer with cb(true) - including
//    reads made after the end
//  - when a queued stream ends with an error, or the consumer aborts, every
//    stream still in the queue is aborted with that value, held items are
//    discarded, the value is passed to `cb`, and all later reads answer
//    with the same value
var leafFirst = exports.leafFirst =
function (start, createStream) {
  var reads = []
  var output = []
  var ended // the abort/error value once the traversal has been torn down
  reads.push(once(start))

  // Tear the traversal down: remember why, drop the held items and abort
  // every queued stream. The streams are being discarded, so their replies
  // are ignored.
  function abortAll (reason) {
    ended = reason
    output = []
    while(reads.length)
      reads.shift()(reason, function () {})
  }

  return function next (abort, cb) {
    if(ended)
      return cb(ended)
    if(abort) {
      // without this the abort would only end the head stream and a held
      // item would then be emitted as data in reply to the abort
      abortAll(abort)
      return cb(abort)
    }
    // every stream has ended; without this guard reads[0] is undefined
    // and calling it throws
    if(!reads.length)
      return cb(true)
    reads[0](abort, function (end, data) {
      // torn down while this read was in flight
      if(ended)
        return cb(ended)
      if(end) {
        reads.shift()
        if(end !== true) {
          // a real error: propagate it instead of treating it as a
          // normal end of this sub-stream
          abortAll(end)
          return cb(end)
        }
        if(!output.length)
          return cb(true)
        return cb(null, output.shift())
      }
      reads.unshift(createStream(data))
      output.unshift(data)
      next(null, cb)
    })
  }
}
168
169

Built with git-ssb-web