Files: 1a10bc9d9fd3eba08dd44cd0ee208bdc2651f38f / index.js
3312 bytes (raw)
1 | var fs = require('fs') |
2 | |
3 | var isBuffer = Buffer.isBuffer |
4 | var Notify = require('pull-notify') |
5 | var Live = require('pull-live') |
6 | |
7 | var Blocks = require('block-reader') |
8 | |
/**
 * Frame a batch of records into one buffer ready for appending.
 *
 * Each record is laid out as:
 *   [payload length, UInt32BE] [payload] [payload length, UInt32BE]
 * The trailing copy of the length is what makes backwards scanning
 * (getPrevious) possible.
 *
 * NOTE: deliberately mutates each item, setting `item.offset` to the
 * record's byte offset within the returned buffer; the append callback
 * translates that into an absolute file offset.
 *
 * @param {Array<{value: Buffer}>} data - batch of records to frame
 * @returns {Buffer} the framed batch (length = payload bytes + 8 per record)
 */
function frame (data) {
  var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor;
  // output is identical since every byte is written below.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for(var i = 0; i < data.length; i++) {
    var item = data[i]
    //mutate the items
    var buf = item.value
    item.offset = 0 + offset
    b.writeUInt32BE(buf.length, 0 + offset) //start
    b.writeUInt32BE(buf.length, 4+buf.length + offset) //end
    item.value.copy(b, 4 + offset, 0, buf.length)
    offset += buf.length + 8
  }
  return b
}
25 | |
26 | module.exports = function (file, length) { |
27 | |
28 | var notify = Notify() |
29 | length = length || 1024 |
30 | var blocks = Blocks(file, length, 'a+') |
31 | |
32 | var queue = [], writing = false |
33 | //TODO: check current size of file! |
34 | var offset = 0 |
35 | |
36 | function write () { |
37 | if(writing) return |
38 | if(!queue.length) return |
39 | writing = true |
40 | var data = [] |
41 | var framed = frame(queue) |
42 | var _queue = queue |
43 | queue = [] |
44 | blocks.append(framed, function (err, offset) { |
45 | writing = false |
46 | while(_queue.length) { |
47 | var q = _queue.shift() |
48 | q.cb(err, (offset - framed.length) + q.offset) |
49 | } |
50 | if(queue.length) write() |
51 | }) |
52 | } |
53 | |
54 | var log |
55 | return log = { |
56 | //create a stream between any two records. |
57 | //read the first value, then scan forward or backwards |
58 | //in the direction of the log |
59 | stream: Live(function (opts) { |
60 | var reverse = opts && opts.reverse |
61 | var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0) |
62 | var diff = reverse ? -1 : 1 |
63 | var get = reverse ? log.getPrevious : log.get |
64 | |
65 | return function (abort, cb) { |
66 | if(abort) cb(abort) |
67 | else if(reverse && next <= 0) |
68 | cb(true) |
69 | else |
70 | get(next, function (err, value) { |
71 | if(err) return cb(true) //err) |
72 | else if(!value || !value.length) return cb(true) |
73 | else { |
74 | next = next + (value.length + 8)*diff |
75 | cb(null, value) |
76 | } |
77 | }) |
78 | } |
79 | }, function (opts) { |
80 | return notify.listen() |
81 | }), |
82 | //if value is an array of buffers, then treat that as a batch. |
83 | append: function (value, cb) { |
84 | if(!isBuffer(value)) throw new Error('value must be a buffer') |
85 | queue.push({value: value, cb: function (err, offset) { |
86 | if(err) return cb(err) |
87 | notify(value) |
88 | cb(null, offset) |
89 | }}) |
90 | write() |
91 | }, |
92 | get: function (offset, cb) { |
93 | //read the block that offset is in. |
94 | //if offset is near the end of the block, read two blocks. |
95 | blocks.readUInt32BE(offset, function (err, length) { |
96 | if(err) return cb(err) |
97 | blocks.read(offset + 4, offset+4 + length, cb) |
98 | }) |
99 | }, |
100 | //get the record _before_ the given offset. |
101 | getPrevious: function (_offset, cb) { |
102 | //don't read before start of file... |
103 | _offset = _offset || blocks.size() |
104 | if(_offset == 0) return cb(new Error('attempted read previous to first object')) |
105 | blocks.readUInt32BE(_offset - 4, function (err, length) { |
106 | if(err) return cb(err) |
107 | blocks.read(_offset - 4 - length, _offset - 4, cb) |
108 | }) |
109 | }, |
110 | } |
111 | } |
112 | |
113 | |
114 |
Built with git-ssb-web