Files: e009a08c7f44e7c7f9c2890d8f00780b1819ebe2 / sinks.js
1559 bytesRaw
/**
 * Pull items from a source until it ends, errors, or `op` asks to stop.
 *
 * @param {Function} read - source, invoked as `read(abort, cb)`. It is called
 *   with `null` to request the next item and with `true` to abort; `cb`
 *   receives `(end, data)`, where a truthy `end` stops the drain.
 * @param {Function} [op] - called with every item; returning exactly `false`
 *   aborts the source. May be omitted/null to simply run the source dry.
 * @param {Function} [done] - called when draining stops: with `null` when the
 *   source ended with `true` or was aborted cleanly, otherwise with whatever
 *   non-`true` end value (e.g. an Error) the source reported.
 */
var drain = exports.drain = function (read, op, done) {
  // Single completion path for natural end AND abort: `true` is the
  // end-of-stream marker, not an error, so it is reported as `null`.
  function finish(end) {
    if (done) done(end === true ? null : end)
  }

  ;(function next() {
    var sync = true, returned = false, loop = true
    // If the source calls back synchronously, keep looping here instead of
    // recursing through next() for every item.
    do {
      returned = false; sync = true
      read(null, function (end, data) {
        returned = true

        if (end) {
          loop = false
          return finish(end)
        }

        // op returning exactly false aborts the source
        if (op && false === op(data)) {
          loop = false
          return read(true, finish)
        }

        // Async callback: the do/while below has already exited, so restart it.
        if (!sync) next()
      })
      sync = false
      // The callback has not fired yet; it will call next() when it does.
      if (!returned) return
    } while (loop)
  })()
}
28 | |
/**
 * Drain `read` until `test` accepts an item.
 *
 * On the first item for which `test(item)` is truthy, `cb(null, item)` is
 * called and draining is stopped by returning `false` to drain. If draining
 * finishes without a match, `cb(end, null)` is called, with an end value of
 * `true` reported as `null`.
 *
 * @param {Function} read - source handed to drain.
 * @param {Function} test - predicate called with each item.
 * @param {Function} cb - called once as `cb(err, item)`.
 */
var find = exports.find = function (read, test, cb) {
  var found = false

  function inspect(item) {
    if (!test(item)) return
    found = true
    cb(null, item)
    // false tells drain to stop pulling
    return false
  }

  function finish(end) {
    // a match has already been reported; do not call back twice
    if (!found) cb(end === true ? null : end, null)
  }

  drain(read, inspect, finish)
}
43 | |
/**
 * Fold every item pulled from `read` into a single value.
 *
 * @param {Function} read - source handed to drain.
 * @param {Function} reduce - called as `reduce(acc, item)`; its return value
 *   becomes the accumulator for the next item.
 * @param {*} acc - initial accumulator.
 * @param {Function} cb - called as `cb(end, acc)` with the value drain passes
 *   to its completion callback and the final accumulator.
 */
var reduce = exports.reduce = function (read, reduce, acc, cb) {
  var total = acc

  function step(item) {
    total = reduce(total, item)
  }

  function finish(end) {
    cb(end, total)
  }

  drain(read, step, finish)
}
52 | |
/**
 * Gather every item pulled from `read` into an array (also exported as
 * `writeArray`).
 *
 * @param {Function} read - source handed to reduce.
 * @param {Function} cb - forwarded to reduce; receives the array as its
 *   second argument.
 */
var collect = exports.collect = exports.writeArray = function (read, cb) {
  var items = []

  function append(list, item) {
    list.push(item)
    return list
  }

  return reduce(read, append, items, cb)
}
60 | |
61 | //note on drain: if the source calls back synchronously, then loop |
62 | //rather than recurse |
63 | |
/**
 * Run `read` to completion without inspecting its items.
 *
 * @param {Function} read - source handed to drain.
 * @param {Function} [done] - completion callback handed to drain.
 */
var onEnd = exports.onEnd = function (read, done) {
  // no per-item handler: items are pulled and discarded
  var noOp = null
  return drain(read, noOp, done)
}
67 | |
/**
 * Print every item pulled from `read` with console.log.
 *
 * @param {Function} read - source handed to drain.
 * @param {Function} [done] - completion callback handed to drain.
 */
var log = exports.log = function (read, done) {
  var print = console.log.bind(console)
  return drain(read, print, done)
}
71 | |
72 |
Built with git-ssb-web