git ssb

1+

Dominic / secure-scuttlebutt



Tree: 6dde1c5e1d32a7427848bfad8a4641577bd0d1ab

Files: 6dde1c5e1d32a7427848bfad8a4641577bd0d1ab / index.js

8315 bytesRaw
1'use strict';
2
3var contpara = require('cont').para
4var pull = require('pull-stream')
5var pl = require('pull-level')
6var paramap = require('pull-paramap')
7var timestamp = require('monotonic-timestamp')
8var assert = require('assert')
9var ltgt = require('ltgt')
10var mlib = require('ssb-msgs')
11var explain = require('explain-error')
12var pdotjson = require('./package.json')
13var createFeed = require('ssb-feed')
14var cat = require('pull-cat')
15var ref = require('ssb-ref')
16var ssbKeys = require('ssb-keys')
17var Live = require('pull-live')
18var Notify = require('pull-notify')
19var compare = require('typewiselite')
20var peek = require('level-peek')
21var Validator = require('ssb-feed/validator')
22
23var isFeedId = ref.isFeedId
24var isMsgId = ref.isMsgId
25var isBlobId = ref.isBlobId
26
27var u = require('./util')
28var stdopts = u.options
29var msgFmt = u.format
30
31//53 bit integer
32var MAX_INT = 0x1fffffffffffff
33
34function isNumber (n) {
35 return typeof n === 'number'
36}
37
38function isString (s) {
39 return 'string' === typeof s
40}
41
42var isArray = Array.isArray
43
44function isObject (o) {
45 return o && 'object' === typeof o && !Array.isArray(o)
46}
47
48function all (stream) {
49 return function (cb) {
50 pull(stream, pull.collect(cb))
51 }
52}
53
54function getVMajor () {
55 var version = require('./package.json').version
56 return (version.split('.')[0])|0
57}
58
59module.exports = function (db, opts, keys, path) {
60 var sysDB = db.sublevel('sys')
61 var logDB = db.sublevel('log')
62 var feedDB = require('./indexes/feed')(db)
63 var clockDB = require('./indexes/clock')(db)
64 var lastDB = require('./indexes/last')(db)
65 var indexDB = require('./indexes/links')(db, keys)
66 var appsDB = db.sublevel('app')
67
68 function get (db, key) {
69 return function (cb) { db.get(key, cb) }
70 }
71
72 db.opts = opts
73
74 db.add = Validator(db, opts)
75
76 var realtime = Notify()
77
78 var await = u.await()
79 var set = await.set
80 await.set = null
81 var waiting = []
82 db.seen = await
83 db.post(function (op) {
84 set(Math.max(op.ts || op.timestamp, await.get()||0))
85 })
86
87 peek.last(logDB, {keys: true}, function (err, key) {
88 set(Math.max(key || 0, await.get()||0))
89 })
90
91 db.pre(function (op, _add, _batch) {
92 var msg = op.value
93 var id = op.key
94 // index by sequence number
95
96 function add (kv) {
97 _add(kv);
98 kv._value = op.value
99 realtime(kv)
100 }
101
102 var localtime = op.timestamp = timestamp()
103
104 add({
105 key: localtime, value: id,
106 type: 'put', prefix: logDB
107 })
108
109 })
110
111
112 db.needsRebuild = function (cb) {
113 sysDB.get('vmajor', function (err, dbvmajor) {
114 dbvmajor = (dbvmajor|0) || 0
115 cb(null, dbvmajor < getVMajor())
116 })
117 }
118
119 db.rebuildIndex = function (cb) {
120 var n = 4, m = 4, ended
121 feedDB.rebuild(next)
122 clockDB.rebuild(next)
123 lastDB.rebuild(next)
124 indexDB.rebuild(next)
125
126 function next (err) {
127 if(err && !ended) cb(ended = err)
128 }
129
130 var m = 4
131 feedDB.await(next2)
132 clockDB.await(next2)
133 lastDB.await(next2)
134 indexDB.await(next2)
135
136 function next2 () {
137 if(ended) return
138 if(--m) return
139 ended = true
140 sysDB.put('vmajor', getVMajor(), cb)
141 }
142 }
143
144 function Limit (fn) {
145 return function (opts) {
146 if(opts && opts.limit && opts.limit > 0) {
147 var limit = opts.limit
148 var read = fn(opts)
149 return function (abort, cb) {
150 if(limit--) return read(abort, function (err, data) {
151 if(data && data.sync) limit ++
152 cb(err, data)
153 })
154 else read(true, cb)
155 }
156 }
157 else
158 return fn(opts)
159 }
160 }
161
162 //TODO: eventually, this should filter out authors you do not follow.
163 db.createFeedStream = Limit(feedDB.createFeedStream)
164 //latest was stored as author: seq
165 //but for the purposes of replication back pressure
166 //we need to know when we last replicated with someone.
167 //instead store as: {sequence: seq, ts: localtime}
168 //then, peers can request a max number of posts per feed.
169
170 function toSeq (latest) {
171 return isNumber(latest) ? latest : latest.sequence
172 }
173
174 function lookup(keys, values) {
175 return paramap(function (key, cb) {
176 if(key.sync) return cb(null, key)
177 if(!values) return cb(null, key)
178 db.get(key, function (err, msg) {
179 if (err) cb(err)
180 else {
181 cb(null, u.format(keys, values, { key: key, value: msg }))
182 }
183 })
184 })
185 }
186
187 db.lookup = lookup
188
189 db.createHistoryStream = Limit(clockDB.createHistoryStream)
190
191 db.createUserStream = Limit(clockDB.createUserStream)
192
193
194 //writeStream - used in replication.
195 db.createWriteStream = function (cb) {
196 return pull(
197 paramap(function (data, cb) {
198 db.add(data, function (err, msg) {
199 if(err)
200 db.emit('invalid', err, msg)
201 cb()
202 })
203 }),
204 pull.drain(null, cb)
205 )
206 }
207
208 db.createFeed = function (keys) {
209 if(!keys) keys = ssbKeys.generate()
210 return createFeed(db, keys, opts)
211 }
212
213 db.latest = Limit(lastDB.latest)
214
215 db.latestSequence = function (id, cb) {
216 lastDB.get(id, cb)
217 }
218
219 db.getLatest = function (id, cb) {
220 lastDB.get(id, function (err, v) {
221 if(err) return cb()
222 //callback null there is no latest
223 clockDB.get([id, toSeq(v)], function (err, hash) {
224 if(err) return cb()
225 db.get(hash, function (err, msg) {
226 if(err) cb()
227 else cb(null, {key: hash, value: msg})
228 })
229 })
230 })
231 }
232
233 db.createLogStream = Limit(Live(function (opts) {
234 opts = stdopts(opts)
235 var keys = opts.keys; delete opts.keys
236 var values = opts.values; delete opts.values
237 return pull(
238 pl.old(logDB, stdopts(opts)),
239 //lookup2(keys, values, 'timestamp')
240 paramap(function (data, cb) {
241 var key = data.value
242 var seq = data.key
243 db.get(key, function (err, value) {
244 if (err) cb(err)
245 else cb(null, msgFmt(keys, values, {key: key, value: value, timestamp: seq}))
246 })
247 })
248 )
249 }, function (opts) {
250 return pl.live(db, stdopts(opts))
251 }))
252
253 var HI = undefined, LO = null
254
255 db.messagesByType = Limit(indexDB.messagesByType)
256
257 db.links = Limit(indexDB.links)
258
259 //get all messages that link to a given message.
260 db.relatedMessages = function (opts, cb) {
261 if(isString(opts)) opts = {key: opts}
262 if(!opts) throw new Error('opts *must* be object')
263 var key = opts.id || opts.key
264 var depth = opts.depth || Infinity
265 var seen = {}
266
267 //filter a list of rel, used to avoid 'branch' rel in patchwork,
268 //which causes messages to be queried twice.
269 var n = 1
270 var msgs = {key: key, value: null}
271 db.get(key, function (err, msg) {
272 msgs.value = msg
273 if (err && err.notFound)
274 err = null // ignore not found
275 done(err)
276 })
277
278 related(msgs, depth)
279
280 function related (msg, depth) {
281 if(depth <= 0) return
282 if (n<0) return
283 n++
284 all(db.links({dest: msg.key, rel: opts.rel, keys: true, values:true, meta: false, type:'msg'}))
285 (function (err, ary) {
286 if(ary && ary.length) {
287 msg.related = ary = ary.sort(function (a, b) {
288 return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key)
289 }).filter(function (msg) {
290 if(seen[msg.key]) return
291 return seen[msg.key] = true
292 })
293 ary.forEach(function (msg) { related (msg, depth - 1) })
294 }
295 done(err)
296 })
297 }
298
299 function count (msg) {
300 if(!msg.related)
301 return msg
302 var c = 0
303 msg.related.forEach(function (_msg) {
304 if(opts.parent) _msg.parent = msg.key
305 c += 1 + (count(_msg).count || 0)
306 })
307 if(opts.count) msg.count = c
308 return msg
309 }
310
311 function done (err) {
312 if(err && n > 0) {
313 n = -1
314 return cb(err)
315 }
316 if(--n) return
317 cb(null, count(msgs))
318 }
319 }
320
321 var _close = db.close
322
323 db.close = function (cb) {
324 var n = 5
325 clockDB.close(next)
326 feedDB.close(next)
327 lastDB.close(next)
328 indexDB.close(next)
329 _close.call(db, next)
330 function next (err) {
331 if(n < 0) return
332 if(err) return n = -1, cb(err)
333 if(--n) return
334 db && cb()
335 }
336 }
337
338 return db
339}
340
341
342
343
344
345
346
347
348
349
350
351

Built with git-ssb-web