git ssb

1+

Dominic / offset-log



Tree: 1a10bc9d9fd3eba08dd44cd0ee208bdc2651f38f

Files: 1a10bc9d9fd3eba08dd44cd0ee208bdc2651f38f / index.js

3312 bytesRaw
1var fs = require('fs')
2
3var isBuffer = Buffer.isBuffer
4var Notify = require('pull-notify')
5var Live = require('pull-live')
6
7var Blocks = require('block-reader')
8
/**
 * Serialise a batch of queued records into one contiguous buffer.
 *
 * Each record is laid out as:
 *
 *   <length: UInt32BE> <value bytes> <length: UInt32BE>
 *
 * The length is written both before and after the value, so a record can be
 * located from either side of it.
 *
 * Side effect: every item gets an `offset` property set to the position of
 * its record relative to the start of the returned buffer.
 *
 * @param {Array<{value: Buffer}>} data - items to frame; each `value` must be a Buffer.
 * @returns {Buffer} the framed batch (zero-length when `data` is empty).
 */
function frame (data) {
  var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
  // 8 bytes of overhead per record: a 4-byte length on each side of the value.
  // allocUnsafe is safe here because the loop below overwrites every byte;
  // it replaces the deprecated `new Buffer(size)` constructor.
  var b = Buffer.allocUnsafe(length + data.length * 8)
  var offset = 0
  for(var i = 0; i < data.length; i++) {
    var item = data[i]
    var buf = item.value
    //mutate the items: remember where this record starts inside the batch
    item.offset = offset
    b.writeUInt32BE(buf.length, offset) //start
    b.writeUInt32BE(buf.length, 4 + buf.length + offset) //end
    buf.copy(b, 4 + offset, 0, buf.length)
    offset += buf.length + 8
  }
  return b
}
25
26module.exports = function (file, length) {
27
28 var notify = Notify()
29 length = length || 1024
30 var blocks = Blocks(file, length, 'a+')
31
32 var queue = [], writing = false
33 //TODO: check current size of file!
34 var offset = 0
35
36 function write () {
37 if(writing) return
38 if(!queue.length) return
39 writing = true
40 var data = []
41 var framed = frame(queue)
42 var _queue = queue
43 queue = []
44 blocks.append(framed, function (err, offset) {
45 writing = false
46 while(_queue.length) {
47 var q = _queue.shift()
48 q.cb(err, (offset - framed.length) + q.offset)
49 }
50 if(queue.length) write()
51 })
52 }
53
54 var log
55 return log = {
56 //create a stream between any two records.
57 //read the first value, then scan forward or backwards
58 //in the direction of the log
59 stream: Live(function (opts) {
60 var reverse = opts && opts.reverse
61 var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0)
62 var diff = reverse ? -1 : 1
63 var get = reverse ? log.getPrevious : log.get
64
65 return function (abort, cb) {
66 if(abort) cb(abort)
67 else if(reverse && next <= 0)
68 cb(true)
69 else
70 get(next, function (err, value) {
71 if(err) return cb(true) //err)
72 else if(!value || !value.length) return cb(true)
73 else {
74 next = next + (value.length + 8)*diff
75 cb(null, value)
76 }
77 })
78 }
79 }, function (opts) {
80 return notify.listen()
81 }),
82 //if value is an array of buffers, then treat that as a batch.
83 append: function (value, cb) {
84 if(!isBuffer(value)) throw new Error('value must be a buffer')
85 queue.push({value: value, cb: function (err, offset) {
86 if(err) return cb(err)
87 notify(value)
88 cb(null, offset)
89 }})
90 write()
91 },
92 get: function (offset, cb) {
93 //read the block that offset is in.
94 //if offset is near the end of the block, read two blocks.
95 blocks.readUInt32BE(offset, function (err, length) {
96 if(err) return cb(err)
97 blocks.read(offset + 4, offset+4 + length, cb)
98 })
99 },
100 //get the record _before_ the given offset.
101 getPrevious: function (_offset, cb) {
102 //don't read before start of file...
103 _offset = _offset || blocks.size()
104 if(_offset == 0) return cb(new Error('attempted read previous to first object'))
105 blocks.readUInt32BE(_offset - 4, function (err, length) {
106 if(err) return cb(err)
107 blocks.read(_offset - 4 - length, _offset - 4, cb)
108 })
109 },
110 }
111}
112
113
114

Built with git-ssb-web