Files: ffd00da88bd46f6200bd22699acbb8a980ff260f / index.js
5455 bytesRaw
1 | |
2 | var fs = require('fs') |
3 | |
4 | var isBuffer = Buffer.isBuffer |
5 | var Obv = require('obv') |
6 | var Append = require('append-batch') |
7 | var Blocks = require('aligned-block-file') |
8 | var isInteger = Number.isInteger |
9 | var ltgt = require('ltgt') |
10 | |
/**
 * Pack a batch of buffers into one contiguous buffer of framed records.
 *
 * Each record is laid out as:
 *   <UInt32BE length> <data bytes> <UInt32BE length>
 * The length is written both before and after the data so a record can be
 * located when scanning forwards (leading length) or backwards (trailing
 * length).
 *
 * @param {Buffer[]} data - the record payloads, in order.
 * @returns {Buffer} a buffer of `sum(payload lengths) + 8 * data.length` bytes.
 */
function frame (data) {
  var length = data.reduce(function (total, value) { return total + value.length }, 0)
  // Buffer.alloc replaces the deprecated (and uninitialised) `new Buffer(size)`.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for(var i = 0; i < data.length; i++) {
    var buf = data[i]
    b.writeUInt32BE(buf.length, offset) //leading length
    buf.copy(b, offset + 4, 0, buf.length) //payload
    b.writeUInt32BE(buf.length, offset + 4 + buf.length) //trailing length
    //advance past payload plus the two 4 byte length fields
    offset += buf.length + 8
  }
  return b
}
26 | |
/**
 * Shape a single stream item according to the `seqs` / `values` options.
 *
 * - `seqs === false`   -> just the value
 * - `values === false` -> just the seq
 * - otherwise          -> `{value, seq}`
 *
 * Only a strict `false` turns a field off; `undefined` leaves it enabled.
 * The `cursor` parameter is accepted but not used.
 */
function format (seqs, values, seq, value, cursor) {
  if(seqs === false) return value
  if(values === false) return seq
  return {value: value, seq: seq}
}
36 | |
//NOTE(review): `k` is not referenced anywhere in the visible file.
var k = 0

//identity: hands back its argument untouched.
function id (value) {
  return value
}

//default codec: encode and decode are both the identity function,
//so values pass through unchanged.
var id_codec = {
  encode: id,
  decode: id
}
41 | |
42 | module.exports = function (file, length, codec) { |
43 | if(!codec) codec = id_codec |
44 | var since = Obv() |
45 | length = length || 1024 |
46 | var blocks = Blocks(file, length, 'a+') |
47 | |
48 | var append = Append(function (batch, cb) { |
49 | batch = batch.map(codec.encode).map(function (e) { |
50 | return Buffer.isBuffer(e) ? e : new Buffer(e) |
51 | }) |
52 | blocks.append(frame(batch), function (err) { |
53 | if(err) return cb(err) |
54 | //else, get offset of last item. |
55 | since.set(blocks.offset.value - (batch[batch.length - 1].length + 8)) |
56 | cb(null, since.value) |
57 | }) |
58 | }) |
59 | |
60 | var since = Obv() |
61 | var offset = blocks.offset |
62 | |
63 | offset.once(function (offset) { |
64 | if(offset === 0) return since.set(-1) |
65 | log.getPrevious(offset, function (err, value, length) { |
66 | since.set(offset - length) |
67 | }) |
68 | }) |
69 | |
70 | var log |
71 | return log = { |
72 | since: since, |
73 | //create a stream between any two records. |
74 | //read the first value, then scan forward or backwards |
75 | //in the direction of the log |
76 | |
77 | stream: function (opts) { |
78 | opts = opts || {} |
79 | var cursor |
80 | var reverse = !!opts.reverse |
81 | var get = reverse ? log.getPrevious : log.get |
82 | var diff = reverse ? -1 : 1 |
83 | var live = opts.live |
84 | var aborted = false |
85 | var skip = false |
86 | |
87 | if(reverse) { |
88 | if(opts.lt != null) cursor = opts.lt |
89 | else if(opts.lte != null) { |
90 | cursor = opts.lte; skip = true |
91 | } |
92 | } |
93 | else { |
94 | if(opts.gte != null) cursor = opts.gte |
95 | else if(opts.gt != null) { |
96 | cursor = opts.gt; skip = true |
97 | } |
98 | else cursor = 0 |
99 | } |
100 | |
101 | var lower = ltgt.lowerBound(opts) || 0 |
102 | var includeLower = ltgt.lowerBoundInclusive(opts) |
103 | var upper = ltgt.upperBound(opts) |
104 | var includeUpper = ltgt.upperBoundInclusive(opts) |
105 | |
106 | |
107 | function next (cb) { |
108 | if(aborted) return cb(aborted) |
109 | |
110 | if(!reverse && upper != null && includeUpper ? cursor > upper : cursor >= upper) { |
111 | return cb(true) |
112 | } |
113 | |
114 | get(cursor, function (err, value, length) { |
115 | if(!value.length) throw new Error('read empty value') |
116 | var _cursor = reverse ? cursor - length : cursor |
117 | cursor += (length * diff) |
118 | |
119 | if(reverse && (includeLower ? cursor < lower : cursor <= lower)) |
120 | return cb(true) |
121 | |
122 | if(skip) { |
123 | skip = false |
124 | return next(cb) |
125 | } |
126 | |
127 | cb(err, format(opts.seqs, opts.values, _cursor, value)) |
128 | }) |
129 | } |
130 | |
131 | return function (abort, cb) { |
132 | if(aborted = abort) return cb(abort) |
133 | |
134 | offset.once(function (_offset) { |
135 | //if(_offset < cursor) //throw new Error('offset smaller than cursor') |
136 | if(cursor == null && reverse) { |
137 | cursor = _offset; next(cb) |
138 | } |
139 | else if(reverse ? cursor > 0 : cursor < _offset) next(cb) |
140 | else if(reverse ? cursor <= 0 : cursor >= _offset) { |
141 | if(!live) return cb(true) |
142 | offset.once(function () { next(cb) }, false) |
143 | } |
144 | else |
145 | throw new Error('should never happen: cursor is invalid state:'+cursor+' offset:'+_offset) |
146 | }) |
147 | } |
148 | }, |
149 | |
150 | //if value is an array of buffers, then treat that as a batch. |
151 | append: append, |
152 | |
153 | get: function (_offset, cb) { |
154 | if(!isInteger(_offset)) throw new Error('get: offset must be integer') |
155 | //read the block that offset is in. |
156 | //if offset is near the end of the block, read two blocks. |
157 | blocks.readUInt32BE(_offset, function (err, length) { |
158 | if(err) return cb(err) |
159 | blocks.read(_offset + 4, _offset + 4 + length, function (err, value) { |
160 | if(value.length !== length) throw new Error('incorrect length, expected:'+length+', was:'+value.length) |
161 | setImmediate(function () { |
162 | cb(err, codec.decode(value), length + 8) |
163 | }) |
164 | }) |
165 | }) |
166 | }, |
167 | //get the record _before_ the given offset. |
168 | getPrevious: function (_offset, cb) { |
169 | //don't read before start of file... |
170 | if(!isInteger(_offset)) throw new Error('getPrevious: offset must be integer') |
171 | |
172 | _offset = _offset || blocks.size() |
173 | if(_offset == 0) return cb(new Error('attempted read previous to first object')) |
174 | blocks.readUInt32BE(_offset - 4, function (err, length) { |
175 | if(err) return cb(err) |
176 | blocks.read(_offset - 4 - length, _offset - 4, function (err, value) { |
177 | cb(err, codec.decode(value), length + 8) |
178 | }) |
179 | }) |
180 | }, |
181 | } |
182 | } |
183 | |
184 |
Built with git-ssb-web