git ssb

1+

Dominic / offset-log



Commit 6524aa00b6739b51cda65822bc3d3f80ab7f8801

rewrite stream

Dominic Tarr committed on 10/22/2016, 3:39:43 PM
Parent: 7147d8edeb40da6ff51e3b200ed7e65c5eb32c27

Files changed

index.jschanged
index.jsView
@@ -5,9 +5,12 @@
55 var Live = require('pull-live')
66 var pull = require('pull-stream/pull')
77 var Map = require('pull-stream/throughs/map')
88
9 +var Eventually = require('eventually')
10 +
911 var Blocks = require('block-reader')
12 +var isInteger = Number.isInteger
1013
1114 function frame (data) {
1215 var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
1316 var b = new Buffer(length + data.length * 8)
@@ -16,38 +19,36 @@
1619 var item = data[i]
1720 //mutate the items
1821 var buf = item.value
1922 item.offset = 0 + offset
23 + console.log(buf.length, buf)
2024 b.writeUInt32BE(buf.length, 0 + offset) //start
2125 b.writeUInt32BE(buf.length, 4+buf.length + offset) //end
2226 item.value.copy(b, 4 + offset, 0, buf.length)
2327 offset += buf.length + 8
2428 }
2529 return b
2630 }
2731
28-function format (opts) {
29- var keys = opts.keys === true //default to false
30- var values = opts.value !== false //default to true
31- return Map(function (data) {
32- return keys && values ? data : values ? data.value : data.key
33- })
32 +function format (keys, values, key, value) {
33 + return (
34 + keys !== false
35 + ? values !== false
36 + ? {key: key, value: value}
37 + : key
38 + : value
39 + )
3440 }
3541
3642 module.exports = function (file, length) {
3743
44 + var since = Eventually()
3845 var notify = Notify()
3946 length = length || 1024
4047 var blocks = Blocks(file, length, 'a+')
4148
4249 var queue = [], writing = false
43- //TODO: check current size of file!
44- var offset = -1
4550
46- try {
47- offset = fs.statSync(file).size
48- } catch(_) {}
49-
5051 function write () {
5152 if(writing) return
5253 if(!queue.length) return
5354 writing = true
@@ -59,94 +60,121 @@
5960 writing = false
6061 while(_queue.length) {
6162 var q = _queue.shift()
6263 var o = (_offset - framed.length) + q.offset
63- offset = Math.max(offset, o)
6464 q.cb(err, o)
6565 }
66 + //updates since.
6667 if(queue.length) write()
6768 })
6869 }
6970
71 + var offset = blocks.offset
7072 var log
7173 return log = {
74 + offset: offset,
7275 //create a stream between any two records.
7376 //read the first value, then scan forward or backwards
7477 //in the direction of the log
7578
7679 //using pull-live this way means that things added in real-time are buffered
7780 //in memory until they are read, that means less predictable memory usage.
7881 //instead, we should track the offset we are up to, and wait if necessary.
79- stream: Live(function (opts) {
80- var reverse = opts && opts.reverse
81- var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0)
82 + stream: function (opts) {
83 + opts = opts || {}
84 + var cursor
85 + var reverse = !!opts.reverse
86 + var get = reverse ? log.getPrevious : log.get
8287 var diff = reverse ? -1 : 1
83- var get = reverse ? log.getPrevious : log.get
84- var end = offset
85- return pull(function (abort, cb) {
86- if(abort) cb(abort)
87- else if(reverse ? next <= 0 : next > end)
88- cb(true)
89- else
90- get(next, function (err, value) {
91- if(err) return cb(true) //err)
92- else if(!value || !value.length) return cb(true)
93- else {
94- var _offset = next
95- next = next + (value.length + 8)*diff
96- cb(null, {key: _offset, value: value})
97- }
98- })
99- }, format(opts))
100- }, function (opts) {
101- return pull(notify.listen(), format(opts))
102- }),
88 + var live = opts.live
89 + if(!reverse && opts.gte == null) {
90 + cursor = 0
91 + }
92 + else
93 + cursor = reverse ? opts.lt : opts.gte
94 +
95 + function next (cb) {
96 + get(cursor, function (err, value, length) {
97 + if(!value.length) throw new Error('read empty value')
98 + _cursor = cursor
99 + cursor += (length * diff)
100 + cb(err, format(opts.keys, opts.value, _cursor, value))
101 + })
102 + }
103 +
104 + return function (abort, cb) {
105 + offset.once(function (_offset) {
106 + //if(_offset < cursor) //throw new Error('offset smaller than cursor')
107 + if(cursor == null && reverse)
108 + offset.once(function (_offset) {
109 + cursor = _offset
110 + next(cb)
111 + })
112 + else if(reverse ? cursor > 0 : cursor < _offset) next(cb)
113 + else if(reverse ? cursor <= 0 : cursor >= _offset) {
114 + if(!live) cb(true)
115 + else offset.once(function (_offset) {
116 + if(cursor == _offset) throw new Error('expected offset to update')
117 + next(cb)
118 + }, false)
119 + }
120 + else
121 + throw new Error('should never happen: cursor is invalid state:'+cursor+' offset:'+_offset)
122 + })
123 + }
124 + },
125 +
103126 //if value is an array of buffers, then treat that as a batch.
104127 append: function (value, cb) {
105128 //TODO: make this like, actually durable...
106129 if(Array.isArray(value)) {
107130 var offsets = []
108131 value.forEach(function (v) {
109132 queue.push({value: v, cb: function (err, offset) {
110133 offsets.push(offset)
111- if(offsets.length === value.length)
134 + if(offsets.length === value.length) {
135 + for(var i = 0; i < offsets.length; i++)
136 + notify({key: offsets[i], value: value[i]})
112137 cb(null, offsets)
138 + }
113139 }})
114140 })
115141
116142 return write()
117143 }
118144 if(!isBuffer(value)) throw new Error('value must be a buffer')
119- queue.push({value: value, cb: function (err, offset) {
145 + queue.push({value: value, cb: function (err, _offset) {
120146 if(err) return cb(err)
121147 notify({key: offset, value: value})
122- cb(null, offset)
148 + cb(null, _offset)
123149 }})
124150 write()
125151 },
126- get: function (offset, cb) {
152 + get: function (_offset, cb) {
153 + if(!isInteger(_offset)) throw new Error('get: offset must be integer')
127154 //read the block that offset is in.
128155 //if offset is near the end of the block, read two blocks.
129- blocks.readUInt32BE(offset, function (err, length) {
156 + blocks.readUInt32BE(_offset, function (err, length) {
130157 if(err) return cb(err)
131- blocks.read(offset + 4, offset+4 + length, cb)
158 + blocks.read(_offset + 4, _offset + 4 + length, function (err, value) {
159 + if(value.length !== length) throw new Error('incorrect length, expected:'+length+', was:'+value.length)
160 + cb(err, value, length + 8)
161 + })
132162 })
133163 },
134164 //get the record _before_ the given offset.
135165 getPrevious: function (_offset, cb) {
136166 //don't read before start of file...
137- _offset = _offset || blocks.size()
167 + if(!isInteger(_offset)) throw new Error('getPrevious: offset must be integer')
168 +
169 + _offset = _offset || blocks.size()
138170 if(_offset == 0) return cb(new Error('attempted read previous to first object'))
139171 blocks.readUInt32BE(_offset - 4, function (err, length) {
140172 if(err) return cb(err)
141- blocks.read(_offset - 4 - length, _offset - 4, cb)
173 + blocks.read(_offset - 4 - length, _offset - 4, function (err, value) {
174 + cb(err, value, length + 8)
175 + })
142176 })
143177 },
144178 }
145179 }
146180
147-
148-
149-
150-
151-
152-

Built with git-ssb-web