git ssb

1+

Dominic / offset-log



Tree: ffd00da88bd46f6200bd22699acbb8a980ff260f

Files: ffd00da88bd46f6200bd22699acbb8a980ff260f / index.js

5455 bytesRaw
1'use strict'
2var fs = require('fs')
3
4var isBuffer = Buffer.isBuffer
5var Obv = require('obv')
6var Append = require('append-batch')
7var Blocks = require('aligned-block-file')
8var isInteger = Number.isInteger
9var ltgt = require('ltgt')
10
/**
 * Pack a batch of records into one contiguous buffer.
 *
 * Every record is written as
 *
 *   <UInt32BE length> <payload bytes> <UInt32BE length>
 *
 * so each record costs its payload plus 8 bytes of framing. The length is
 * stored on both sides of the payload; `getPrevious` in this file reads the
 * trailing copy (the 4 bytes just before an offset) to step backwards.
 *
 * @param {Buffer[]} data - records to frame, in the order they are written.
 * @returns {Buffer} a new buffer holding all framed records back to back
 *   (zero-length when `data` is empty).
 */
function frame (data) {
  var length = data.reduce(function (total, value) { return total + value.length }, 0)
  // `new Buffer(size)` is deprecated and hands back uninitialised memory;
  // Buffer.alloc is zero-filled, so no stale bytes can ever reach the log.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for (var i = 0; i < data.length; i++) {
    var buf = data[i]
    b.writeUInt32BE(buf.length, offset) // leading length
    buf.copy(b, offset + 4, 0, buf.length) // payload
    b.writeUInt32BE(buf.length, offset + 4 + buf.length) // trailing length
    offset += buf.length + 8
  }
  return b
}
26
/**
 * Choose what a stream emits for one record.
 *
 * - `seqs === false`   -> the bare value
 * - `values === false` -> the bare seq
 * - otherwise          -> `{value, seq}`
 *
 * Only a literal `false` switches a field off; `undefined` and any other
 * value leave it on. `seqs === false` wins when both flags are `false`.
 *
 * @param {*} seqs - pass `false` to omit the seq.
 * @param {*} values - pass `false` to omit the value.
 * @param {*} seq - the record's seq.
 * @param {*} value - the record's value.
 * @param {*} cursor - accepted but not used.
 * @returns {*} the value, the seq, or `{value, seq}`.
 */
function format (seqs, values, seq, value, cursor) {
  if (seqs === false) return value
  if (values === false) return seq
  return {value: value, seq: seq}
}
36
// NOTE(review): nothing in the visible file reads `k` - confirm before removing.
var k = 0

// Identity function: hands its argument back untouched.
function id (value) {
  return value
}

// Pass-through codec, used as the default when the caller supplies none.
var id_codec = {
  encode: id,
  decode: id
}
41
42module.exports = function (file, length, codec) {
43 if(!codec) codec = id_codec
44 var since = Obv()
45 length = length || 1024
46 var blocks = Blocks(file, length, 'a+')
47
48 var append = Append(function (batch, cb) {
49 batch = batch.map(codec.encode).map(function (e) {
50 return Buffer.isBuffer(e) ? e : new Buffer(e)
51 })
52 blocks.append(frame(batch), function (err) {
53 if(err) return cb(err)
54 //else, get offset of last item.
55 since.set(blocks.offset.value - (batch[batch.length - 1].length + 8))
56 cb(null, since.value)
57 })
58 })
59
60 var since = Obv()
61 var offset = blocks.offset
62
63 offset.once(function (offset) {
64 if(offset === 0) return since.set(-1)
65 log.getPrevious(offset, function (err, value, length) {
66 since.set(offset - length)
67 })
68 })
69
70 var log
71 return log = {
72 since: since,
73 //create a stream between any two records.
74 //read the first value, then scan forward or backwards
75 //in the direction of the log
76
77 stream: function (opts) {
78 opts = opts || {}
79 var cursor
80 var reverse = !!opts.reverse
81 var get = reverse ? log.getPrevious : log.get
82 var diff = reverse ? -1 : 1
83 var live = opts.live
84 var aborted = false
85 var skip = false
86
87 if(reverse) {
88 if(opts.lt != null) cursor = opts.lt
89 else if(opts.lte != null) {
90 cursor = opts.lte; skip = true
91 }
92 }
93 else {
94 if(opts.gte != null) cursor = opts.gte
95 else if(opts.gt != null) {
96 cursor = opts.gt; skip = true
97 }
98 else cursor = 0
99 }
100
101 var lower = ltgt.lowerBound(opts) || 0
102 var includeLower = ltgt.lowerBoundInclusive(opts)
103 var upper = ltgt.upperBound(opts)
104 var includeUpper = ltgt.upperBoundInclusive(opts)
105
106
107 function next (cb) {
108 if(aborted) return cb(aborted)
109
110 if(!reverse && upper != null && includeUpper ? cursor > upper : cursor >= upper) {
111 return cb(true)
112 }
113
114 get(cursor, function (err, value, length) {
115 if(!value.length) throw new Error('read empty value')
116 var _cursor = reverse ? cursor - length : cursor
117 cursor += (length * diff)
118
119 if(reverse && (includeLower ? cursor < lower : cursor <= lower))
120 return cb(true)
121
122 if(skip) {
123 skip = false
124 return next(cb)
125 }
126
127 cb(err, format(opts.seqs, opts.values, _cursor, value))
128 })
129 }
130
131 return function (abort, cb) {
132 if(aborted = abort) return cb(abort)
133
134 offset.once(function (_offset) {
135 //if(_offset < cursor) //throw new Error('offset smaller than cursor')
136 if(cursor == null && reverse) {
137 cursor = _offset; next(cb)
138 }
139 else if(reverse ? cursor > 0 : cursor < _offset) next(cb)
140 else if(reverse ? cursor <= 0 : cursor >= _offset) {
141 if(!live) return cb(true)
142 offset.once(function () { next(cb) }, false)
143 }
144 else
145 throw new Error('should never happen: cursor is invalid state:'+cursor+' offset:'+_offset)
146 })
147 }
148 },
149
150 //if value is an array of buffers, then treat that as a batch.
151 append: append,
152
153 get: function (_offset, cb) {
154 if(!isInteger(_offset)) throw new Error('get: offset must be integer')
155 //read the block that offset is in.
156 //if offset is near the end of the block, read two blocks.
157 blocks.readUInt32BE(_offset, function (err, length) {
158 if(err) return cb(err)
159 blocks.read(_offset + 4, _offset + 4 + length, function (err, value) {
160 if(value.length !== length) throw new Error('incorrect length, expected:'+length+', was:'+value.length)
161 setImmediate(function () {
162 cb(err, codec.decode(value), length + 8)
163 })
164 })
165 })
166 },
167 //get the record _before_ the given offset.
168 getPrevious: function (_offset, cb) {
169 //don't read before start of file...
170 if(!isInteger(_offset)) throw new Error('getPrevious: offset must be integer')
171
172 _offset = _offset || blocks.size()
173 if(_offset == 0) return cb(new Error('attempted read previous to first object'))
174 blocks.readUInt32BE(_offset - 4, function (err, length) {
175 if(err) return cb(err)
176 blocks.read(_offset - 4 - length, _offset - 4, function (err, value) {
177 cb(err, codec.decode(value), length + 8)
178 })
179 })
180 },
181 }
182}
183
184

Built with git-ssb-web