git ssb

2+

Dominic / pull-stream



Tree: 7436b6f6b4a46b125b4bf85646d25f75408a56a3

Files: 7436b6f6b4a46b125b4bf85646d25f75408a56a3 / sinks.js

2527 bytesRaw
1'use strict'
2
3function id (item) { return item }
4
5function prop (key) {
6 return (
7 'string' == typeof key
8 ? function (data) { return data[key] }
9 : key && 'object' === typeof key && 'function' === typeof key.exec //regexp
10 ? function (data) { var v = map.exec(data); return v && v[0] }
11 : key || id
12 )
13}
14
15
16var drain = exports.drain = function (op, done) {
17 var read, abort
18
19 function sink (_read) {
20 read = _read
21 if(abort) return sink.abort()
22 //this function is much simpler to write if you
23 //just use recursion, but by using a while loop
24 //we do not blow the stack if the stream happens to be sync.
25 ;(function next() {
26 var loop = true, cbed = false
27 while(loop) {
28 cbed = false
29 read(null, function (end, data) {
30 cbed = true
31 if(end = end || abort) {
32 loop = false
33 if(done) done(end === true ? null : end)
34 else if(end && end !== true)
35 throw end
36 }
37 else if(op && false === op(data) || abort) {
38 loop = false
39 read(abort || true, done || function () {})
40 }
41 else if(!loop){
42 next()
43 }
44 })
45 if(!cbed) {
46 loop = false
47 return
48 }
49 }
50 })()
51 }
52
53 sink.abort = function (err, cb) {
54 if('function' == typeof err)
55 cb = err, err = true
56 abort = err || true
57 if(read) return read(abort, cb || function () {})
58 }
59
60 return sink
61}
62
63var onEnd = exports.onEnd = function (done) {
64 return drain(null, done)
65}
66
67var log = exports.log = function (done) {
68 return drain(function (data) {
69 console.log(data)
70 }, done)
71}
72
73var find =
74exports.find = function (test, cb) {
75 var ended = false
76 if(!cb)
77 cb = test, test = id
78 else
79 test = prop(test) || id
80
81 return drain(function (data) {
82 if(test(data)) {
83 ended = true
84 cb(null, data)
85 return false
86 }
87 }, function (err) {
88 if(ended) return //already called back
89 cb(err === true ? null : err, null)
90 })
91}
92
93var reduce = exports.reduce = function (reduce, acc, cb) {
94
95 return drain(function (data) {
96 acc = reduce(acc, data)
97 }, function (err) {
98 cb(err, acc)
99 })
100
101}
102
103var collect = exports.collect =
104function (cb) {
105 return reduce(function (arr, item) {
106 arr.push(item)
107 return arr
108 }, [], cb)
109}
110
111var concat = exports.concat =
112function (cb) {
113 return reduce(function (a, b) {
114 return a + b
115 }, '', cb)
116}
117
118
119
120
121
122

Built with git-ssb-web