git ssb

1+

Dominic / offset-log



Tree: 19f8415bfb3ca6c771792dd025ec7db5b6927e03

Files: 19f8415bfb3ca6c771792dd025ec7db5b6927e03 / index.js

3749 bytes · Raw
1var fs = require('fs')
2
3var isBuffer = Buffer.isBuffer
4var Notify = require('pull-notify')
5var Live = require('pull-live')
6var pull = require('pull-stream/pull')
7var Map = require('pull-stream/throughs/map')
8
9var Blocks = require('block-reader')
10
/**
 * Pack a batch of queued records into one contiguous buffer.
 *
 * Each record is laid out as:
 *   <UInt32BE length> <payload bytes> <UInt32BE length>
 * The length is written both before and after the payload, so a record
 * occupies payload.length + 8 bytes.
 *
 * Side effect: every element of `data` gets an `offset` property holding the
 * position of its frame relative to the start of the returned buffer.
 *
 * @param {Array<{value: Buffer}>} data records to frame, in order
 * @returns {Buffer} the framed batch (empty buffer for an empty batch)
 */
function frame (data) {
  var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
  // Buffer.alloc replaces the deprecated `new Buffer(size)`, which hands back
  // uninitialised memory.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for (var i = 0; i < data.length; i++) {
    var item = data[i]
    var buf = item.value
    // record where this frame starts inside the batch (mutates the item)
    item.offset = offset
    b.writeUInt32BE(buf.length, offset) // leading length
    buf.copy(b, offset + 4, 0, buf.length) // payload
    b.writeUInt32BE(buf.length, offset + 4 + buf.length) // trailing length
    offset += buf.length + 8
  }
  return b
}
27
/**
 * Build a pull-stream map through that shapes each {key, value} record
 * according to the caller's options.
 *
 *   keys && values  -> the whole {key, value} record
 *   values only     -> record.value   (the default)
 *   otherwise       -> record.key
 *
 * @param {Object} [opts]
 * @param {boolean} [opts.keys=false]   include the key
 * @param {boolean} [opts.value=true]   include the value (historical spelling)
 * @param {boolean} [opts.values=true]  include the value (alias, matches `keys`)
 * @returns {*} whatever Map(fn) returns for the selecting function
 */
function format (opts) {
  // callers treat opts as optional, so tolerate undefined/null here too
  var o = opts || {}
  var keys = o.keys === true //default to false
  // `value` is the original option name; `values` is accepted as well so the
  // spelling is consistent with `keys`.
  var values = o.value !== false && o.values !== false //default to true
  return Map(function (data) {
    return keys && values ? data : values ? data.value : data.key
  })
}
35
36module.exports = function (file, length) {
37
38 var notify = Notify()
39 length = length || 1024
40 var blocks = Blocks(file, length, 'a+')
41
42 var queue = [], writing = false
43 //TODO: check current size of file!
44 var offset = 0
45
46 function write () {
47 if(writing) return
48 if(!queue.length) return
49 writing = true
50 var data = []
51 var framed = frame(queue)
52 var _queue = queue
53 queue = []
54 blocks.append(framed, function (err, offset) {
55 writing = false
56 while(_queue.length) {
57 var q = _queue.shift()
58 q.cb(err, (offset - framed.length) + q.offset)
59 }
60 if(queue.length) write()
61 })
62 }
63
64 var log
65 return log = {
66 //create a stream between any two records.
67 //read the first value, then scan forward or backwards
68 //in the direction of the log
69 stream: Live(function (opts) {
70 var reverse = opts && opts.reverse
71 var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0)
72 var diff = reverse ? -1 : 1
73 var get = reverse ? log.getPrevious : log.get
74
75 return pull(function (abort, cb) {
76 if(abort) cb(abort)
77 else if(reverse && next <= 0)
78 cb(true)
79 else
80 get(next, function (err, value) {
81 if(err) return cb(true) //err)
82 else if(!value || !value.length) return cb(true)
83 else {
84 var _offset = next
85 next = next + (value.length + 8)*diff
86 cb(null, {key: _offset, value: value})
87 }
88 })
89 }, format(opts))
90 }, function (opts) {
91 return pull(notify.listen(), format(opts))
92 }),
93 //if value is an array of buffers, then treat that as a batch.
94 append: function (value, cb) {
95 if(!isBuffer(value)) throw new Error('value must be a buffer')
96 queue.push({value: value, cb: function (err, offset) {
97 if(err) return cb(err)
98 notify({key: offset, value: value})
99 cb(null, offset)
100 }})
101 write()
102 },
103 get: function (offset, cb) {
104 //read the block that offset is in.
105 //if offset is near the end of the block, read two blocks.
106 blocks.readUInt32BE(offset, function (err, length) {
107 if(err) return cb(err)
108 blocks.read(offset + 4, offset+4 + length, cb)
109 })
110 },
111 //get the record _before_ the given offset.
112 getPrevious: function (_offset, cb) {
113 //don't read before start of file...
114 _offset = _offset || blocks.size()
115 if(_offset == 0) return cb(new Error('attempted read previous to first object'))
116 blocks.readUInt32BE(_offset - 4, function (err, length) {
117 if(err) return cb(err)
118 blocks.read(_offset - 4 - length, _offset - 4, cb)
119 })
120 },
121 }
122}
123
124

Built with git-ssb-web