git ssb

2+

Dominic / flumedb



Tree: 1aa3be6280ca2010ee2bb6ded5abc8324e4a924d

Files: 1aa3be6280ca2010ee2bb6ded5abc8324e4a924d / index.js

2788 bytesRaw
1'use strict'
2var cont = require('cont')
3var pull = require('pull-stream')
4var PullCont = require('pull-cont')
5var path = require('path')
6var Obv = require('obv')
7//take a log, and return a log driver.
8//the log has an api with `read`, `get`, and `since`
9
10var wrap = require('./wrap')
11
//Build a new plain object by applying `iter` to every own enumerable
//property of `obj`.
//
//`iter` is called as iter(value, key, obj) and its return value is stored
//under the same key in the result.
//
//Only own properties are visited: inherited enumerable properties (for
//example additions to Array.prototype when `obj` is an array used as a
//dictionary) are skipped so they are never mistaken for real entries.
function map(obj, iter) {
  var o = {}
  for(var k in obj) {
    if(Object.prototype.hasOwnProperty.call(obj, k))
      o[k] = iter(obj[k], k, obj)
  }
  return o
}
18
19module.exports = function (log, isReady) {
20 var views = []
21 var meta = {}
22
23 log.get = count(log.get, 'get')
24
25 function count (fn, name) {
26 meta[name] = meta[name] || 0
27 return function (a, b) {
28 meta[name] ++
29 fn.call(this, a, b)
30 }
31 }
32
33 var ready = Obv()
34 ready.set(isReady !== undefined ? isReady : true)
35 var flume = {
36 closed: false,
37 dir: log.filename ? path.dirname(log.filename) : null,
38 //stream from the log
39 since: log.since,
40 ready: ready,
41 meta: meta,
42 append: function (value, cb) {
43 return log.append(value, cb)
44 },
45 stream: function (opts) {
46 return PullCont(function (cb) {
47 log.since.once(function () {
48 cb(null, log.stream(opts))
49 })
50 })
51 },
52 get: function (seq, cb) {
53 log.since.once(function () {
54 log.get(seq, cb)
55 })
56 },
57 use: function (name, createView) {
58 if(~Object.keys(flume).indexOf(name))
59 throw new Error(name + ' is already in use!')
60
61 var sv = createView(log, name)
62
63 views[name] = flume[name] = wrap(sv, log.since, ready)
64 meta[name] = flume[name].meta
65 sv.since.once(function (upto) {
66 pull(
67 log.stream({gt: upto, live: true, seqs: true, values: true}),
68 sv.createSink(function (err) {
69 if(err && !flume.closed) console.error(err)
70 })
71 )
72 })
73
74 return flume
75 },
76 rebuild: function (cb) {
77 return cont.para(map(views, function (sv) {
78 return function (cb) {
79 sv.destroy(function (err) {
80 if(err) return cb(err)
81 sv.since.once(function (upto) {
82 pull(
83 log.stream({gt: upto, live: true, seqs: true, values: true}),
84 sv.createSink(function (err) {
85 if(err) console.error(err)
86 })
87 )
88 })
89 })
90 }
91 }))
92 (function (err) {
93 if(err) cb(err) //hopefully never happens
94
95 //then restream each streamview, and callback when it's uptodate with the main log.
96 })
97 },
98 close: function (cb) {
99 if(flume.closed) throw new Error('already closed')
100 flume.closed = true
101 cont.para(map(views, function (sv, k) {
102 return function (cb) {
103 if(sv.close) sv.close(cb)
104 else cb()
105 }
106 })) (cb)
107
108 }
109 }
110 return flume
111}
112
113
114
115
116
117
118
119
120
121
122

Built with git-ssb-web