git ssb

0+

Matt McKegg / ssb-backlinks



Tree: de9181b8c2bcabd82926896eb949284d98bb634e

Files: de9181b8c2bcabd82926896eb949284d98bb634e / lib / flumeview-level.js

4269 bytesRaw
1// FORKED until https://github.com/flumedb/flumeview-level/pull/3 is merged!
2var pull = require('pull-stream')
3var Level = require('level')
4var bytewise = require('bytewise')
5var Write = require('pull-write')
6var pl = require('pull-level')
7var Obv = require('obv')
8var path = require('path')
9var Paramap = require('pull-paramap')
10var ltgt = require('ltgt')
11var explain = require('explain-error')
12
13module.exports = function (version, map) {
14 return function (log, name) {
15 var db = create(path), writer
16
17 var META = '\x00', since = Obv()
18
19 var written = 0, closed
20
21 function create() {
22 closed = false
23 if(!log.filename)
24 throw new Error('flumeview-level can only be used with a log that provides a directory')
25 var dir = path.dirname(log.filename)
26 return Level(path.join(dir, name), {keyEncoding: bytewise, valueEncoding: 'json'})
27 }
28
29 function close (cb) {
30 closed = true
31 //todo: move this bit into pull-write
32 if(writer) writer.abort(function () { db.close(cb) })
33 else db.close(cb)
34 }
35
36 function destroy (cb) {
37 close(function () {
38 var dbPath = path.join(path.dirname(log.filename), name)
39 Level.destroy(dbPath, cb)
40 })
41 }
42
43 db.get(META, {keyEncoding: 'utf8'}, function (err, value) {
44 if(err) since.set(-1)
45 else if(value.version === version)
46 since.set(value.since)
47 else //version has changed, wipe db and start over.
48 destroy(function () {
49 db = create(path); since.set(-1)
50 })
51 })
52
53 var since = Obv()
54
55 return {
56 since: since,
57 methods: { get: 'async', read: 'source'},
58 createSink: function (cb) {
59 return writer = Write(function (batch, cb) {
60 if(closed) return cb(new Error('database closed while index was building'))
61 db.batch(batch, function (err) {
62 if(err) return cb(err)
63 since.set(batch[0].value.since)
64 //callback to anyone waiting for this point.
65 cb()
66 })
67 }, function reduce (batch, data) {
68 if(data.sync) return batch
69 var seq = data.seq
70
71 if(!batch)
72 batch = [{
73 key: META,
74 value: {version: version, since: seq},
75 valueEncoding: 'json', keyEncoding:'utf8', type: 'put'
76 }]
77
78 //map must return an array (like flatmap) with zero or more values
79 var indexed = map(data.value, data.seq)
80 batch = batch.concat(indexed.map(function (key) { return { key: key, value: seq, type: 'put' }}))
81 batch[0].value.since = Math.max(batch[0].value.since, seq)
82 return batch
83 }, 512, cb)
84 },
85
86 get: function (key, cb) {
87 //wait until the log has been processed up to the current point.
88 db.get(key, function (err, seq) {
89 if(err && err.name === 'NotFoundError') return cb(err)
90 if(err) cb(explain(err, 'flumeview-level.get: key not found:'+key))
91 else
92 log.get(seq, function (err, value) {
93 if(err) cb(explain(err, 'flumeview-level.get: index for:'+key+'pointed at:'+seq+'but log error'))
94 else cb(null, value)
95 })
96 })
97 },
98 read: function (opts) {
99 var keys = opts.keys
100 var values = opts.values
101 opts.keys = true; opts.values = true
102 //TODO: preserve whatever the user passed in on opts...
103
104 var lower = ltgt.lowerBound(opts)
105 if(lower == null) opts.gt = null
106
107 return pull(
108 pl.read(db, opts),
109 pull.filter(function (op) {
110 //this is an ugly hack! ); but it stops the index metadata appearing in the live stream
111 return op.key !== '\u0000'
112 }),
113 Paramap(function (data, cb) {
114 if(data.sync) return cb(null, data)
115 log.get(data.value, function (err, value) {
116 if(err) cb(explain(err, 'when trying to retrive:'+data.key+'at since:'+log.since.value))
117 else cb(null, {key: data.key, seq: data.value, value: value})
118 })
119 })
120 )
121 },
122 close: close,
123 destroy: destroy
124 //put, del, batch - leave these out for now, since the indexes just map.
125 }
126 }
127}
128

Built with git-ssb-web