Files: 5061fc482c53090c18f342b173684779699926bd / index.js
2799 bytes · Raw
1 | |
2 | var cont = require('cont') |
3 | var pull = require('pull-stream') |
4 | var PullCont = require('pull-cont') |
5 | var path = require('path') |
6 | var Obv = require('obv') |
7 | // Take a log and return a log driver (the flume instance). |
8 | // The log must expose an API with `stream`, `get`, `append` and `since`. |
9 | |
10 | var wrap = require('./wrap') |
11 | |
/**
 * Build a new plain object by applying `iter` to every own enumerable
 * property of `obj`.
 *
 * Only own properties are visited: inherited enumerable properties (for
 * example extensions added to a prototype) are skipped, so the result
 * contains exactly the keys that were set directly on `obj`.
 *
 * @param {Object} obj - source object (may also be an array used as a dictionary)
 * @param {function(*, string, Object): *} iter - called as iter(value, key, obj)
 * @returns {Object} new object with the same own keys and the mapped values
 */
function map(obj, iter) {
  var result = {}
  for (var key in obj) {
    // for...in also walks the prototype chain; keep own properties only
    if (Object.prototype.hasOwnProperty.call(obj, key))
      result[key] = iter(obj[key], key, obj)
  }
  return result
}
18 | |
19 | module.exports = function (log, isReady) { |
20 | var views = [] |
21 | var meta = {} |
22 | |
23 | log.get = count(log.get, 'get') |
24 | |
25 | function count (fn, name) { |
26 | meta[name] = meta[name] || 0 |
27 | return function (a, b) { |
28 | meta[name] ++ |
29 | fn.call(this, a, b) |
30 | } |
31 | } |
32 | |
33 | var ready = Obv() |
34 | ready.set(isReady !== undefined ? isReady : true) |
35 | var flume = { |
36 | closed: false, |
37 | dir: log.filename ? path.dirname(log.filename) : null, |
38 | //stream from the log |
39 | since: log.since, |
40 | ready: ready, |
41 | meta: meta, |
42 | append: function (value, cb) { |
43 | return log.append(value, cb) |
44 | }, |
45 | stream: function (opts) { |
46 | return PullCont(function (cb) { |
47 | log.since.once(function () { |
48 | cb(null, log.stream(opts)) |
49 | }) |
50 | }) |
51 | }, |
52 | get: function (seq, cb) { |
53 | log.since.once(function () { |
54 | log.get(seq, cb) |
55 | }) |
56 | }, |
57 | use: function (name, createView) { |
58 | if(~Object.keys(flume).indexOf(name)) |
59 | throw new Error(name + ' is already in use!') |
60 | |
61 | var sv = createView(log, name) |
62 | |
63 | views[name] = flume[name] = wrap(sv, log.since, ready) |
64 | meta[name] = flume[name].meta |
65 | sv.since.once(function rebuild (upto) { |
66 | pull( |
67 | log.stream({gt: upto, live: true, seqs: true, values: true}), |
68 | sv.createSink(function (err) { |
69 | if(err && !flume.closed) throw err |
70 | else if(!flume.closed) |
71 | sv.since.once(rebuild) |
72 | }) |
73 | ) |
74 | }) |
75 | |
76 | return flume |
77 | }, |
78 | rebuild: function (cb) { |
79 | return cont.para(map(views, function (sv) { |
80 | return function (cb) { |
81 | sv.destroy(function (err) { |
82 | if(err) return cb(err) |
83 | //destroy should close the sink stream, |
84 | //which will restart the write. |
85 | var rm = sv.since(function (v) { |
86 | if(v === log.since.value) { |
87 | rm() |
88 | cb() |
89 | } |
90 | }) |
91 | }) |
92 | } |
93 | })) |
94 | (function (err) { |
95 | if(err) cb(err) //hopefully never happens |
96 | |
97 | //then restream each streamview, and callback when it's uptodate with the main log. |
98 | }) |
99 | }, |
100 | close: function (cb) { |
101 | if(flume.closed) return cb() |
102 | flume.closed = true |
103 | cont.para(map(views, function (sv, k) { |
104 | return function (cb) { |
105 | if(sv.close) sv.close(cb) |
106 | else cb() |
107 | } |
108 | })) (cb) |
109 | |
110 | } |
111 | } |
112 | return flume |
113 | } |
114 |
Built with git-ssb-web