Files: 6f0c7afd16d33393e0a972dcf17ba5d5a4d48df7 / legacy.js
3194 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var pl = require('pull-level') |
4 | var Live = require('pull-live') |
5 | var paramap = require('pull-paramap') |
6 | var u = require('./util') |
7 | var stdopts = u.options |
8 | var Format = u.formatStream |
9 | var msgFmt = u.format |
10 | var timestamp = require('monotonic-timestamp') |
11 | |
12 | module.exports = function (db, flumedb) { |
13 | |
// Sublevel holding the log index: local timestamp -> key of the written record.
var logDB = db.sublevel('log')

// Hook registered through db.pre: stamps the incoming operation with a fresh
// local timestamp and adds a companion 'put' into logDB keyed by that
// timestamp, whose value is the operation's key.
// NOTE(review): presumably the added operation is committed together with the
// original write -- confirm against the sublevel hook documentation.
db.pre(function (op, add, _batch) {
  var id = op.key

  // The same timestamp is stored on the operation itself and used as index key.
  var localtime = op.timestamp = timestamp()

  add({
    key: localtime, value: id,
    type: 'put', prefix: logDB
  })
})
28 | |
/**
 * Wrap a pull-stream source factory so that the stream it returns ends after
 * `opts.limit` items.
 *
 * Items carrying a truthy `sync` property are passed through but do not count
 * against the limit. When `opts` is missing, or `opts.limit` is absent or not
 * greater than zero, the stream produced by `fn(opts)` is returned untouched.
 *
 * @param {Function} fn - factory called as `fn(opts)`; must return a source,
 *   i.e. a function `(abort, cb)`.
 * @returns {Function} factory with the same call shape as `fn`.
 */
function Limit (fn) {
  return function (opts) {
    if (!(opts && opts.limit && opts.limit > 0)) return fn(opts)

    var remaining = opts.limit
    var read = fn(opts)

    return function (abort, cb) {
      // Exhaustion is tested with `<= 0` rather than truthiness: a counter that
      // has dropped below zero (reads issued after the limit was reached, or a
      // fractional limit) must keep aborting instead of pulling more data.
      if (remaining <= 0) return read(true, cb)

      remaining--
      return read(abort, function (err, data) {
        // A sync marker is not a real item; give its slot back.
        if (data && data.sync) remaining++
        cb(err, data)
      })
    }
  }
}
46 | |
// Stream over the log index. The first function builds the read of existing
// entries, the second the live read; Live combines the two and Limit applies
// opts.limit to the combined stream.
db.createLogStream = Limit(Live(function (opts) {
  opts = stdopts(opts)

  // keys/values describe the shape of the emitted items, so they are taken
  // off the options here and handed to msgFmt below.
  // NOTE(review): stdopts is then applied again to the stripped options,
  // presumably to restore its defaults for the index read -- confirm in ./util.
  var keys = opts.keys
  delete opts.keys
  var values = opts.values
  delete opts.values

  var indexEntries = pl.old(logDB, stdopts(opts))

  // Each index entry is {key: timestamp, value: record key}; fetch the record
  // from db and emit it formatted together with its timestamp.
  var resolveEntry = paramap(function (entry, done) {
    var id = entry.value
    var stamp = entry.key
    db.get(id, function (err, record) {
      if (err) {
        done(err)
      } else {
        done(null, msgFmt(keys, values, {key: id, value: record, timestamp: stamp}))
      }
    })
  })

  return pull(indexEntries, resolveEntry)
}, function (opts) {
  return pl.live(db, stdopts(opts))
}))
66 | |
// Copy records from the legacy log into flumedb, then mark flumedb as ready.
// Nothing happens when no flumedb was supplied.
if(flumedb) {
  // Progress record; replaced by the object published as
  // flumedb.progress.migration once a migration actually starts.
  var prog = {}

  // Calls cb(err, item) with the last item emitted by db.createLogStream(opts),
  // or undefined when the stream produced nothing.
  var one = function (opts, cb) {
    pull(
      db.createLogStream(opts),
      pull.collect(function (err, ary) {
        cb(err, ary && ary[ary.length - 1])
      })
    )
  }

  // Records how far the migration has got. `start` is filled in while it is
  // still falsy (which includes the initial 0), `current` on every call.
  var update = function (since) {
    if (!prog.start) prog.start = +since
    prog.current = +since
  }

  one({reverse: true, limit: 1}, function (err, last) {
    // Pipe every legacy record with a timestamp greater than `since` into
    // flumedb; paramap is given a width of 32.
    function load (since) {
      pull(
        db.createLogStream({gt: since}),
        paramap(function (data, cb) {
          update(data.timestamp)
          flumedb.append(data, cb)
        }, 32),
        pull.drain(null, function (err) {
          // `true` is the normal end-of-stream signal, not a failure. A real
          // error means the copy is incomplete, so do not report ready.
          if (err && err !== true) throw err
          ready()
        })
      )
    }

    function ready () {
      flumedb.ready.set(true)
    }

    // A failed read must not be mistaken for an empty legacy database.
    if (err && err !== true) throw err

    if (!last) return ready() // empty legacy database.

    flumedb.since.once(function (v) {
      if (v === -1) {
        // NOTE(review): -1 presumably means flumedb holds nothing yet; the
        // whole legacy log is loaded in that case.
        prog = flumedb.progress.migration = {
          start: 0,
          current: 0,
          target: +last.timestamp
        }
        return load(null)
      }

      flumedb.get(v, function (err, data) {
        if (err) throw err
        if (data.timestamp < last.timestamp) {
          // flumedb is behind the legacy log: resume after its latest record.
          prog = flumedb.progress.migration = {
            start: data.timestamp,
            current: 0,
            target: +last.timestamp
          }
          load(data.timestamp)
        }
        else ready()
      })
    })
  })
}
125 | } |
126 | |
127 |
Built with git-ssb-web