Files: 9a7e43ac498a13a9720df2ba9fdd94eafe23d04d / legacy.js
3236 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var pl = require('pull-level') |
4 | var Live = require('pull-live') |
5 | var paramap = require('pull-paramap') |
6 | var u = require('./util') |
7 | var stdopts = u.options |
8 | var Format = u.formatStream |
9 | var msgFmt = u.format |
10 | var timestamp = require('monotonic-timestamp') |
11 | |
12 | module.exports = function (db, flumedb) { |
13 | |
// Sublevel that indexes message ids by the local time at which they were written.
var logDB = db.sublevel('log')

// Pre-write hook: stamp every operation with a fresh monotonic timestamp and
// queue an extra `put` into logDB mapping that timestamp to the operation's key.
db.pre(function (op, add, _batch) {
  var id = op.key
  var stamp = timestamp()
  op.timestamp = stamp

  // index by local timestamp
  add({
    key: stamp,
    value: id,
    type: 'put',
    prefix: logDB
  })
})
28 | |
/**
 * Wraps a pull-stream source factory so that a positive `opts.limit` caps the
 * number of items the returned source emits.
 *
 * - Items of the form `{sync: true}` are passed through without counting
 *   against the limit.
 * - Once the limit is used up, every further read aborts the underlying
 *   source with `true` and forwards whatever it answers. The counter never
 *   drops below zero, so a source that keeps answering after an abort cannot
 *   leak extra items past the limit.
 * - When `opts` is missing, or `opts.limit` is absent or not greater than
 *   zero, the source created by `fn` is returned unwrapped.
 *
 * @param {function(Object=): function} fn - factory: (opts) => source `read(abort, cb)`.
 * @returns {function(Object=): function} factory with the same calling convention.
 */
function Limit (fn) {
  return function (opts) {
    // No usable limit: hand back the raw source.
    if(!(opts && opts.limit > 0)) return fn(opts)

    var remaining = opts.limit
    var read = fn(opts)

    return function (abort, cb) {
      // Limit reached: keep telling upstream to stop, on every read.
      if(remaining <= 0) return read(true, cb)
      // Downstream abort: forward it untouched, nothing is consumed.
      if(abort) return read(abort, cb)

      remaining--
      return read(abort, function (err, data) {
        // sync markers are not real items, give the slot back
        if(data && data.sync) remaining++
        cb(err, data)
      })
    }
  }
}
46 | |
// Stream over the legacy log index. `Live` receives two factories: the first
// reads the stored index through pl.old, the second opens pl.live on db.
// The combined factory is wrapped in `Limit`.
db.createLogStream = Limit(Live(
  function readStored (opts) {
    var query = stdopts(opts)
    var wantKeys = query.keys
    var wantValues = query.values
    // keys/values only pick the output format; strip them before querying logDB
    delete query.keys
    delete query.values

    // Turns one index entry (key: local timestamp, value: message id)
    // into the item emitted downstream.
    function resolve (entry, cb) {
      var id = entry.value
      // ids only: no need to fetch the message
      if(wantValues == false) return cb(null, {key: id})

      var seq = entry.key
      db.get(id, function (err, msg) {
        if(err) return cb(err)
        cb(null, msgFmt(wantKeys, wantValues, {key: id, value: msg, timestamp: seq}))
      })
    }

    return pull(
      pl.old(logDB, stdopts(query)),
      paramap(resolve)
    )
  },
  function readLive (opts) {
    return pl.live(db, stdopts(opts))
  }
))
67 | |
// When a flume log is supplied, copy into it any legacy messages it does not
// have yet, then flag it as ready.
if(flumedb) {
  // Counters published as flumedb.progress.migration once a copy starts.
  var progress = {current: 0, start: 0, target: 0}

  // Collects db.createLogStream(opts) and hands back its final item
  // (undefined when the stream was empty).
  var lastOf = function (opts, done) {
    pull(
      db.createLogStream(opts),
      pull.collect(function (err, items) {
        done(err, items[items.length - 1])
      })
    )
  }

  // Flags the flume log as ready. Any argument (including an error passed by
  // pull.drain) is ignored.
  // NOTE(review): a failed copy therefore still ends in "ready" - confirm intended.
  var markReady = function () {
    flumedb.ready.set(true)
  }

  // Copies every legacy entry whose local timestamp is greater than `since`
  // into the flume log.
  var copyFrom = function (since) {
    flumedb.progress.migration = progress

    // second pass: the actual upgrade, up to 32 appends in flight
    var copyAll = function () {
      pull(
        db.createLogStream({gt: since}),
        paramap(function (data, cb) {
          progress.current += 1
          flumedb.append(data, cb)
        }, 32),
        pull.drain(null, markReady)
      )
    }

    // first pass: count the index entries only, for more accurate progress
    var total = 0
    pull(
      pl.old(logDB, {gt: since, values: false}),
      pull.drain(function () {
        total += 1
      }, function () {
        // NOTE(review): an error from the counting pass is not inspected here.
        progress.target = total
        copyAll()
      })
    )
  }

  // NOTE(review): `err` from the lookup is not inspected; a failed read is
  // handled like an empty legacy database.
  lastOf({reverse: true, limit: 1}, function (err, newest) {
    // empty legacy database.
    if(!newest) return markReady()

    flumedb.since.once(function (offset) {
      // -1: nothing has been copied yet, take everything
      if(offset === -1) return copyFrom(null)

      flumedb.get(offset, function (err, data) {
        if(err) throw err
        // the flume log is behind the legacy log: copy the missing tail
        if(data.timestamp < newest.timestamp) copyFrom(data.timestamp)
        else markReady()
      })
    })
  })
}
128 | } |
129 |
Built with git-ssb-web