Files: de9181b8c2bcabd82926896eb949284d98bb634e / lib / flumeview-level.js
4269 bytesRaw
1 | // FORKED until https://github.com/flumedb/flumeview-level/pull/3 is merged! |
2 | var pull = require('pull-stream') |
3 | var Level = require('level') |
4 | var bytewise = require('bytewise') |
5 | var Write = require('pull-write') |
6 | var pl = require('pull-level') |
7 | var Obv = require('obv') |
8 | var path = require('path') |
9 | var Paramap = require('pull-paramap') |
10 | var ltgt = require('ltgt') |
11 | var explain = require('explain-error') |
12 | |
13 | module.exports = function (version, map) { |
14 | return function (log, name) { |
15 | var db = create(path), writer |
16 | |
17 | var META = '\x00', since = Obv() |
18 | |
19 | var written = 0, closed |
20 | |
21 | function create() { |
22 | closed = false |
23 | if(!log.filename) |
24 | throw new Error('flumeview-level can only be used with a log that provides a directory') |
25 | var dir = path.dirname(log.filename) |
26 | return Level(path.join(dir, name), {keyEncoding: bytewise, valueEncoding: 'json'}) |
27 | } |
28 | |
29 | function close (cb) { |
30 | closed = true |
31 | //todo: move this bit into pull-write |
32 | if(writer) writer.abort(function () { db.close(cb) }) |
33 | else db.close(cb) |
34 | } |
35 | |
36 | function destroy (cb) { |
37 | close(function () { |
38 | var dbPath = path.join(path.dirname(log.filename), name) |
39 | Level.destroy(dbPath, cb) |
40 | }) |
41 | } |
42 | |
43 | db.get(META, {keyEncoding: 'utf8'}, function (err, value) { |
44 | if(err) since.set(-1) |
45 | else if(value.version === version) |
46 | since.set(value.since) |
47 | else //version has changed, wipe db and start over. |
48 | destroy(function () { |
49 | db = create(path); since.set(-1) |
50 | }) |
51 | }) |
52 | |
53 | var since = Obv() |
54 | |
55 | return { |
56 | since: since, |
57 | methods: { get: 'async', read: 'source'}, |
58 | createSink: function (cb) { |
59 | return writer = Write(function (batch, cb) { |
60 | if(closed) return cb(new Error('database closed while index was building')) |
61 | db.batch(batch, function (err) { |
62 | if(err) return cb(err) |
63 | since.set(batch[0].value.since) |
64 | //callback to anyone waiting for this point. |
65 | cb() |
66 | }) |
67 | }, function reduce (batch, data) { |
68 | if(data.sync) return batch |
69 | var seq = data.seq |
70 | |
71 | if(!batch) |
72 | batch = [{ |
73 | key: META, |
74 | value: {version: version, since: seq}, |
75 | valueEncoding: 'json', keyEncoding:'utf8', type: 'put' |
76 | }] |
77 | |
78 | //map must return an array (like flatmap) with zero or more values |
79 | var indexed = map(data.value, data.seq) |
80 | batch = batch.concat(indexed.map(function (key) { return { key: key, value: seq, type: 'put' }})) |
81 | batch[0].value.since = Math.max(batch[0].value.since, seq) |
82 | return batch |
83 | }, 512, cb) |
84 | }, |
85 | |
86 | get: function (key, cb) { |
87 | //wait until the log has been processed up to the current point. |
88 | db.get(key, function (err, seq) { |
89 | if(err && err.name === 'NotFoundError') return cb(err) |
90 | if(err) cb(explain(err, 'flumeview-level.get: key not found:'+key)) |
91 | else |
92 | log.get(seq, function (err, value) { |
93 | if(err) cb(explain(err, 'flumeview-level.get: index for:'+key+'pointed at:'+seq+'but log error')) |
94 | else cb(null, value) |
95 | }) |
96 | }) |
97 | }, |
98 | read: function (opts) { |
99 | var keys = opts.keys |
100 | var values = opts.values |
101 | opts.keys = true; opts.values = true |
102 | //TODO: preserve whatever the user passed in on opts... |
103 | |
104 | var lower = ltgt.lowerBound(opts) |
105 | if(lower == null) opts.gt = null |
106 | |
107 | return pull( |
108 | pl.read(db, opts), |
109 | pull.filter(function (op) { |
110 | //this is an ugly hack! ); but it stops the index metadata appearing in the live stream |
111 | return op.key !== '\u0000' |
112 | }), |
113 | Paramap(function (data, cb) { |
114 | if(data.sync) return cb(null, data) |
115 | log.get(data.value, function (err, value) { |
116 | if(err) cb(explain(err, 'when trying to retrive:'+data.key+'at since:'+log.since.value)) |
117 | else cb(null, {key: data.key, seq: data.value, value: value}) |
118 | }) |
119 | }) |
120 | ) |
121 | }, |
122 | close: close, |
123 | destroy: destroy |
124 | //put, del, batch - leave these out for now, since the indexes just map. |
125 | } |
126 | } |
127 | } |
128 |
Built with git-ssb-web