git ssb

1+

Dominic / secure-scuttlebutt



Tree: 6f0c7afd16d33393e0a972dcf17ba5d5a4d48df7

Files: 6f0c7afd16d33393e0a972dcf17ba5d5a4d48df7 / legacy.js

3194 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var pl = require('pull-level')
4var Live = require('pull-live')
5var paramap = require('pull-paramap')
6var u = require('./util')
7var stdopts = u.options
8var Format = u.formatStream
9var msgFmt = u.format
10var timestamp = require('monotonic-timestamp')
11
12module.exports = function (db, flumedb) {
13
// Sublevel used as the legacy log index: local write timestamp -> message id.
var logDB = db.sublevel('log')

// Hook run before writes to `db`: stamps the operation with a local timestamp
// and queues a matching index entry (timestamp -> key) into `logDB`.
// NOTE(review): presumably the added entry is committed together with the
// original operation (level-sublevel pre-hook semantics) — confirm.
db.pre(function (op, add, _batch) {
  // index by local timestamp (not by sequence number); the same value is
  // also attached to the operation itself as op.timestamp
  var localtime = op.timestamp = timestamp()

  add({
    key: localtime,
    value: op.key,
    type: 'put',
    prefix: logDB
  })
})
28
/**
 * Wrap a pull-stream source factory so that `opts.limit` is enforced on the
 * number of items read from the source it creates.
 *
 * Items carrying a truthy `sync` property are passed through without counting
 * towards the limit. When `opts` is missing or `opts.limit` is not a positive
 * number, the source created by `fn` is returned untouched.
 *
 * @param {function(Object=): function} fn - factory returning a
 *   `read(abort, cb)` source for the given options.
 * @returns {function(Object=): function} factory with the same signature
 *   whose sources end (by aborting the underlying source) once the limit
 *   has been reached.
 */
function Limit (fn) {
  return function (opts) {
    if (!(opts && opts.limit && opts.limit > 0)) return fn(opts)

    var remaining = opts.limit
    var read = fn(opts)

    return function (abort, cb) {
      // Compare against zero explicitly: a plain truthiness test on the
      // counter lets reads through again once it has gone negative, and
      // never stops at all for a fractional limit.
      if (remaining <= 0) return read(true, cb)

      remaining--
      return read(abort, function (err, data) {
        // sync markers are not real items, so give the slot back
        if (data && data.sync) remaining++
        cb(err, data)
      })
    }
  }
}
46
// Stream over the legacy log. `Live` receives two source factories: the first
// reads stored entries from `logDB` and resolves each one to its message, the
// second reads live updates from `db`. `Limit` then applies opts.limit.
// NOTE(review): presumably `Live` chooses between / combines the two based on
// opts.live and opts.old — confirm against pull-live.
db.createLogStream = Limit(Live(function (opts) {
  opts = stdopts(opts)

  // keys/values are remembered for formatting and removed from opts before
  // opts is handed on to pl.old below
  var wantKeys = opts.keys
  delete opts.keys
  var wantValues = opts.values
  delete opts.values

  // A log entry is {key: timestamp, value: message id}; fetch the message
  // itself from `db` and format it according to the keys/values options.
  function lookup (entry, cb) {
    var id = entry.value
    var ts = entry.key
    db.get(id, function (err, msg) {
      if (err) return cb(err)
      cb(null, msgFmt(wantKeys, wantValues, {key: id, value: msg, timestamp: ts}))
    })
  }

  return pull(
    pl.old(logDB, stdopts(opts)),
    paramap(lookup)
  )
}, function (opts) {
  return pl.live(db, stdopts(opts))
}))
66
// Copy entries from the legacy log into flumedb. Skipped when no flumedb is
// supplied. flumedb.ready is set to true once there is nothing (more) to copy.
if (flumedb) {
  // Progress record; replaced by the object published as
  // flumedb.progress.migration when a migration actually starts.
  var prog = {}

  // Collect db.createLogStream(opts) and pass the final collected item
  // (undefined when nothing was collected) to cb.
  function one (opts, cb) {
    pull(
      db.createLogStream(opts),
      pull.collect(function (err, ary) {
        cb(err, ary[ary.length - 1])
      })
    )
  }

  // Record the timestamp of the entry currently being copied. While
  // prog.start is falsy (including an initial 0) it is set from `since` too.
  function update (since) {
    prog.start = prog.start ? prog.start : +since
    prog.current = +since
  }

  // Look at the newest legacy entry to decide whether anything must be copied.
  one({reverse: true, limit: 1}, function (err, last) {
    // NOTE(review): err is not inspected here, so a failed read is handled
    // the same way as an empty legacy database — confirm this is intended.
    if (!last) return ready() // empty legacy database.

    flumedb.since.once(function (v) {
      if (v === -1) {
        // NOTE(review): -1 presumably means flumedb holds no data yet — confirm.
        begin(0, null)
      } else {
        flumedb.get(v, function (err, data) {
          if (err) throw err
          // only copy when the legacy log is ahead of what flumedb has
          if (data.timestamp < last.timestamp) begin(data.timestamp, data.timestamp)
          else ready()
        })
      }
    })

    // Publish a fresh progress record and start copying entries after `since`.
    function begin (start, since) {
      prog = flumedb.progress.migration = {
        start: start,
        current: 0,
        target: +last.timestamp
      }
      load(since)
    }

    // Append every legacy entry with a timestamp greater than `since` to
    // flumedb; 32 is the width argument given to paramap.
    function load (since) {
      pull(
        db.createLogStream({gt: since}),
        paramap(function (data, cb) {
          update(data.timestamp)
          flumedb.append(data, cb)
        }, 32),
        // NOTE(review): ready ignores its arguments, so it looks like a
        // stream error would still mark flumedb as ready — confirm.
        pull.drain(null, ready)
      )
    }

    function ready () {
      flumedb.ready.set(true)
    }
  })
}
125}
126
127

Built with git-ssb-web