git ssb

1+

Dominic / offset-log



Tree: d58249c539906c2fad920beece6e0b99b1afe1ca

Files: d58249c539906c2fad920beece6e0b99b1afe1ca / index.js

4080 bytesRaw
1var fs = require('fs')
2
3var isBuffer = Buffer.isBuffer
4var Notify = require('pull-notify')
5var Live = require('pull-live')
6var pull = require('pull-stream/pull')
7var Map = require('pull-stream/throughs/map')
8
9var Blocks = require('block-reader')
10
/**
 * Pack a batch of records into a single contiguous buffer.
 *
 * Every record is written as
 *   <UInt32BE length> <value bytes> <UInt32BE length>
 * i.e. the value's byte length is stored both before and after the value.
 *
 * Side effect: each item in `data` is mutated - its `offset` property is set
 * to the byte position of its frame relative to the start of the returned
 * buffer.
 *
 * @param {Array<{value: Buffer}>} data - records to frame, in write order.
 * @returns {Buffer} the framed batch; a zero-length buffer for an empty batch.
 */
function frame (data) {
  var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
  // 8 bytes of overhead per record: a 4 byte length on each side of the value.
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor, which
  // hands back uninitialised memory.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for(var i = 0; i < data.length; i++) {
    var item = data[i]
    var buf = item.value
    //mutate the item so the caller can find where its frame starts
    item.offset = offset
    b.writeUInt32BE(buf.length, offset) //leading length
    buf.copy(b, offset + 4, 0, buf.length)
    b.writeUInt32BE(buf.length, offset + 4 + buf.length) //trailing length
    offset += buf.length + 8
  }
  return b
}
27
/**
 * Build a through-stream that shapes each `{key, value}` record according
 * to the caller's options.
 *
 *   keys && values -> the whole record
 *   values only    -> record.value   (the default)
 *   otherwise      -> record.key
 *
 * @param {Object} [opts] - stream options; may be omitted.
 * @param {boolean} [opts.keys=false] - include the key only when exactly `true`.
 * @param {boolean} [opts.values=true] - set to `false` to drop values.
 * @param {boolean} [opts.value=true] - older spelling of `values`, still honoured.
 * @returns the through created by `Map` around the selector function.
 */
function format (opts) {
  //callers treat opts as optional, so do not dereference undefined
  opts = opts || {}
  var keys = opts.keys === true //default to false
  //default to true; accept the plural spelling that matches `keys`
  //as well as the original singular `value`
  var values = opts.value !== false && opts.values !== false
  return Map(function (data) {
    return keys && values ? data : values ? data.value : data.key
  })
}
35
36module.exports = function (file, length) {
37
38 var notify = Notify()
39 length = length || 1024
40 var blocks = Blocks(file, length, 'a+')
41
42 var queue = [], writing = false
43 //TODO: check current size of file!
44 var offset = -1
45
46 function write () {
47 if(writing) return
48 if(!queue.length) return
49 writing = true
50 var data = []
51 var framed = frame(queue)
52 var _queue = queue
53 queue = []
54 blocks.append(framed, function (err, _offset) {
55 writing = false
56 while(_queue.length) {
57 var q = _queue.shift()
58 var o = (_offset - framed.length) + q.offset
59 offset = Math.max(offset, o)
60 q.cb(err, o)
61 }
62 if(queue.length) write()
63 })
64 }
65
66 var log
67 return log = {
68 //create a stream between any two records.
69 //read the first value, then scan forward or backwards
70 //in the direction of the log
71
72 //using pull-live this way means that things added in real-time are buffered
73 //in memory until they are read, that means less predictable memory usage.
74 //instead, we should track the offset we are up to, and wait if necessary.
75 stream: Live(function (opts) {
76 var reverse = opts && opts.reverse
77 var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0)
78 var diff = reverse ? -1 : 1
79 var get = reverse ? log.getPrevious : log.get
80 var end = offset
81 return pull(function (abort, cb) {
82 if(abort) cb(abort)
83 else if(reverse ? next <= 0 : next > end)
84 cb(true)
85 else
86 get(next, function (err, value) {
87 if(err) return cb(true) //err)
88 else if(!value || !value.length) return cb(true)
89 else {
90 var _offset = next
91 next = next + (value.length + 8)*diff
92 cb(null, {key: _offset, value: value})
93 }
94 })
95 }, format(opts))
96 }, function (opts) {
97 return pull(notify.listen(), format(opts))
98 }),
99 //if value is an array of buffers, then treat that as a batch.
100 append: function (value, cb) {
101 if(!isBuffer(value)) throw new Error('value must be a buffer')
102 queue.push({value: value, cb: function (err, offset) {
103 if(err) return cb(err)
104 notify({key: offset, value: value})
105 cb(null, offset)
106 }})
107 write()
108 },
109 get: function (offset, cb) {
110 //read the block that offset is in.
111 //if offset is near the end of the block, read two blocks.
112 blocks.readUInt32BE(offset, function (err, length) {
113 if(err) return cb(err)
114 blocks.read(offset + 4, offset+4 + length, cb)
115 })
116 },
117 //get the record _before_ the given offset.
118 getPrevious: function (_offset, cb) {
119 //don't read before start of file...
120 _offset = _offset || blocks.size()
121 if(_offset == 0) return cb(new Error('attempted read previous to first object'))
122 blocks.readUInt32BE(_offset - 4, function (err, length) {
123 if(err) return cb(err)
124 blocks.read(_offset - 4 - length, _offset - 4, cb)
125 })
126 },
127 }
128}
129

Built with git-ssb-web