git ssb

2+

Dominic / pull-stream



Tree: 5f1a1047fd206aafe8d09749bcde2ffbf78cc7b1

Files: 5f1a1047fd206aafe8d09749bcde2ffbf78cc7b1 / sinks.js

1693 bytesRaw
1var u = require('./util')
2var prop = u.prop
3var id = u.id
4
//drain: the basic sink. Pulls items out of `read` until the stream finishes.
//
//  read(abort, cb) - the source. Called with `null` to ask for the next item
//                    and with `true` to abort; it answers via cb(end, data).
//  op(data)        - optional, called once per item.
//                    Returning exactly `false` aborts the source.
//  done(err)       - optional, called when the stream is over:
//                    `null` after a clean end (the source answered `true`),
//                    including the answer to an abort requested by `op`,
//                    otherwise whatever error value the source reported.
//
//if the source calls back synchronously the do/while below loops instead of
//recursing; next() is only re-entered from a callback that fired asynchronously.
var drain = exports.drain = function (read, op, done) {
  //single place where the end marker is normalised, so that `done` sees the
  //same thing after a natural end and after an abort: `true` becomes null.
  function finish (end) {
    if(done) done(end === true ? null : end)
  }

  ;(function next() {
    var sync = true, returned = false, loop = true
    do {
      returned = false; sync = true
      read(null, function (err, data) {
        returned = true

        if(err) {
          finish(err)
          return loop = false
        }

        //return false to abort!
        if(op && false === op(data)) {
          loop = false
          return read(true, finish)
        }

        //the read() call that registered this callback has already returned,
        //so the loop below has exited: start it again.
        if(!sync) next()
      })
      sync = false
      //nothing came back yet: the callback restarts the loop when it fires.
      if(!returned) return
    } while (loop)
  })()
}
32
33var find =
34exports.find = function (read, test, cb) {
35 var ended = false
36 if(!cb)
37 cb = test, test = id
38 else
39 test = prop(test) || id
40 drain(read, function (data) {
41 if(test(data)) {
42 ended = true
43 cb(null, data)
44 return false
45 }
46 }, function (err) {
47 if(ended) return //already called back
48 cb(err === true ? null : err, null)
49 })
50}
51
//reduce: fold every item of `read` into a single value.
//
//  reduce(acc, item) - called per item; its return value is the next `acc`
//  acc               - the starting value
//  cb(err, acc)      - called by drain's `done` with the final accumulator
var reduce = exports.reduce =
function (read, reduce, acc, cb) {
  var total = acc

  function step (item) {
    total = reduce(total, item)
  }

  function finished (err) {
    cb(err, total)
  }

  drain(read, step, finished)
}
60
//collect (also exported as writeArray): gather every item of `read` into a
//new array, by reducing with a push. cb is handed to reduce, which calls it
//as cb(err, array).
var collect = exports.collect = exports.writeArray =
function (read, cb) {
  var items = []

  function append (list, item) {
    list.push(item)
    return list
  }

  return reduce(read, append, items, cb)
}
68
//note on drain: if the source calls back synchronously, drain loops
//rather than recursing
71
//onEnd: drain `read` without looking at the items; `done` is passed to drain
//as its completion callback.
var onEnd = exports.onEnd = function (read, done) {
  //no per-item handler
  var op = null
  return drain(read, op, done)
}
75
//log: drain `read`, printing each item with console.log; `done` is passed to
//drain as its completion callback.
var log = exports.log = function (read, done) {
  //bound so that console.log keeps `console` as its receiver inside drain
  var print = console.log.bind(console)
  return drain(read, print, done)
}
79
80

Built with git-ssb-web