git ssb

2+

Dominic / pull-stream



Tree: 270506f3aabb41d4a2d4ae309eef3029ee733587

Files: 270506f3aabb41d4a2d4ae309eef3029ee733587 / throughs.js

2787 bytes · Raw
// Normalises a "mapper" argument.
// - a string is turned into a getter that reads that property from each item
// - anything else (a function, undefined, ...) is handed back untouched, so
//   callers can write `prop(x) || fallback`.
function prop (map) {
  if (typeof map !== 'string') return map
  return function (data) {
    return data[map]
  }
}
8
// Identity function: returns its argument unchanged. Used as the fallback
// mapper / predicate when the caller supplies none.
function id (item) { return item }
12
// map(read, map) -> read
// Wraps the source `read` so that every item is passed through `map` before
// being handed to the consumer.
//   read: upstream source, called as read(end, cb)
//   map:  a function applied to each item, or a string naming a property to
//         pluck from each item; when omitted, items pass through unchanged.
// When upstream signals end (or an error) the mapper is not called and the
// consumer receives (end, null).
var map = exports.map =
function (read, map) {
  map = prop(map) || id
  return function (end, cb) {
    read(end, function (end, data) {
      cb(end, end ? null : map(data))
    })
  }
}
25
// filter(read, test) -> read
// Wraps the source `read` so that only items accepted by `test` reach the
// consumer.
//   read: upstream source, called as read(end, cb)
//   test: one of
//         - a function(data) returning truthy to keep the item
//         - an object with a test() method (e.g. a RegExp)
//         - a string naming a property whose truthiness decides
//         - omitted/null: items are kept when they are themselves truthy
// End and error signals from upstream are forwarded untouched.
var filter = exports.filter =
function (read, test) {
  //regexp (or anything exposing a test() method). The truthiness check keeps
  //null out: typeof null is 'object' and reading null.test would throw.
  if(test
    && 'object' === typeof test
    && 'function' === typeof test.test)
    test = test.test.bind(test)
  test = prop(test) || id
  return function next (end, cb) {
    // A source that answers synchronously and produces a long run of rejected
    // items would otherwise recurse next -> read -> callback -> next once per
    // item and overflow the stack. While the callback fires inside the read()
    // call we loop instead; an asynchronous callback starts from a fresh
    // stack, so plain recursion is fine there.
    var sync, again = true
    while(again) {
      again = false
      sync = true
      read(end, function (err, data) {
        if(!err && !test(data)) {
          if(!sync) return next(err, cb)
          // ask again with the (falsy) end value the recursive call would
          // have received
          end = err
          again = true
          return
        }
        cb(err, data)
      })
      sync = false
    }
  }
}
41
// through(read, op) -> read
// Pass-through stage: every item from `read` is forwarded to the consumer
// unchanged; if `op` is given it is called with each item first, as a side
// effect. `op` is not called for end/error signals. The wrapped reader returns
// whatever the upstream read() call returns.
var through = exports.through = function (read, op) {
  return function (abort, cb) {
    function onItem (end, data) {
      if (!end && op) op(data)
      cb(end, data)
    }
    return read(abort, onItem)
  }
}
51
// take(read, test) -> read
// Forwards items from `read` until `test` rejects one, then ends the stream.
//   read: upstream source, called as read(end, cb)
//   test: either a number n (forward at most n items) or a function(data)
//         returning truthy while items should keep flowing.
// The first rejected item is consumed from upstream but not forwarded; the
// consumer gets end (true) instead and upstream is aborted on the next tick.
var take = exports.take =
function (read, test) {
  var ended = false
  if('number' === typeof test) {
    var n = test
    test = function () {
      return n-- > 0
    }
  }
  return function (end, cb) {
    // Once finished (upstream ended, the predicate failed, or the consumer
    // aborted) every further call is answered from the stored end state and
    // upstream is never touched again. Without this a read after the end
    // would hit upstream again and its callback would never fire.
    if(ended) return cb(ended)
    // consumer abort: remember it and pass it straight upstream
    if(end) {
      ended = end
      return read(end, cb)
    }
    return read(null, function (end, data) {
      // an abort arrived while this read was in flight; the abort's own
      // callback was handed to upstream, so this result is discarded
      if(ended) return
      if(end) return cb(ended = end)
      if(!test(data)) {
        ended = true
        // tell upstream to stop, outside the current call stack
        nextTick(function () {
          read(true, function (){})
        })
        return cb(true)
      }
      return cb(null, data)
    })
  }
}
80
// unique(read, field, invert) -> read
// Filters `read` by whether an item's key has been seen before.
//   read:   upstream source, called as read(end, cb)
//   field:  function(data) returning the key, or a string naming the property
//           to use as key; when omitted the item itself is the key
//   invert: when truthy, keep only items whose key HAS been seen before
//           (i.e. repeats) instead of first occurrences
// Keys are used as object property names, so they are compared by their
// string form.
var unique = exports.unique = function (read, field, invert) {
  field = prop(field) || id
  // No prototype: with a plain {} keys such as 'constructor' or 'toString'
  // would look "already seen" before they were ever recorded.
  var seen = Object.create(null)
  return filter(read, function (data) {
    var key = field(data)
    if(seen[key]) return !!invert //false, by default
    seen[key] = true
    return !invert //true by default
  })
}
91
// nonUnique(read, field) -> read
// Counterpart of unique(): delegates to it with the invert flag set, so only
// items whose key was already seen earlier in the stream are let through.
var nonUnique = exports.nonUnique = function (read, field) {
  var invert = true
  return unique(read, field, invert)
}
95
// Shorthand used by the stages in this file to defer work to the next tick.
var nextTick = process.nextTick

// highWaterMark(read, highWaterMark) -> read
// Read-ahead buffer: eagerly pulls items from `read` into an internal buffer
// (starting on the next tick, before the consumer asks) and serves consumer
// reads from that buffer.
//   read:          upstream source, called as read(end, cb)
//   highWaterMark: maximum number of buffered items before read-ahead pauses;
//                  defaults to 10 (a value of 0 also selects the default)
// Buffered items are always delivered before an upstream end/error is
// reported. A consumer abort discards the buffer and is forwarded upstream.
// Upstream items that are null/undefined are not buffered.
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
  var buffer = [], waiting = [], ended, reading = false
  // set once the consumer has aborted; upstream results are ignored after that
  var aborted = false
  highWaterMark = highWaterMark || 10

  // Serve waiting consumer callbacks: data first, and only once the buffer
  // is empty the end state.
  function readAhead () {
    while(waiting.length && (buffer.length || ended)) {
      var cb = waiting.shift()
      if(buffer.length) cb(null, buffer.shift())
      else cb(ended, null)
    }
  }

  // Pull one more item from upstream unless finished, already reading, or
  // the buffer is full.
  function next () {
    if(ended || reading || buffer.length >= highWaterMark)
      return
    reading = true
    return read(null, function (end, data) {
      reading = false
      if(aborted) {
        // The consumer aborted while this read was in flight: drop the
        // result and pass the abort on now, unless upstream ended by itself.
        if(!end) read(ended, function (){})
        return
      }
      ended = ended || end
      if(!end && data != null) buffer.push(data)

      next(); readAhead()
    })
  }

  nextTick(next)

  return function (end, cb) {
    if(end) {
      // abort: whatever is buffered is no longer wanted
      buffer.length = 0
      if(!ended) {
        ended = end
        aborted = true
        // with a read in flight the abort is forwarded from its callback
        if(!reading) read(end, function (){})
      }
    }
    waiting.push(cb)

    next(); readAhead()
  }
}
130
131

Built with git-ssb-web