lib/flumeview-level.jsView |
---|
1 | | - |
2 | | -var pull = require('pull-stream') |
3 | | -var Level = require('level') |
4 | | -var bytewise = require('bytewise') |
5 | | -var Write = require('pull-write') |
6 | | -var pl = require('pull-level') |
7 | | -var Obv = require('obv') |
8 | | -var path = require('path') |
9 | | -var Paramap = require('pull-paramap') |
10 | | -var ltgt = require('ltgt') |
11 | | -var explain = require('explain-error') |
12 | | - |
13 | | -module.exports = function (version, map) { |
14 | | - return function (log, name) { |
15 | | - var db = create(path), writer |
16 | | - |
17 | | - var META = '\x00', since = Obv() |
18 | | - |
19 | | - var written = 0, closed |
20 | | - |
21 | | - function create() { |
22 | | - closed = false |
23 | | - if(!log.filename) |
24 | | - throw new Error('flumeview-level can only be used with a log that provides a directory') |
25 | | - var dir = path.dirname(log.filename) |
26 | | - return Level(path.join(dir, name), {keyEncoding: bytewise, valueEncoding: 'json'}) |
27 | | - } |
28 | | - |
29 | | - function close (cb) { |
30 | | - closed = true |
31 | | - |
32 | | - if(writer) writer.abort(function () { db.close(cb) }) |
33 | | - else db.close(cb) |
34 | | - } |
35 | | - |
36 | | - function destroy (cb) { |
37 | | - close(function () { |
38 | | - var dbPath = path.join(path.dirname(log.filename), name) |
39 | | - Level.destroy(dbPath, cb) |
40 | | - }) |
41 | | - } |
42 | | - |
43 | | - db.get(META, {keyEncoding: 'utf8'}, function (err, value) { |
44 | | - if(err) since.set(-1) |
45 | | - else if(value.version === version) |
46 | | - since.set(value.since) |
47 | | - else |
48 | | - destroy(function () { |
49 | | - db = create(path); since.set(-1) |
50 | | - }) |
51 | | - }) |
52 | | - |
53 | | - var since = Obv() |
54 | | - |
55 | | - return { |
56 | | - since: since, |
57 | | - methods: { get: 'async', read: 'source'}, |
58 | | - createSink: function (cb) { |
59 | | - return writer = Write(function (batch, cb) { |
60 | | - if(closed) return cb(new Error('database closed while index was building')) |
61 | | - db.batch(batch, function (err) { |
62 | | - if(err) return cb(err) |
63 | | - since.set(batch[0].value.since) |
64 | | - |
65 | | - cb() |
66 | | - }) |
67 | | - }, function reduce (batch, data) { |
68 | | - if(data.sync) return batch |
69 | | - var seq = data.seq |
70 | | - |
71 | | - if(!batch) |
72 | | - batch = [{ |
73 | | - key: META, |
74 | | - value: {version: version, since: seq}, |
75 | | - valueEncoding: 'json', keyEncoding:'utf8', type: 'put' |
76 | | - }] |
77 | | - |
78 | | - |
79 | | - var indexed = map(data.value, data.seq) |
80 | | - batch = batch.concat(indexed.map(function (key) { return { key: key, value: seq, type: 'put' }})) |
81 | | - batch[0].value.since = Math.max(batch[0].value.since, seq) |
82 | | - return batch |
83 | | - }, 512, cb) |
84 | | - }, |
85 | | - |
86 | | - get: function (key, cb) { |
87 | | - |
88 | | - db.get(key, function (err, seq) { |
89 | | - if(err && err.name === 'NotFoundError') return cb(err) |
90 | | - if(err) cb(explain(err, 'flumeview-level.get: key not found:'+key)) |
91 | | - else |
92 | | - log.get(seq, function (err, value) { |
93 | | - if(err) cb(explain(err, 'flumeview-level.get: index for:'+key+'pointed at:'+seq+'but log error')) |
94 | | - else cb(null, value) |
95 | | - }) |
96 | | - }) |
97 | | - }, |
98 | | - read: function (opts) { |
99 | | - var keys = opts.keys |
100 | | - var values = opts.values |
101 | | - opts.keys = true; opts.values = true |
102 | | - |
103 | | - |
104 | | - var lower = ltgt.lowerBound(opts) |
105 | | - if(lower == null) opts.gt = null |
106 | | - |
107 | | - return pull( |
108 | | - pl.read(db, opts), |
109 | | - pull.filter(function (op) { |
110 | | - |
111 | | - return op.key !== '\u0000' |
112 | | - }), |
113 | | - Paramap(function (data, cb) { |
114 | | - if(data.sync) return cb(null, data) |
115 | | - log.get(data.value, function (err, value) { |
116 | | - if(err) cb(explain(err, 'when trying to retrive:'+data.key+'at since:'+log.since.value)) |
117 | | - else cb(null, {key: data.key, seq: data.value, value: value}) |
118 | | - }) |
119 | | - }) |
120 | | - ) |
121 | | - }, |
122 | | - close: close, |
123 | | - destroy: destroy |
124 | | - |
125 | | - } |
126 | | - } |
127 | | -} |