Commit ffd00da88bd46f6200bd22699acbb8a980ff260f
pass test-flumelog
Dominic Tarr committed on 11/7/2016, 10:09:47 PMParent: 54ab0ea7ce57ed943ff2950b28462a5aa3d72db4
Files changed
index.js | changed |
index.js | ||
---|---|---|
@@ -5,8 +5,9 @@ | ||
5 | 5 … | var Obv = require('obv') |
6 | 6 … | var Append = require('append-batch') |
7 | 7 … | var Blocks = require('aligned-block-file') |
8 | 8 … | var isInteger = Number.isInteger |
9 … | +var ltgt = require('ltgt') | |
9 | 10 … | |
10 | 11 … | function frame (data) { |
11 | 12 … | var length = data.reduce(function (total, value) { return total + value.length }, 0) |
12 | 13 … | var b = new Buffer(length + data.length * 8) |
@@ -22,27 +23,33 @@ | ||
22 | 23 … | } |
23 | 24 … | return b |
24 | 25 … | } |
25 | 26 … | |
26 | -function format (keys, values, key, value, cursor) { | |
27 … | +function format (seqs, values, seq, value, cursor) { | |
27 | 28 … | return ( |
28 | - keys !== false | |
29 … | + seqs !== false | |
29 | 30 … | ? values !== false |
30 | - ? {key: key, value: value, seq: cursor} | |
31 | - : key | |
31 … | + ? {value: value, seq: seq} | |
32 … | + : seq | |
32 | 33 … | : value |
33 | 34 … | ) |
34 | 35 … | } |
35 | 36 … | |
36 | 37 … | var k = 0 |
37 | 38 … | |
38 | -module.exports = function (file, length) { | |
39 … | +function id (v) { return v } | |
40 … | +var id_codec = {encode: id, decode: id} | |
39 | 41 … | |
42 … | +module.exports = function (file, length, codec) { | |
43 … | + if(!codec) codec = id_codec | |
40 | 44 … | var since = Obv() |
41 | 45 … | length = length || 1024 |
42 | 46 … | var blocks = Blocks(file, length, 'a+') |
43 | 47 … | |
44 | 48 … | var append = Append(function (batch, cb) { |
49 … | + batch = batch.map(codec.encode).map(function (e) { | |
50 … | + return Buffer.isBuffer(e) ? e : new Buffer(e) | |
51 … | + }) | |
45 | 52 … | blocks.append(frame(batch), function (err) { |
46 | 53 … | if(err) return cb(err) |
47 | 54 … | //else, get offset of last item. |
48 | 55 … | since.set(blocks.offset.value - (batch[batch.length - 1].length + 8)) |
@@ -74,21 +81,51 @@ | ||
74 | 81 … | var get = reverse ? log.getPrevious : log.get |
75 | 82 … | var diff = reverse ? -1 : 1 |
76 | 83 … | var live = opts.live |
77 | 84 … | var aborted = false |
78 | - if(!reverse && opts.gte == null) { | |
79 | - cursor = 0 | |
85 … | + var skip = false | |
86 … | + | |
87 … | + if(reverse) { | |
88 … | + if(opts.lt != null) cursor = opts.lt | |
89 … | + else if(opts.lte != null) { | |
90 … | + cursor = opts.lte; skip = true | |
91 … | + } | |
80 | 92 … | } |
81 | - else | |
82 | - cursor = reverse ? opts.lt : opts.gte | |
93 … | + else { | |
94 … | + if(opts.gte != null) cursor = opts.gte | |
95 … | + else if(opts.gt != null) { | |
96 … | + cursor = opts.gt; skip = true | |
97 … | + } | |
98 … | + else cursor = 0 | |
99 … | + } | |
83 | 100 … | |
101 … | + var lower = ltgt.lowerBound(opts) || 0 | |
102 … | + var includeLower = ltgt.lowerBoundInclusive(opts) | |
103 … | + var upper = ltgt.upperBound(opts) | |
104 … | + var includeUpper = ltgt.upperBoundInclusive(opts) | |
105 … | + | |
106 … | + | |
84 | 107 … | function next (cb) { |
85 | 108 … | if(aborted) return cb(aborted) |
109 … | + | |
110 … | + if(!reverse && upper != null && includeUpper ? cursor > upper : cursor >= upper) { | |
111 … | + return cb(true) | |
112 … | + } | |
113 … | + | |
86 | 114 … | get(cursor, function (err, value, length) { |
87 | 115 … | if(!value.length) throw new Error('read empty value') |
88 | - var _cursor = cursor | |
116 … | + var _cursor = reverse ? cursor - length : cursor | |
89 | 117 … | cursor += (length * diff) |
90 | - cb(err, format(opts.keys, opts.value, _cursor, value, cursor)) | |
118 … | + | |
119 … | + if(reverse && (includeLower ? cursor < lower : cursor <= lower)) | |
120 … | + return cb(true) | |
121 … | + | |
122 … | + if(skip) { | |
123 … | + skip = false | |
124 … | + return next(cb) | |
125 … | + } | |
126 … | + | |
127 … | + cb(err, format(opts.seqs, opts.values, _cursor, value)) | |
91 | 128 … | }) |
92 | 129 … | } |
93 | 130 … | |
94 | 131 … | return function (abort, cb) { |
@@ -121,9 +158,9 @@ | ||
121 | 158 … | if(err) return cb(err) |
122 | 159 … | blocks.read(_offset + 4, _offset + 4 + length, function (err, value) { |
123 | 160 … | if(value.length !== length) throw new Error('incorrect length, expected:'+length+', was:'+value.length) |
124 | 161 … | setImmediate(function () { |
125 | - cb(err, value, length + 8) | |
162 … | + cb(err, codec.decode(value), length + 8) | |
126 | 163 … | }) |
127 | 164 … | }) |
128 | 165 … | }) |
129 | 166 … | }, |
@@ -136,12 +173,11 @@ | ||
136 | 173 … | if(_offset == 0) return cb(new Error('attempted read previous to first object')) |
137 | 174 … | blocks.readUInt32BE(_offset - 4, function (err, length) { |
138 | 175 … | if(err) return cb(err) |
139 | 176 … | blocks.read(_offset - 4 - length, _offset - 4, function (err, value) { |
140 | - cb(err, value, length + 8) | |
177 … | + cb(err, codec.decode(value), length + 8) | |
141 | 178 … | }) |
142 | 179 … | }) |
143 | 180 … | }, |
144 | 181 … | } |
145 | 182 … | } |
146 | 183 … | |
147 | - |
Built with git-ssb-web