Files: 77e8971d09fb121162d025a7851aca84b3d5562f / sinks.js
2527 bytesRaw
1 | |
2 | |
3 | function id (item) { return item } |
4 | |
5 | function prop (key) { |
6 | return ( |
7 | 'string' == typeof key |
8 | ? function (data) { return data[key] } |
9 | : key && 'object' === typeof key && 'function' === typeof key.exec //regexp |
10 | ? function (data) { var v = map.exec(data); return v && v[0] } |
11 | : key || id |
12 | ) |
13 | } |
14 | |
15 | |
16 | var drain = exports.drain = function (op, done) { |
17 | var read, abort |
18 | |
19 | function sink (_read) { |
20 | read = _read |
21 | if(abort) return sink.abort() |
22 | //this function is much simpler to write if you |
23 | //just use recursion, but by using a while loop |
24 | //we do not blow the stack if the stream happens to be sync. |
25 | ;(function next() { |
26 | var loop = true, cbed = false |
27 | while(loop) { |
28 | cbed = false |
29 | read(null, function (end, data) { |
30 | cbed = true |
31 | if(end = end || abort) { |
32 | loop = false |
33 | if(done) done(end === true ? null : end) |
34 | else if(end && end !== true) |
35 | throw end |
36 | } |
37 | else if(op && false === op(data) || abort) { |
38 | loop = false |
39 | read(abort || true, done || function () {}) |
40 | } |
41 | else if(!loop){ |
42 | next() |
43 | } |
44 | }) |
45 | if(!cbed) { |
46 | loop = false |
47 | return |
48 | } |
49 | } |
50 | })() |
51 | } |
52 | |
53 | sink.abort = function (err, cb) { |
54 | if('function' == typeof err) |
55 | cb = err, err = true |
56 | abort = err || true |
57 | if(read) return read(abort, cb || function () {}) |
58 | } |
59 | |
60 | return sink |
61 | } |
62 | |
63 | var onEnd = exports.onEnd = function (done) { |
64 | return drain(null, done) |
65 | } |
66 | |
67 | var log = exports.log = function (done) { |
68 | return drain(function (data) { |
69 | console.log(data) |
70 | }, done) |
71 | } |
72 | |
73 | var find = |
74 | exports.find = function (test, cb) { |
75 | var ended = false |
76 | if(!cb) |
77 | cb = test, test = id |
78 | else |
79 | test = prop(test) || id |
80 | |
81 | return drain(function (data) { |
82 | if(test(data)) { |
83 | ended = true |
84 | cb(null, data) |
85 | return false |
86 | } |
87 | }, function (err) { |
88 | if(ended) return //already called back |
89 | cb(err === true ? null : err, null) |
90 | }) |
91 | } |
92 | |
93 | var reduce = exports.reduce = function (reduce, acc, cb) { |
94 | |
95 | return drain(function (data) { |
96 | acc = reduce(acc, data) |
97 | }, function (err) { |
98 | cb(err, acc) |
99 | }) |
100 | |
101 | } |
102 | |
103 | var collect = exports.collect = |
104 | function (cb) { |
105 | return reduce(function (arr, item) { |
106 | arr.push(item) |
107 | return arr |
108 | }, [], cb) |
109 | } |
110 | |
111 | var concat = exports.concat = |
112 | function (cb) { |
113 | return reduce(function (a, b) { |
114 | return a + b |
115 | }, '', cb) |
116 | } |
117 | |
118 | |
119 | |
120 | |
121 | |
122 |
Built with git-ssb-web