git ssb

2+

Dominic / flumedb



Tree: 5061fc482c53090c18f342b173684779699926bd

Files: 5061fc482c53090c18f342b173684779699926bd / index.js

2799 bytesRaw
1'use strict'
2var cont = require('cont')
3var pull = require('pull-stream')
4var PullCont = require('pull-cont')
5var path = require('path')
6var Obv = require('obv')
//take a log, and return a log driver.
//the log has an api with `read`, `get`, and `since`.
9
10var wrap = require('./wrap')
11
// Build a new plain object by applying `iter` to each own enumerable
// property of `obj`.
//
// obj  - object (or array carrying named keys) to read from; not modified.
// iter - function (value, key, obj) whose return value is stored under the
//        same key in the result.
//
// Returns a fresh object with the same own keys as `obj`.
function map(obj, iter) {
  var o = {}
  for(var k in obj) {
    // for-in also walks enumerable properties inherited through the
    // prototype chain; only the object's own entries belong in the result.
    if(Object.prototype.hasOwnProperty.call(obj, k))
      o[k] = iter(obj[k], k, obj)
  }
  return o
}
18
19module.exports = function (log, isReady) {
20 var views = []
21 var meta = {}
22
23 log.get = count(log.get, 'get')
24
25 function count (fn, name) {
26 meta[name] = meta[name] || 0
27 return function (a, b) {
28 meta[name] ++
29 fn.call(this, a, b)
30 }
31 }
32
33 var ready = Obv()
34 ready.set(isReady !== undefined ? isReady : true)
35 var flume = {
36 closed: false,
37 dir: log.filename ? path.dirname(log.filename) : null,
38 //stream from the log
39 since: log.since,
40 ready: ready,
41 meta: meta,
42 append: function (value, cb) {
43 return log.append(value, cb)
44 },
45 stream: function (opts) {
46 return PullCont(function (cb) {
47 log.since.once(function () {
48 cb(null, log.stream(opts))
49 })
50 })
51 },
52 get: function (seq, cb) {
53 log.since.once(function () {
54 log.get(seq, cb)
55 })
56 },
57 use: function (name, createView) {
58 if(~Object.keys(flume).indexOf(name))
59 throw new Error(name + ' is already in use!')
60
61 var sv = createView(log, name)
62
63 views[name] = flume[name] = wrap(sv, log.since, ready)
64 meta[name] = flume[name].meta
65 sv.since.once(function rebuild (upto) {
66 pull(
67 log.stream({gt: upto, live: true, seqs: true, values: true}),
68 sv.createSink(function (err) {
69 if(err && !flume.closed) throw err
70 else if(!flume.closed)
71 sv.since.once(rebuild)
72 })
73 )
74 })
75
76 return flume
77 },
78 rebuild: function (cb) {
79 return cont.para(map(views, function (sv) {
80 return function (cb) {
81 sv.destroy(function (err) {
82 if(err) return cb(err)
83 //destroy should close the sink stream,
84 //which will restart the write.
85 var rm = sv.since(function (v) {
86 if(v === log.since.value) {
87 rm()
88 cb()
89 }
90 })
91 })
92 }
93 }))
94 (function (err) {
95 if(err) cb(err) //hopefully never happens
96
97 //then restream each streamview, and callback when it's uptodate with the main log.
98 })
99 },
100 close: function (cb) {
101 if(flume.closed) return cb()
102 flume.closed = true
103 cont.para(map(views, function (sv, k) {
104 return function (cb) {
105 if(sv.close) sv.close(cb)
106 else cb()
107 }
108 })) (cb)
109
110 }
111 }
112 return flume
113}
114

Built with git-ssb-web