Files: 5f1a1047fd206aafe8d09749bcde2ffbf78cc7b1 / sinks.js
1693 bytesRaw
1 | var u = require('./util') |
2 | var prop = u.prop |
3 | var id = u.id |
4 | |
5 | var drain = exports.drain = function (read, op, done) { |
6 | ;(function next() { |
7 | var sync = true, returned = false, loop = true |
8 | do { |
9 | returned = false; sync = true |
10 | read(null, function (err, data) { |
11 | returned = true |
12 | |
13 | if(err) { |
14 | done && done(err === true ? null : err) |
15 | return loop = false |
16 | } |
17 | |
18 | if(op) { |
19 | //return false to abort! |
20 | if(false === op(data)) { |
21 | loop = false |
22 | return read(true, done || function () {}) |
23 | } |
24 | } |
25 | if(!sync) next() |
26 | }) |
27 | sync = false |
28 | if(!returned) return |
29 | } while (loop); |
30 | })() |
31 | } |
32 | |
33 | var find = |
34 | exports.find = function (read, test, cb) { |
35 | var ended = false |
36 | if(!cb) |
37 | cb = test, test = id |
38 | else |
39 | test = prop(test) || id |
40 | drain(read, function (data) { |
41 | if(test(data)) { |
42 | ended = true |
43 | cb(null, data) |
44 | return false |
45 | } |
46 | }, function (err) { |
47 | if(ended) return //already called back |
48 | cb(err === true ? null : err, null) |
49 | }) |
50 | } |
51 | |
52 | var reduce = exports.reduce = |
53 | function (read, reduce, acc, cb) { |
54 | drain(read, function (data) { |
55 | acc = reduce(acc, data) |
56 | }, function (err) { |
57 | cb(err, acc) |
58 | }) |
59 | } |
60 | |
61 | var collect = exports.collect = exports.writeArray = |
62 | function (read, cb) { |
63 | return reduce(read, function (arr, item) { |
64 | arr.push(item) |
65 | return arr |
66 | }, [], cb) |
67 | } |
68 | |
69 | //if the source callsback sync, then loop |
70 | //rather than recurse |
71 | |
72 | var onEnd = exports.onEnd = function (read, done) { |
73 | return drain(read, null, done) |
74 | } |
75 | |
76 | var log = exports.log = function (read, done) { |
77 | return drain(read, console.log.bind(console), done) |
78 | } |
79 | |
80 |
Built with git-ssb-web