Files: 24347ad96cb0abe203631fec2dd50a9b85a2a926 / index.js
5609 bytesRaw
1 | ; |
2 | |
3 | var join = require('path').join |
4 | var EventEmitter = require('events') |
5 | var Obv = require('obv') |
6 | |
7 | var pull = require('pull-stream') |
8 | var timestamp = require('monotonic-timestamp') |
9 | var explain = require('explain-error') |
10 | var createFeed = require('ssb-feed') |
11 | var ref = require('ssb-ref') |
12 | var ssbKeys = require('ssb-keys') |
13 | var Notify = require('pull-notify') |
14 | var Validator = require('ssb-feed/validator') |
15 | var Related = require('./related') |
16 | |
17 | var isFeedId = ref.isFeedId |
18 | var isMsgId = ref.isMsgId |
19 | var isBlobId = ref.isBlobId |
20 | |
21 | var u = require('./util') |
22 | var stdopts = u.options |
23 | var Format = u.formatStream |
24 | //53 bit integer |
25 | var MAX_INT = 0x1fffffffffffff |
26 | |
27 | function isNumber (n) { |
28 | return typeof n === 'number' |
29 | } |
30 | |
31 | function isString (s) { |
32 | return 'string' === typeof s |
33 | } |
34 | |
35 | var isArray = Array.isArray |
36 | |
37 | function isObject (o) { |
38 | return o && 'object' === typeof o && !Array.isArray(o) |
39 | } |
40 | |
41 | function getVMajor () { |
42 | var version = require('./package.json').version |
43 | return (version.split('.')[0])|0 |
44 | } |
45 | |
46 | module.exports = function (_db, opts, keys, path) { |
47 | path = path || _db.location |
48 | |
49 | keys = keys || ssbKeys.generate() |
50 | |
51 | var db = require('./db')(join(opts.path || path, 'flume'), keys) |
52 | |
53 | //legacy database |
54 | if(_db) require('./legacy')(_db, db) |
55 | else db.ready.set(true) |
56 | |
57 | db.sublevel = function (a, b) { |
58 | return _db.sublevel(a, b) |
59 | } |
60 | |
61 | //UGLY HACK, but... |
62 | //fairly sure that something up the stack expects ssb to be an event emitter. |
63 | db.__proto__ = new EventEmitter() |
64 | |
65 | db.opts = opts |
66 | |
67 | db.post = Obv() |
68 | db.batch = function (batch, cb) { |
69 | db.append(batch.map(function (e) { |
70 | return { |
71 | key: e.key, |
72 | value: e.value, |
73 | timestamp: timestamp() |
74 | } |
75 | }), function (err, offsets) { |
76 | batch.forEach(function (msg, i) { |
77 | //trigger post immediately. |
78 | db.post.set(msg) |
79 | }) |
80 | cb(err) |
81 | }) |
82 | } |
83 | |
84 | var _get = db.get |
85 | |
86 | db.get = function (key, cb) { |
87 | if(ref.isMsg(key)) |
88 | return db.keys.get(key, function (err, seq) { |
89 | if(err) cb(err) |
90 | else cb(null, seq && seq.value) |
91 | }) |
92 | else _get(key, cb) //seq |
93 | } |
94 | |
95 | var add = Validator(db, opts) |
96 | db.add = function (msg, cb) { |
97 | if(db.ready.value) next(true) |
98 | else db.ready.once(next, false) |
99 | function next (ready) { |
100 | add(msg, function (err, value) { |
101 | cb(err, value) |
102 | }) |
103 | } |
104 | } |
105 | |
106 | var realtime = Notify() |
107 | |
108 | //TODO: eventually, this should filter out authors you do not follow. |
109 | db.createFeedStream = db.feed.createFeedStream |
110 | |
111 | //latest was stored as author: seq |
112 | //but for the purposes of replication back pressure |
113 | //we need to know when we last replicated with someone. |
114 | //instead store as: {sequence: seq, ts: localtime} |
115 | //then, peers can request a max number of posts per feed. |
116 | |
117 | function toSeq (latest) { |
118 | return isNumber(latest) ? latest : latest.sequence |
119 | } |
120 | |
121 | function lookup(keys, values) { |
122 | return paramap(function (key, cb) { |
123 | if(key.sync) return cb(null, key) |
124 | if(!values) return cb(null, key) |
125 | db.get(key, function (err, data) { |
126 | if (err) cb(err) |
127 | else cb(null, u.format(keys, values, data)) |
128 | }) |
129 | }) |
130 | } |
131 | |
132 | db.lookup = lookup |
133 | |
134 | db.createHistoryStream = db.clock.createHistoryStream |
135 | |
136 | db.createUserStream = db.clock.createUserStream |
137 | |
138 | //writeStream - used in replication. |
139 | db.createWriteStream = function (cb) { |
140 | return pull( |
141 | pull.asyncMap(function (data, cb) { |
142 | db.add(data, function (err, msg) { |
143 | if(err) { |
144 | db.emit('invalid', err, msg) |
145 | } |
146 | cb() |
147 | }) |
148 | }), |
149 | pull.drain(null, cb) |
150 | ) |
151 | } |
152 | |
153 | db.createFeed = function (keys) { |
154 | if(!keys) keys = ssbKeys.generate() |
155 | return createFeed(db, keys, opts) |
156 | } |
157 | |
158 | db.latest = db.last.latest |
159 | |
160 | //used by sbot replication plugin |
161 | db.latestSequence = function (id, cb) { |
162 | db.last.get(function (err, val) { |
163 | if(err) cb(err) |
164 | else if (!val || !val[id]) cb(new Error('not found:'+id)) |
165 | else cb(null, val[id].sequence) |
166 | }) |
167 | } |
168 | |
169 | |
170 | db.getLatest = function (key, cb) { |
171 | db.last.get(function (err, value) { |
172 | if(err || !value || !value[key]) cb() |
173 | //Currently, this retrives the previous message. |
174 | //but, we could rewrite validation to only use |
175 | //data the reduce view, so that no disk read is necessary. |
176 | else db.get(value[key].id, function (err, msg) { |
177 | cb(err, {key: value[key].id, value: msg}) |
178 | }) |
179 | }) |
180 | } |
181 | |
182 | |
183 | db.createLogStream = function (opts) { |
184 | opts = stdopts(opts) |
185 | if(opts.raw) |
186 | return db.stream() |
187 | |
188 | var keys = opts.keys; delete opts.keys |
189 | var values = opts.values; delete opts.values |
190 | return pull(db.time.read(opts), Format(keys, values)) |
191 | } |
192 | |
193 | db.messagesByType = db.links.messagesByType |
194 | |
195 | db.links = db.links.links |
196 | |
197 | var HI = undefined, LO = null |
198 | |
199 | //get all messages that link to a given message. |
200 | |
201 | db.relatedMessages = Related(db) |
202 | |
203 | //called with [id, seq] or "<id>:<seq>" |
204 | db.getAtSequence = function (seqid, cb) { |
205 | db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb) |
206 | } |
207 | |
208 | db.getVectorClock = function (_, cb) { |
209 | if(!cb) cb = _ |
210 | db.last.get(function (err, h) { |
211 | if(err) return cb(err) |
212 | var clock = {} |
213 | for(var k in h) |
214 | clock[k] = h[k].sequence |
215 | cb(null, clock) |
216 | }) |
217 | |
218 | } |
219 | |
220 | if(_db) { |
221 | var close = db.close |
222 | db.close = function (cb) { |
223 | var n = 2 |
224 | _db.close(next); close(next) |
225 | |
226 | function next (err) { |
227 | if(err && n>0) { |
228 | n = -1 |
229 | return cb(err) |
230 | } |
231 | if(--n) return |
232 | cb() |
233 | } |
234 | |
235 | } |
236 | |
237 | } |
238 | return db |
239 | } |
240 | |
241 |
Built with git-ssb-web