Files: d58249c539906c2fad920beece6e0b99b1afe1ca / index.js
4080 bytes (raw)
1 | var fs = require('fs') |
2 | |
3 | var isBuffer = Buffer.isBuffer |
4 | var Notify = require('pull-notify') |
5 | var Live = require('pull-live') |
6 | var pull = require('pull-stream/pull') |
7 | var Map = require('pull-stream/throughs/map') |
8 | |
9 | var Blocks = require('block-reader') |
10 | |
//Frame a batch of records into a single buffer ready for appending.
//Each record is laid out as: [4-byte BE length][payload][4-byte BE length].
//The trailing length copy is what makes reverse scanning (getPrevious) possible.
//Mutates each item, setting item.offset to the record's start within the
//returned buffer (the caller translates this to an absolute file offset).
function frame (data) {
  //total payload bytes across all items
  var length = data.reduce(function (total, e) { return total + e.value.length }, 0)
  //each record costs 8 extra bytes of framing (4-byte prefix + 4-byte suffix).
  //Buffer.alloc replaces the deprecated, unsafe `new Buffer(size)`.
  var b = Buffer.alloc(length + data.length * 8)
  var offset = 0
  for(var i = 0; i < data.length; i++) {
    var item = data[i]
    //mutate the items
    var buf = item.value
    item.offset = offset
    b.writeUInt32BE(buf.length, offset) //start
    b.writeUInt32BE(buf.length, offset + 4 + buf.length) //end
    buf.copy(b, offset + 4, 0, buf.length)
    offset += buf.length + 8
  }
  return b
}
27 | |
//Build a pull-stream through that shapes each {key, value} record per opts:
//  opts.keys   === true  -> include keys   (default: false)
//  opts.values !== false -> include values (default: true)
//Returns the full record when both are requested, otherwise just the
//value or just the key.
function format (opts) {
  var keys = opts.keys === true //default to false
  //fix: the symmetric option to `keys` is `values` (plural); the original
  //only read the singular `opts.value`, so `{values: false}` was ignored.
  //Both spellings are honored for backwards compatibility.
  var values = opts.values !== false && opts.value !== false //default to true
  return Map(function (data) {
    return keys && values ? data : values ? data.value : data.key
  })
}
35 | |
//Append-only log stored in `file`, accessed in blocks of `length` bytes
//(default 1024) via block-reader. Records are framed by `frame()` with a
//4-byte length prefix and suffix, so the log can be scanned in either direction.
module.exports = function (file, length) {

  var notify = Notify() //pushes freshly appended records to live streams
  length = length || 1024
  var blocks = Blocks(file, length, 'a+')

  //appends are batched: while one blocks.append is in flight, new records
  //accumulate in `queue` and are flushed together in a single frame.
  var queue = [], writing = false
  //TODO: check current size of file!
  //highest record offset known to be written; old-stream reads stop here.
  var offset = -1

  //flush `queue` to disk as one framed buffer, unless a write is in flight.
  function write () {
    if(writing) return
    if(!queue.length) return
    writing = true
    var data = [] //NOTE(review): unused — looks like leftover code
    var framed = frame(queue) //also sets each item's offset within `framed`
    var _queue = queue
    queue = [] //records arriving during the write start the next batch
    blocks.append(framed, function (err, _offset) {
      //_offset is the file length after the append, so this batch started
      //at _offset - framed.length; translate each item's frame-relative
      //offset into an absolute file offset before calling back.
      writing = false
      while(_queue.length) {
        var q = _queue.shift()
        var o = (_offset - framed.length) + q.offset
        offset = Math.max(offset, o)
        q.cb(err, o)
      }
      if(queue.length) write() //flush anything queued while we were writing
    })
  }

  var log
  return log = {
    //create a stream between any two records.
    //read the first value, then scan forward or backwards
    //in the direction of the log

    //using pull-live this way means that things added in real-time are buffered
    //in memory until they are read, that means less predictable memory usage.
    //instead, we should track the offset we are up to, and wait if necessary.
    stream: Live(function (opts) {
      //"old" part: scan records already on disk at stream-creation time.
      var reverse = opts && opts.reverse
      //start position: for reverse default to end of file, else offset 0.
      var next = reverse ? (opts && opts.max || blocks.size()) : (opts && opts.min || 0)
      var diff = reverse ? -1 : 1
      var get = reverse ? log.getPrevious : log.get
      var end = offset //snapshot so we never read past what was written
      return pull(function (abort, cb) {
        if(abort) cb(abort)
        else if(reverse ? next <= 0 : next > end)
          cb(true) //end of range: terminate the stream
        else
          get(next, function (err, value) {
            if(err) return cb(true) //err)
            else if(!value || !value.length) return cb(true)
            else {
              var _offset = next
              //step by payload length plus 8 bytes of framing
              next = next + (value.length + 8)*diff
              cb(null, {key: _offset, value: value})
            }
          })
      }, format(opts))
    }, function (opts) {
      //"live" part: emit records as they are appended, via notify.
      return pull(notify.listen(), format(opts))
    }),
    //if value is an array of buffers, then treat that as a batch.
    //NOTE(review): batching is not actually implemented here — arrays
    //throw below; only a single buffer is accepted. TODO confirm intent.
    append: function (value, cb) {
      if(!isBuffer(value)) throw new Error('value must be a buffer')
      queue.push({value: value, cb: function (err, offset) {
        if(err) return cb(err)
        notify({key: offset, value: value}) //wake any live streams
        cb(null, offset)
      }})
      write()
    },
    //read the record whose frame starts at `offset`:
    //a 4-byte BE length, then that many payload bytes.
    get: function (offset, cb) {
      //read the block that offset is in.
      //if offset is near the end of the block, read two blocks.
      blocks.readUInt32BE(offset, function (err, length) {
        if(err) return cb(err)
        blocks.read(offset + 4, offset+4 + length, cb)
      })
    },
    //get the record _before_ the given offset.
    getPrevious: function (_offset, cb) {
      //don't read before start of file...
      _offset = _offset || blocks.size()
      if(_offset == 0) return cb(new Error('attempted read previous to first object'))
      //the 4 bytes just before _offset are the previous record's length suffix.
      blocks.readUInt32BE(_offset - 4, function (err, length) {
        if(err) return cb(err)
        blocks.read(_offset - 4 - length, _offset - 4, cb)
      })
    },
  }
}
129 |
Built with git-ssb-web