git ssb

1+

Dominic / secure-scuttlebutt



Tree: 90ce5321ef13ea1e8baf2789adf6cc2e3beccf13

Files: 90ce5321ef13ea1e8baf2789adf6cc2e3beccf13 / legacy.js

3269 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var pl = require('pull-level')
4var Live = require('pull-live')
5var paramap = require('pull-paramap')
6var u = require('./util')
7var stdopts = u.options
8var Format = u.formatStream
9var msgFmt = u.format
10var timestamp = require('monotonic-timestamp')
11
12module.exports = function (db, flumedb) {
13
14 var logDB = db.sublevel('log')
15 db.pre(function (op, add, _batch) {
16 var msg = op.value
17 var id = op.key
18 // index by sequence number
19
20 var localtime = op.timestamp = timestamp()
21
22 add({
23 key: localtime, value: id,
24 type: 'put', prefix: logDB
25 })
26
27 })
28
29 function Limit (fn) {
30 return function (opts) {
31 if(opts && opts.limit && opts.limit > 0) {
32 var limit = opts.limit
33 var read = fn(opts)
34 return function (abort, cb) {
35 if(limit--) return read(abort, function (err, data) {
36 if(data && data.sync) limit ++
37 cb(err, data)
38 })
39 else read(true, cb)
40 }
41 }
42 else
43 return fn(opts)
44 }
45 }
46
47 db.createLogStream = Limit(Live(function (opts) {
48 opts = stdopts(opts)
49 var keys = opts.keys; delete opts.keys
50 var values = opts.values; delete opts.values
51 return pull(
52 pl.old(logDB, stdopts(opts)),
53 //lookup2(keys, values, 'timestamp')
54 paramap(function (data, cb) {
55 if(values == false) return cb(null, {key:data.value})
56 var key = data.value
57 var seq = data.key
58 db.get(key, function (err, value) {
59 if (err) cb(err)
60 else cb(null, msgFmt(keys, values, {key: key, value: value, timestamp: seq}))
61 })
62 })
63 )
64 }, function (opts) {
65 return pl.live(db, stdopts(opts))
66 }))
67
68 if(flumedb) {
69 var prog = {current: 0, start: 0, target: 0}
70
71 function one (opts, cb) {
72 pull(
73 db.createLogStream(opts),
74 pull.collect(function (err, ary) {
75 cb(err, ary[ary.length - 1])
76 })
77 )
78 }
79
80 one({reverse: true, limit: 1}, function (err, last) {
81 if(!last) ready() //empty legacy database.
82 else {
83 flumedb.since.once(function (v) {
84 if(v === -1) {
85 load(null)
86 }
87 else flumedb.get(v, function (err, data) {
88 if(err) throw err
89 if(data.timestamp < last.timestamp) {
90 load(data.timestamp)
91 }
92 else ready()
93 })
94 })
95 }
96
97 function load(since) {
98 // fast track for more accurate progress
99 flumedb.progress.migration = prog
100 var c = 0
101 pull(
102 pl.old(logDB, {gt: since, values: false}),
103 pull.drain(function () {
104 c++
105 }, function () {
106 prog.target = c
107 migrate()
108 })
109 )
110
111 function migrate () {
112 // actual upgrade
113 pull(
114 db.createLogStream({gt: since}),
115 paramap(function (data, cb) {
116 prog.current += 1
117 flumedb.rawAppend(data, cb)
118 }, 32),
119 pull.drain(null, ready)
120 )
121 }
122 }
123 function ready (err) {
124 if(err) throw err
125 flumedb.ready.set(true)
126 }
127 })
128 }
129}
130
131

Built with git-ssb-web