git ssb

0+

Matt McKegg / ssb-backlinks



Commit 2bd24f99a6e985cb6f8d40b8f26bf2692eaab9f2

fork flumeview-level to fix reindex

until flumedb/flumeview-level#3 is merged!
Matt McKegg committed on 6/15/2017, 5:17:14 AM
Parent: 92fbdafe376f4faa01b901b89e29eeb6927ade99

Files changed

lib/flumeview-links-raw.jschanged
lib/flumeview-level.jsadded
lib/flumeview-links-raw.jsView
@@ -6,9 +6,9 @@
lib/flumeview-level.jsView
@@ -1,0 +1,127 @@
1 +// FORKED until https://github.com/flumedb/flumeview-level/pull/3 is merged!
2 +var pull = require('pull-stream')
3 +var Level = require('level')
4 +var bytewise = require('bytewise')
5 +var Write = require('pull-write')
6 +var pl = require('pull-level')
7 +var Obv = require('obv')
8 +var path = require('path')
9 +var Paramap = require('pull-paramap')
10 +var ltgt = require('ltgt')
11 +var explain = require('explain-error')
12 +
13 +module.exports = function (version, map) {
14 + return function (log, name) {
15 + var db = create(path), writer
16 +
17 + var META = '\x00', since = Obv()
18 +
19 + var written = 0, closed
20 +
21 + function create() {
22 + closed = false
23 + if(!log.filename)
24 + throw new Error('flumeview-level can only be used with a log that provides a directory')
25 + var dir = path.dirname(log.filename)
26 + return Level(path.join(dir, name), {keyEncoding: bytewise, valueEncoding: 'json'})
27 + }
28 +
29 + function close (cb) {
30 + closed = true
31 + //todo: move this bit into pull-write
32 + if(writer) writer.abort(function () { db.close(cb) })
33 + else db.close(cb)
34 + }
35 +
36 + function destroy (cb) {
37 + close(function () {
38 + var dbPath = path.join(path.dirname(log.filename), name)
39 + Level.destroy(dbPath, cb)
40 + })
41 + }
42 +
43 + db.get(META, {keyEncoding: 'utf8'}, function (err, value) {
44 + if(err) since.set(-1)
45 + else if(value.version === version)
46 + since.set(value.since)
47 + else //version has changed, wipe db and start over.
48 + destroy(function () {
49 + db = create(path); since.set(-1)
50 + })
51 + })
52 +
53 + var since = Obv()
54 +
55 + return {
56 + since: since,
57 + methods: { get: 'async', read: 'source'},
58 + createSink: function (cb) {
59 + return writer = Write(function (batch, cb) {
60 + if(closed) return cb(new Error('database closed while index was building'))
61 + db.batch(batch, function (err) {
62 + if(err) return cb(err)
63 + since.set(batch[0].value.since)
64 + //callback to anyone waiting for this point.
65 + cb()
66 + })
67 + }, function reduce (batch, data) {
68 + if(data.sync) return batch
69 + var seq = data.seq
70 +
71 + if(!batch)
72 + batch = [{
73 + key: META,
74 + value: {version: version, since: seq},
75 + valueEncoding: 'json', keyEncoding:'utf8', type: 'put'
76 + }]
77 +
78 + //map must return an array (like flatmap) with zero or more values
79 + var indexed = map(data.value, data.seq)
80 + batch = batch.concat(indexed.map(function (key) { return { key: key, value: seq, type: 'put' }}))
81 + batch[0].value.since = Math.max(batch[0].value.since, seq)
82 + return batch
83 + }, 512, cb)
84 + },
85 +
86 + get: function (key, cb) {
87 + //wait until the log has been processed up to the current point.
88 + db.get(key, function (err, seq) {
89 + if(err && err.name === 'NotFoundError') return cb(err)
90 + if(err) cb(explain(err, 'flumeview-level.get: key not found:'+key))
91 + else
92 + log.get(seq, function (err, value) {
93 + if(err) cb(explain(err, 'flumeview-level.get: index for:'+key+'pointed at:'+seq+'but log error'))
94 + else cb(null, value)
95 + })
96 + })
97 + },
98 + read: function (opts) {
99 + var keys = opts.keys
100 + var values = opts.values
101 + opts.keys = true; opts.values = true
102 + //TODO: preserve whatever the user passed in on opts...
103 +
104 + var lower = ltgt.lowerBound(opts)
105 + if(lower == null) opts.gt = null
106 +
107 + return pull(
108 + pl.read(db, opts),
109 + pull.filter(function (op) {
110 + //this is an ugly hack! ); but it stops the index metadata appearing in the live stream
111 + return op.key !== '\u0000'
112 + }),
113 + Paramap(function (data, cb) {
114 + if(data.sync) return cb(null, data)
115 + log.get(data.value, function (err, value) {
116 + if(err) cb(explain(err, 'when trying to retrive:'+data.key+'at since:'+log.since.value))
117 + else cb(null, {key: data.key, seq: data.value, value: value})
118 + })
119 + })
120 + )
121 + },
122 + close: close,
123 + destroy: destroy
124 + //put, del, batch - leave these out for now, since the indexes just map.
125 + }
126 + }
127 +}

Built with git-ssb-web