git ssb

2+

Dominic / flumedb



Tree: 8f18c58c8aa15bbde2a5464401866c1dc61bf372

Files: 8f18c58c8aa15bbde2a5464401866c1dc61bf372 / index.js

3916 bytesRaw
1'use strict'
2var cont = require('cont')
3var PullCont = require('pull-cont')
4var pull = require('pull-stream')
5var path = require('path')
6var Obv = require('obv')
7//take a log, and return a log driver.
8//the log has an api with `read`, `get` `since`
9
10function wrap(sv, since, isReady) {
11 var waiting = []
12
13 sv.since(function (upto) {
14 if(!isReady.value) return
15 while(waiting.length && waiting[0].seq <= upto)
16 waiting.shift().cb()
17 })
18
19 isReady(function (ready) {
20 if(!ready) return
21 var upto = sv.since.value
22 if(upto == undefined) return
23 while(waiting.length && waiting[0].seq <= upto)
24 waiting.shift().cb()
25 })
26
27 function ready (cb) {
28 if(isReady.value && since.value != null && since.value === sv.since.value) cb()
29 else
30 since.once(function (upto) {
31 if(isReady.value && upto === sv.since.value) cb()
32 else waiting.push({seq: upto, cb: cb})
33 })
34 }
35
36 var wrapper = {
37 source: function (fn) {
38 return function (opts) {
39 return PullCont(function (cb) {
40 ready(function () { cb(null, fn(opts)) })
41 })
42 }
43 },
44 async: function (fn) {
45 return function (opts, cb) {
46 ready(function () {
47 fn(opts, cb)
48 })
49 }
50 },
51 sync: function (fn) { return fn }
52 }
53
54 var o = {ready: ready, since: sv.since, close: wrapper.async(sv.close || function (cb) { return cb() }) }
55 if(!sv.methods) throw new Error('a stream view must have methods property')
56
57 for(var key in sv.methods) {
58 var type = sv.methods[key]
59 var fn = sv[key]
60 if(typeof fn !== 'function') throw new Error('expected function named:'+key+'of type: '+type)
61 //type must be either source, async, or sync
62 o[key] = wrapper[type](fn)
63 }
64
65 o.methods = sv.methods
66
67 return o
68}
69
70
71function map(obj, iter) {
72 var o = {}
73 for(var k in obj)
74 o[k] = iter(obj[k], k, obj)
75 return o
76}
77
78module.exports = function (log, isReady) {
79 var views = []
80 var ready = Obv()
81 ready.set(isReady !== undefined ? isReady : true)
82 var flume = {
83 dir: log.filename ? path.dirname(log.filename) : null,
84 //stream from the log
85 since: log.since,
86 ready: ready,
87 append: function (value, cb) {
88 return log.append(value, cb)
89 },
90 stream: function (opts) {
91 return PullCont(function (cb) {
92 log.since.once(function () {
93 cb(null, log.stream(opts))
94 })
95 })
96 },
97 get: function (seq, cb) {
98 log.since.once(function () {
99 log.get(seq, cb)
100 })
101 },
102 use: function (name, createView) {
103 if(~Object.keys(flume).indexOf(name))
104 throw new Error(name + ' is already in use!')
105
106 var sv = createView(log, name)
107
108 views[name] = flume[name] = wrap(sv, log.since, ready)
109
110 sv.since.once(function (upto) {
111 pull(
112 log.stream({gt: upto, live: true, seqs: true, values: true}),
113 sv.createSink(function (err) {
114 if(err) console.error(err)
115 })
116 )
117 })
118
119 return flume
120 },
121 rebuild: function (cb) {
122 return cont.para(map(views, function (sv) {
123 return function (cb) {
124 sv.destroy(function (err) {
125 if(err) return cb(err)
126 sv.since.once(function (upto) {
127 pull(
128 log.stream({gt: upto, live: true, seqs: true, values: true}),
129 sv.createSink(function (err) {
130 if(err) console.error(err)
131 })
132 )
133 })
134 })
135 }
136 }))
137 (function (err) {
138 if(err) cb(err) //hopefully never happens
139
140 //then restream each streamview, and callback when it's uptodate with the main log.
141 })
142 },
143 close: function (cb) {
144 cont.para(map(views, function (sv) {
145 return function (cb) {
146 if(sv.close) sv.close(cb)
147 else cb()
148 }
149 })) (cb)
150
151 }
152 }
153 return flume
154}
155
156

Built with git-ssb-web