Files: 19f8415bfb3ca6c771792dd025ec7db5b6927e03 / index.js
3749 bytesRaw
1 | var fs = require('fs') |
2 | |
3 | var isBuffer = Buffer.isBuffer |
4 | var Notify = require('pull-notify') |
5 | var Live = require('pull-live') |
6 | var pull = require('pull-stream/pull') |
7 | var Map = require('pull-stream/throughs/map') |
8 | |
9 | var Blocks = require('block-reader') |
10 | |
11 | function frame (data) { |
12 | var length = data.reduce(function (total, e) { return total + e.value.length }, 0) |
13 | var b = new Buffer(length + data.length * 8) |
14 | var offset = 0 |
15 | for(var i = 0; i < data.length; i++) { |
16 | var item = data[i] |
17 | //mutate the items |
18 | var buf = item.value |
19 | item.offset = 0 + offset |
20 | b.writeUInt32BE(buf.length, 0 + offset) //start |
21 | b.writeUInt32BE(buf.length, 4+buf.length + offset) //end |
22 | item.value.copy(b, 4 + offset, 0, buf.length) |
23 | offset += buf.length + 8 |
24 | } |
25 | return b |
26 | } |
27 | |
28 | function format (opts) { |
29 | var keys = opts.keys === true //default to false |
30 | var values = opts.value !== false //default to true |
31 | return Map(function (data) { |
32 | return keys && values ? data : values ? data.value : data.key |
33 | }) |
34 | } |
35 | |
36 | module.exports = function (file, length) { |
37 | |
38 | var notify = Notify() |
39 | length = length || 1024 |
40 | var blocks = Blocks(file, length, 'a+') |
41 | |
42 | var queue = [], writing = false |
43 | //TODO: check current size of file! |
44 | var offset = 0 |
45 | |
46 | function write () { |
47 | if(writing) return |
48 | if(!queue.length) return |
49 | writing = true |
50 | var data = [] |
51 | var framed = frame(queue) |
52 | var _queue = queue |
53 | queue = [] |
54 | blocks.append(framed, function (err, offset) { |
55 | writing = false |
56 | while(_queue.length) { |
57 | var q = _queue.shift() |
58 | q.cb(err, (offset - framed.length) + q.offset) |
59 | } |
60 | if(queue.length) write() |
61 | }) |
62 | } |
63 | |
64 | var log |
65 | return log = { |
66 | //create a stream between any two records. |
67 | //read the first value, then scan forward or backwards |
68 | //in the direction of the log |
69 | stream: Live(function (opts) { |
70 | var reverse = opts && opts.reverse |
71 | var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0) |
72 | var diff = reverse ? -1 : 1 |
73 | var get = reverse ? log.getPrevious : log.get |
74 | |
75 | return pull(function (abort, cb) { |
76 | if(abort) cb(abort) |
77 | else if(reverse && next <= 0) |
78 | cb(true) |
79 | else |
80 | get(next, function (err, value) { |
81 | if(err) return cb(true) //err) |
82 | else if(!value || !value.length) return cb(true) |
83 | else { |
84 | var _offset = next |
85 | next = next + (value.length + 8)*diff |
86 | cb(null, {key: _offset, value: value}) |
87 | } |
88 | }) |
89 | }, format(opts)) |
90 | }, function (opts) { |
91 | return pull(notify.listen(), format(opts)) |
92 | }), |
93 | //if value is an array of buffers, then treat that as a batch. |
94 | append: function (value, cb) { |
95 | if(!isBuffer(value)) throw new Error('value must be a buffer') |
96 | queue.push({value: value, cb: function (err, offset) { |
97 | if(err) return cb(err) |
98 | notify({key: offset, value: value}) |
99 | cb(null, offset) |
100 | }}) |
101 | write() |
102 | }, |
103 | get: function (offset, cb) { |
104 | //read the block that offset is in. |
105 | //if offset is near the end of the block, read two blocks. |
106 | blocks.readUInt32BE(offset, function (err, length) { |
107 | if(err) return cb(err) |
108 | blocks.read(offset + 4, offset+4 + length, cb) |
109 | }) |
110 | }, |
111 | //get the record _before_ the given offset. |
112 | getPrevious: function (_offset, cb) { |
113 | //don't read before start of file... |
114 | _offset = _offset || blocks.size() |
115 | if(_offset == 0) return cb(new Error('attempted read previous to first object')) |
116 | blocks.readUInt32BE(_offset - 4, function (err, length) { |
117 | if(err) return cb(err) |
118 | blocks.read(_offset - 4 - length, _offset - 4, cb) |
119 | }) |
120 | }, |
121 | } |
122 | } |
123 | |
124 |
Built with git-ssb-web