Files: 8f18c58c8aa15bbde2a5464401866c1dc61bf372 / index.js
3916 bytesRaw
1 | |
2 | var cont = require('cont') |
3 | var PullCont = require('pull-cont') |
4 | var pull = require('pull-stream') |
5 | var path = require('path') |
6 | var Obv = require('obv') |
7 | //take a log, and return a log driver. |
8 | //the log has an api with `read`, `get` `since` |
9 | |
10 | function wrap(sv, since, isReady) { |
11 | var waiting = [] |
12 | |
13 | sv.since(function (upto) { |
14 | if(!isReady.value) return |
15 | while(waiting.length && waiting[0].seq <= upto) |
16 | waiting.shift().cb() |
17 | }) |
18 | |
19 | isReady(function (ready) { |
20 | if(!ready) return |
21 | var upto = sv.since.value |
22 | if(upto == undefined) return |
23 | while(waiting.length && waiting[0].seq <= upto) |
24 | waiting.shift().cb() |
25 | }) |
26 | |
27 | function ready (cb) { |
28 | if(isReady.value && since.value != null && since.value === sv.since.value) cb() |
29 | else |
30 | since.once(function (upto) { |
31 | if(isReady.value && upto === sv.since.value) cb() |
32 | else waiting.push({seq: upto, cb: cb}) |
33 | }) |
34 | } |
35 | |
36 | var wrapper = { |
37 | source: function (fn) { |
38 | return function (opts) { |
39 | return PullCont(function (cb) { |
40 | ready(function () { cb(null, fn(opts)) }) |
41 | }) |
42 | } |
43 | }, |
44 | async: function (fn) { |
45 | return function (opts, cb) { |
46 | ready(function () { |
47 | fn(opts, cb) |
48 | }) |
49 | } |
50 | }, |
51 | sync: function (fn) { return fn } |
52 | } |
53 | |
54 | var o = {ready: ready, since: sv.since, close: wrapper.async(sv.close || function (cb) { return cb() }) } |
55 | if(!sv.methods) throw new Error('a stream view must have methods property') |
56 | |
57 | for(var key in sv.methods) { |
58 | var type = sv.methods[key] |
59 | var fn = sv[key] |
60 | if(typeof fn !== 'function') throw new Error('expected function named:'+key+'of type: '+type) |
61 | //type must be either source, async, or sync |
62 | o[key] = wrapper[type](fn) |
63 | } |
64 | |
65 | o.methods = sv.methods |
66 | |
67 | return o |
68 | } |
69 | |
70 | |
71 | function map(obj, iter) { |
72 | var o = {} |
73 | for(var k in obj) |
74 | o[k] = iter(obj[k], k, obj) |
75 | return o |
76 | } |
77 | |
78 | module.exports = function (log, isReady) { |
79 | var views = [] |
80 | var ready = Obv() |
81 | ready.set(isReady !== undefined ? isReady : true) |
82 | var flume = { |
83 | dir: log.filename ? path.dirname(log.filename) : null, |
84 | //stream from the log |
85 | since: log.since, |
86 | ready: ready, |
87 | append: function (value, cb) { |
88 | return log.append(value, cb) |
89 | }, |
90 | stream: function (opts) { |
91 | return PullCont(function (cb) { |
92 | log.since.once(function () { |
93 | cb(null, log.stream(opts)) |
94 | }) |
95 | }) |
96 | }, |
97 | get: function (seq, cb) { |
98 | log.since.once(function () { |
99 | log.get(seq, cb) |
100 | }) |
101 | }, |
102 | use: function (name, createView) { |
103 | if(~Object.keys(flume).indexOf(name)) |
104 | throw new Error(name + ' is already in use!') |
105 | |
106 | var sv = createView(log, name) |
107 | |
108 | views[name] = flume[name] = wrap(sv, log.since, ready) |
109 | |
110 | sv.since.once(function (upto) { |
111 | pull( |
112 | log.stream({gt: upto, live: true, seqs: true, values: true}), |
113 | sv.createSink(function (err) { |
114 | if(err) console.error(err) |
115 | }) |
116 | ) |
117 | }) |
118 | |
119 | return flume |
120 | }, |
121 | rebuild: function (cb) { |
122 | return cont.para(map(views, function (sv) { |
123 | return function (cb) { |
124 | sv.destroy(function (err) { |
125 | if(err) return cb(err) |
126 | sv.since.once(function (upto) { |
127 | pull( |
128 | log.stream({gt: upto, live: true, seqs: true, values: true}), |
129 | sv.createSink(function (err) { |
130 | if(err) console.error(err) |
131 | }) |
132 | ) |
133 | }) |
134 | }) |
135 | } |
136 | })) |
137 | (function (err) { |
138 | if(err) cb(err) //hopefully never happens |
139 | |
140 | //then restream each streamview, and callback when it's uptodate with the main log. |
141 | }) |
142 | }, |
143 | close: function (cb) { |
144 | cont.para(map(views, function (sv) { |
145 | return function (cb) { |
146 | if(sv.close) sv.close(cb) |
147 | else cb() |
148 | } |
149 | })) (cb) |
150 | |
151 | } |
152 | } |
153 | return flume |
154 | } |
155 | |
156 |
Built with git-ssb-web