Files: 6dde1c5e1d32a7427848bfad8a4641577bd0d1ab / index.js
8315 bytesRaw
1 | ; |
2 | |
3 | var contpara = require('cont').para |
4 | var pull = require('pull-stream') |
5 | var pl = require('pull-level') |
6 | var paramap = require('pull-paramap') |
7 | var timestamp = require('monotonic-timestamp') |
8 | var assert = require('assert') |
9 | var ltgt = require('ltgt') |
10 | var mlib = require('ssb-msgs') |
11 | var explain = require('explain-error') |
12 | var pdotjson = require('./package.json') |
13 | var createFeed = require('ssb-feed') |
14 | var cat = require('pull-cat') |
15 | var ref = require('ssb-ref') |
16 | var ssbKeys = require('ssb-keys') |
17 | var Live = require('pull-live') |
18 | var Notify = require('pull-notify') |
19 | var compare = require('typewiselite') |
20 | var peek = require('level-peek') |
21 | var Validator = require('ssb-feed/validator') |
22 | |
23 | var isFeedId = ref.isFeedId |
24 | var isMsgId = ref.isMsgId |
25 | var isBlobId = ref.isBlobId |
26 | |
27 | var u = require('./util') |
28 | var stdopts = u.options |
29 | var msgFmt = u.format |
30 | |
31 | //53 bit integer |
32 | var MAX_INT = 0x1fffffffffffff |
33 | |
34 | function isNumber (n) { |
35 | return typeof n === 'number' |
36 | } |
37 | |
38 | function isString (s) { |
39 | return 'string' === typeof s |
40 | } |
41 | |
42 | var isArray = Array.isArray |
43 | |
44 | function isObject (o) { |
45 | return o && 'object' === typeof o && !Array.isArray(o) |
46 | } |
47 | |
48 | function all (stream) { |
49 | return function (cb) { |
50 | pull(stream, pull.collect(cb)) |
51 | } |
52 | } |
53 | |
54 | function getVMajor () { |
55 | var version = require('./package.json').version |
56 | return (version.split('.')[0])|0 |
57 | } |
58 | |
59 | module.exports = function (db, opts, keys, path) { |
60 | var sysDB = db.sublevel('sys') |
61 | var logDB = db.sublevel('log') |
62 | var feedDB = require('./indexes/feed')(db) |
63 | var clockDB = require('./indexes/clock')(db) |
64 | var lastDB = require('./indexes/last')(db) |
65 | var indexDB = require('./indexes/links')(db, keys) |
66 | var appsDB = db.sublevel('app') |
67 | |
68 | function get (db, key) { |
69 | return function (cb) { db.get(key, cb) } |
70 | } |
71 | |
72 | db.opts = opts |
73 | |
74 | db.add = Validator(db, opts) |
75 | |
76 | var realtime = Notify() |
77 | |
78 | var await = u.await() |
79 | var set = await.set |
80 | await.set = null |
81 | var waiting = [] |
82 | db.seen = await |
83 | db.post(function (op) { |
84 | set(Math.max(op.ts || op.timestamp, await.get()||0)) |
85 | }) |
86 | |
87 | peek.last(logDB, {keys: true}, function (err, key) { |
88 | set(Math.max(key || 0, await.get()||0)) |
89 | }) |
90 | |
91 | db.pre(function (op, _add, _batch) { |
92 | var msg = op.value |
93 | var id = op.key |
94 | // index by sequence number |
95 | |
96 | function add (kv) { |
97 | _add(kv); |
98 | kv._value = op.value |
99 | realtime(kv) |
100 | } |
101 | |
102 | var localtime = op.timestamp = timestamp() |
103 | |
104 | add({ |
105 | key: localtime, value: id, |
106 | type: 'put', prefix: logDB |
107 | }) |
108 | |
109 | }) |
110 | |
111 | |
112 | db.needsRebuild = function (cb) { |
113 | sysDB.get('vmajor', function (err, dbvmajor) { |
114 | dbvmajor = (dbvmajor|0) || 0 |
115 | cb(null, dbvmajor < getVMajor()) |
116 | }) |
117 | } |
118 | |
119 | db.rebuildIndex = function (cb) { |
120 | var n = 4, m = 4, ended |
121 | feedDB.rebuild(next) |
122 | clockDB.rebuild(next) |
123 | lastDB.rebuild(next) |
124 | indexDB.rebuild(next) |
125 | |
126 | function next (err) { |
127 | if(err && !ended) cb(ended = err) |
128 | } |
129 | |
130 | var m = 4 |
131 | feedDB.await(next2) |
132 | clockDB.await(next2) |
133 | lastDB.await(next2) |
134 | indexDB.await(next2) |
135 | |
136 | function next2 () { |
137 | if(ended) return |
138 | if(--m) return |
139 | ended = true |
140 | sysDB.put('vmajor', getVMajor(), cb) |
141 | } |
142 | } |
143 | |
144 | function Limit (fn) { |
145 | return function (opts) { |
146 | if(opts && opts.limit && opts.limit > 0) { |
147 | var limit = opts.limit |
148 | var read = fn(opts) |
149 | return function (abort, cb) { |
150 | if(limit--) return read(abort, function (err, data) { |
151 | if(data && data.sync) limit ++ |
152 | cb(err, data) |
153 | }) |
154 | else read(true, cb) |
155 | } |
156 | } |
157 | else |
158 | return fn(opts) |
159 | } |
160 | } |
161 | |
162 | //TODO: eventually, this should filter out authors you do not follow. |
163 | db.createFeedStream = Limit(feedDB.createFeedStream) |
164 | //latest was stored as author: seq |
165 | //but for the purposes of replication back pressure |
166 | //we need to know when we last replicated with someone. |
167 | //instead store as: {sequence: seq, ts: localtime} |
168 | //then, peers can request a max number of posts per feed. |
169 | |
170 | function toSeq (latest) { |
171 | return isNumber(latest) ? latest : latest.sequence |
172 | } |
173 | |
174 | function lookup(keys, values) { |
175 | return paramap(function (key, cb) { |
176 | if(key.sync) return cb(null, key) |
177 | if(!values) return cb(null, key) |
178 | db.get(key, function (err, msg) { |
179 | if (err) cb(err) |
180 | else { |
181 | cb(null, u.format(keys, values, { key: key, value: msg })) |
182 | } |
183 | }) |
184 | }) |
185 | } |
186 | |
187 | db.lookup = lookup |
188 | |
189 | db.createHistoryStream = Limit(clockDB.createHistoryStream) |
190 | |
191 | db.createUserStream = Limit(clockDB.createUserStream) |
192 | |
193 | |
194 | //writeStream - used in replication. |
195 | db.createWriteStream = function (cb) { |
196 | return pull( |
197 | paramap(function (data, cb) { |
198 | db.add(data, function (err, msg) { |
199 | if(err) |
200 | db.emit('invalid', err, msg) |
201 | cb() |
202 | }) |
203 | }), |
204 | pull.drain(null, cb) |
205 | ) |
206 | } |
207 | |
208 | db.createFeed = function (keys) { |
209 | if(!keys) keys = ssbKeys.generate() |
210 | return createFeed(db, keys, opts) |
211 | } |
212 | |
213 | db.latest = Limit(lastDB.latest) |
214 | |
215 | db.latestSequence = function (id, cb) { |
216 | lastDB.get(id, cb) |
217 | } |
218 | |
219 | db.getLatest = function (id, cb) { |
220 | lastDB.get(id, function (err, v) { |
221 | if(err) return cb() |
222 | //callback null there is no latest |
223 | clockDB.get([id, toSeq(v)], function (err, hash) { |
224 | if(err) return cb() |
225 | db.get(hash, function (err, msg) { |
226 | if(err) cb() |
227 | else cb(null, {key: hash, value: msg}) |
228 | }) |
229 | }) |
230 | }) |
231 | } |
232 | |
233 | db.createLogStream = Limit(Live(function (opts) { |
234 | opts = stdopts(opts) |
235 | var keys = opts.keys; delete opts.keys |
236 | var values = opts.values; delete opts.values |
237 | return pull( |
238 | pl.old(logDB, stdopts(opts)), |
239 | //lookup2(keys, values, 'timestamp') |
240 | paramap(function (data, cb) { |
241 | var key = data.value |
242 | var seq = data.key |
243 | db.get(key, function (err, value) { |
244 | if (err) cb(err) |
245 | else cb(null, msgFmt(keys, values, {key: key, value: value, timestamp: seq})) |
246 | }) |
247 | }) |
248 | ) |
249 | }, function (opts) { |
250 | return pl.live(db, stdopts(opts)) |
251 | })) |
252 | |
253 | var HI = undefined, LO = null |
254 | |
255 | db.messagesByType = Limit(indexDB.messagesByType) |
256 | |
257 | db.links = Limit(indexDB.links) |
258 | |
259 | //get all messages that link to a given message. |
260 | db.relatedMessages = function (opts, cb) { |
261 | if(isString(opts)) opts = {key: opts} |
262 | if(!opts) throw new Error('opts *must* be object') |
263 | var key = opts.id || opts.key |
264 | var depth = opts.depth || Infinity |
265 | var seen = {} |
266 | |
267 | //filter a list of rel, used to avoid 'branch' rel in patchwork, |
268 | //which causes messages to be queried twice. |
269 | var n = 1 |
270 | var msgs = {key: key, value: null} |
271 | db.get(key, function (err, msg) { |
272 | msgs.value = msg |
273 | if (err && err.notFound) |
274 | err = null // ignore not found |
275 | done(err) |
276 | }) |
277 | |
278 | related(msgs, depth) |
279 | |
280 | function related (msg, depth) { |
281 | if(depth <= 0) return |
282 | if (n<0) return |
283 | n++ |
284 | all(db.links({dest: msg.key, rel: opts.rel, keys: true, values:true, meta: false, type:'msg'})) |
285 | (function (err, ary) { |
286 | if(ary && ary.length) { |
287 | msg.related = ary = ary.sort(function (a, b) { |
288 | return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key) |
289 | }).filter(function (msg) { |
290 | if(seen[msg.key]) return |
291 | return seen[msg.key] = true |
292 | }) |
293 | ary.forEach(function (msg) { related (msg, depth - 1) }) |
294 | } |
295 | done(err) |
296 | }) |
297 | } |
298 | |
299 | function count (msg) { |
300 | if(!msg.related) |
301 | return msg |
302 | var c = 0 |
303 | msg.related.forEach(function (_msg) { |
304 | if(opts.parent) _msg.parent = msg.key |
305 | c += 1 + (count(_msg).count || 0) |
306 | }) |
307 | if(opts.count) msg.count = c |
308 | return msg |
309 | } |
310 | |
311 | function done (err) { |
312 | if(err && n > 0) { |
313 | n = -1 |
314 | return cb(err) |
315 | } |
316 | if(--n) return |
317 | cb(null, count(msgs)) |
318 | } |
319 | } |
320 | |
321 | var _close = db.close |
322 | |
323 | db.close = function (cb) { |
324 | var n = 5 |
325 | clockDB.close(next) |
326 | feedDB.close(next) |
327 | lastDB.close(next) |
328 | indexDB.close(next) |
329 | _close.call(db, next) |
330 | function next (err) { |
331 | if(n < 0) return |
332 | if(err) return n = -1, cb(err) |
333 | if(--n) return |
334 | db && cb() |
335 | } |
336 | } |
337 | |
338 | return db |
339 | } |
340 | |
341 | |
342 | |
343 | |
344 | |
345 | |
346 | |
347 | |
348 | |
349 | |
350 | |
351 |
Built with git-ssb-web