Files: 66945405e614808df83374c5ee2b994ef5782b9a / index.js
2788 bytesRaw
1 | |
2 | var cont = require('cont') |
3 | var pull = require('pull-stream') |
4 | var PullCont = require('pull-cont') |
5 | var path = require('path') |
6 | var Obv = require('obv') |
7 | //take a log, and return a log driver. |
8 | //the log has an api with `read`, `get` `since` |
9 | |
10 | var wrap = require('./wrap') |
11 | |
12 | function map(obj, iter) { |
13 | var o = {} |
14 | for(var k in obj) |
15 | o[k] = iter(obj[k], k, obj) |
16 | return o |
17 | } |
18 | |
19 | module.exports = function (log, isReady) { |
20 | var views = [] |
21 | var meta = {} |
22 | |
23 | log.get = count(log.get, 'get') |
24 | |
25 | function count (fn, name) { |
26 | meta[name] = meta[name] || 0 |
27 | return function (a, b) { |
28 | meta[name] ++ |
29 | fn.call(this, a, b) |
30 | } |
31 | } |
32 | |
33 | var ready = Obv() |
34 | ready.set(isReady !== undefined ? isReady : true) |
35 | var flume = { |
36 | closed: false, |
37 | dir: log.filename ? path.dirname(log.filename) : null, |
38 | //stream from the log |
39 | since: log.since, |
40 | ready: ready, |
41 | meta: meta, |
42 | append: function (value, cb) { |
43 | return log.append(value, cb) |
44 | }, |
45 | stream: function (opts) { |
46 | return PullCont(function (cb) { |
47 | log.since.once(function () { |
48 | cb(null, log.stream(opts)) |
49 | }) |
50 | }) |
51 | }, |
52 | get: function (seq, cb) { |
53 | log.since.once(function () { |
54 | log.get(seq, cb) |
55 | }) |
56 | }, |
57 | use: function (name, createView) { |
58 | if(~Object.keys(flume).indexOf(name)) |
59 | throw new Error(name + ' is already in use!') |
60 | |
61 | var sv = createView(log, name) |
62 | |
63 | views[name] = flume[name] = wrap(sv, log.since, ready) |
64 | meta[name] = flume[name].meta |
65 | sv.since.once(function (upto) { |
66 | pull( |
67 | log.stream({gt: upto, live: true, seqs: true, values: true}), |
68 | sv.createSink(function (err) { |
69 | if(err && !flume.closed) console.error(err) |
70 | }) |
71 | ) |
72 | }) |
73 | |
74 | return flume |
75 | }, |
76 | rebuild: function (cb) { |
77 | return cont.para(map(views, function (sv) { |
78 | return function (cb) { |
79 | sv.destroy(function (err) { |
80 | if(err) return cb(err) |
81 | sv.since.once(function (upto) { |
82 | pull( |
83 | log.stream({gt: upto, live: true, seqs: true, values: true}), |
84 | sv.createSink(function (err) { |
85 | if(err) console.error(err) |
86 | }) |
87 | ) |
88 | }) |
89 | }) |
90 | } |
91 | })) |
92 | (function (err) { |
93 | if(err) cb(err) //hopefully never happens |
94 | |
95 | //then restream each streamview, and callback when it's uptodate with the main log. |
96 | }) |
97 | }, |
98 | close: function (cb) { |
99 | if(flume.closed) throw new Error('already closed') |
100 | flume.closed = true |
101 | cont.para(map(views, function (sv, k) { |
102 | return function (cb) { |
103 | if(sv.close) sv.close(cb) |
104 | else cb() |
105 | } |
106 | })) (cb) |
107 | |
108 | } |
109 | } |
110 | return flume |
111 | } |
112 | |
113 | |
114 | |
115 | |
116 | |
117 | |
118 | |
119 | |
120 | |
121 | |
122 |
Built with git-ssb-web