git ssb

1+

Dominic / secure-scuttlebutt



Tree: 9a4121d28f7b5ad414632abaf7e2528b0d978bdc

Files: 9a4121d28f7b5ad414632abaf7e2528b0d978bdc / index.js

7530 bytes · Raw
1'use strict';
2
3var contpara = require('cont').para
4var pull = require('pull-stream')
5var pl = require('pull-level')
6var paramap = require('pull-paramap')
7var timestamp = require('monotonic-timestamp')
8var assert = require('assert')
9var ltgt = require('ltgt')
10var mlib = require('ssb-msgs')
11var explain = require('explain-error')
12var pdotjson = require('./package.json')
13var createFeed = require('ssb-feed')
14var cat = require('pull-cat')
15var ref = require('ssb-ref')
16var ssbKeys = require('ssb-keys')
17var Live = require('pull-live')
18var Notify = require('pull-notify')
19var compare = require('typewiselite')
20var peek = require('level-peek')
21var Validator = require('ssb-feed/validator')
22
23var isFeedId = ref.isFeedId
24var isMsgId = ref.isMsgId
25var isBlobId = ref.isBlobId
26
27var u = require('./util')
28var stdopts = u.options
29var msgFmt = u.format
30
31//53 bit integer
32var MAX_INT = 0x1fffffffffffff
33
// True when `n` is a primitive number. NaN and +/-Infinity count as numbers;
// `new Number(1)` wrapper objects do not.
function isNumber (n) {
  var kind = typeof n
  return kind === 'number'
}
37
// True when `s` is a primitive string (String wrapper objects are rejected).
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
41
// True when `o` is a non-null, non-array object.
// Always returns a real boolean: the earlier `o && ...` form handed back the
// falsy argument itself (null, 0, '', undefined) instead of `false`.
function isObject (o) {
  return o !== null && 'object' === typeof o && !Array.isArray(o)
}
45
// Turn a pull-stream source into a continuable: the returned function takes a
// node-style callback and hands it to `pull.collect`, which is then piped
// after `stream`. Nothing is read until the returned function is invoked.
function all (stream) {
  return function collectAll (cb) {
    var sink = pull.collect(cb)
    pull(stream, sink)
  }
}
51
52function getVMajor () {
53 var version = require('./package.json').version
54 return (version.split('.')[0])|0
55}
56
57module.exports = function (db, opts, keys, path) {
58
59 var sysDB = db.sublevel('sys')
60 var logDB = db.sublevel('log')
61 var feedDB = require('./indexes/feed')(db)
62 var clockDB = require('./indexes/clock')(db)
63 var lastDB = require('./indexes/last')(db)
64 var indexDB = require('./indexes/links')(db, keys)
65 var appsDB = db.sublevel('app')
66
67 function get (db, key) {
68 return function (cb) { db.get(key, cb) }
69 }
70
71 db.opts = opts
72
73 db.add = Validator(db)
74
75 var realtime = Notify()
76
77 var await = u.await()
78 var set = await.set
79 await.set = null
80 var waiting = []
81 db.seen = await
82 db.post(function (op) {
83 set(Math.max(op.ts || op.timestamp, await.get()||0))
84 })
85
86 peek.last(logDB, {keys: true}, function (err, key) {
87 set(Math.max(key || 0, await.get()||0))
88 })
89
90 db.pre(function (op, _add, _batch) {
91 var msg = op.value
92 var id = op.key
93 // index by sequence number
94
95 function add (kv) {
96 _add(kv);
97 kv._value = op.value
98 realtime(kv)
99 }
100
101 var localtime = op.timestamp = timestamp()
102
103 add({
104 key: localtime, value: id,
105 type: 'put', prefix: logDB
106 })
107
108 })
109
110
111 db.needsRebuild = function (cb) {
112 sysDB.get('vmajor', function (err, dbvmajor) {
113 dbvmajor = (dbvmajor|0) || 0
114 cb(null, dbvmajor < getVMajor())
115 })
116 }
117
118 db.rebuildIndex = function (cb) {
119 var n = 4, m = 4, ended
120 feedDB.rebuild(next)
121 clockDB.rebuild(next)
122 lastDB.rebuild(next)
123 indexDB.rebuild(next)
124
125 function next (err) {
126 if(err && !ended) cb(ended = err)
127 }
128
129 var m = 4
130 feedDB.await(next2)
131 clockDB.await(next2)
132 lastDB.await(next2)
133 indexDB.await(next2)
134
135 function next2 () {
136 if(ended) return
137 if(--m) return
138 ended = true
139 sysDB.put('vmajor', getVMajor(), cb)
140 }
141 }
142
143 //TODO: eventually, this should filter out authors you do not follow.
144 db.createFeedStream = feedDB.createFeedStream
145 //latest was stored as author: seq
146 //but for the purposes of replication back pressure
147 //we need to know when we last replicated with someone.
148 //instead store as: {sequence: seq, ts: localtime}
149 //then, peers can request a max number of posts per feed.
150
151 function toSeq (latest) {
152 return isNumber(latest) ? latest : latest.sequence
153 }
154
155 function lookup(keys, values) {
156 return paramap(function (key, cb) {
157 if(key.sync) return cb(null, key)
158 if(!values) return cb(null, key)
159 db.get(key, function (err, msg) {
160 if (err) cb(err)
161 else {
162 cb(null, u.format(keys, values, { key: key, value: msg }))
163 }
164 })
165 })
166 }
167
168 db.lookup = lookup
169
170 db.createHistoryStream = clockDB.createHistoryStream
171
172 db.createUserStream = clockDB.createUserStream
173
174
175 //writeStream - used in replication.
176 db.createWriteStream = function (cb) {
177 return pull(
178 paramap(function (data, cb) {
179 db.add(data, function (err, msg) {
180 db.emit('invalid', err, msg)
181 cb()
182 })
183 }),
184 pull.drain(null, cb)
185 )
186 }
187
188 db.createFeed = function (keys) {
189 if(!keys)
190 keys = opts.keys.generate()
191 return createFeed(db, keys, opts)
192 }
193
194 db.latest = lastDB.latest
195
196 db.latestSequence = function (id, cb) {
197 lastDB.get(id, cb)
198 }
199
200 db.getLatest = function (id, cb) {
201 lastDB.get(id, function (err, v) {
202 if(err) return cb()
203 //callback null there is no latest
204 clockDB.get([id, toSeq(v)], function (err, hash) {
205 if(err) return cb()
206 db.get(hash, function (err, msg) {
207 if(err) cb()
208 else cb(null, {key: hash, value: msg})
209 })
210 })
211 })
212 }
213
214 db.createLogStream = Live(function (opts) {
215 opts = stdopts(opts)
216 var keys = opts.keys; delete opts.keys
217 var values = opts.values; delete opts.values
218 return pull(
219 pl.old(logDB, stdopts(opts)),
220 //lookup2(keys, values, 'timestamp')
221 paramap(function (data, cb) {
222 var key = data.value
223 var seq = data.key
224 db.get(key, function (err, value) {
225 if (err) cb(err)
226 else cb(null, msgFmt(keys, values, {key: key, value: value, timestamp: seq}))
227 })
228 })
229 )
230 }, function (opts) {
231 return pl.live(db, stdopts(opts))
232 })
233
234 var HI = undefined, LO = null
235
236 db.messagesByType = indexDB.messagesByType
237
238 db.links = indexDB.links
239
240 //get all messages that link to a given message.
241 db.relatedMessages = function (opts, cb) {
242 if(isString(opts)) opts = {key: opts}
243 if(!opts) throw new Error('opts *must* be object')
244 var key = opts.id || opts.key
245 var depth = opts.depth || Infinity
246 var n = 1
247 var msgs = {key: key, value: null}
248 db.get(key, function (err, msg) {
249 msgs.value = msg
250 if (err && err.notFound)
251 err = null // ignore not found
252 done(err)
253 })
254
255 related(msgs, depth)
256
257 function related (msg, depth) {
258 if(depth <= 0) return
259 if (n<0) return
260 n++
261 all(db.links({dest: msg.key, rel: opts.rel, keys: true, values:true, meta: false, type:'msg'}))
262 (function (err, ary) {
263 if(ary && ary.length) {
264 ary.sort(function (a, b) {
265 return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key)
266 })
267 msg.related = ary
268 ary.forEach(function (msg) { related (msg, depth - 1) })
269 }
270 done(err)
271 })
272 }
273
274 function count (msg) {
275 if(!msg.related)
276 return msg
277 var c = 0
278 msg.related.forEach(function (_msg) {
279 if(opts.parent) _msg.parent = msg.key
280 c += 1 + (count(_msg).count || 0)
281 })
282 if(opts.count) msg.count = c
283 return msg
284 }
285
286 function done (err) {
287 if(err && n > 0) {
288 n = -1
289 return cb(err)
290 }
291 if(--n) return
292 cb(null, count(msgs))
293 }
294 }
295
296 var _close = db.close
297
298 db.close = function (cb) {
299 var n = 5
300 clockDB.close(next)
301 feedDB.close(next)
302 lastDB.close(next)
303 indexDB.close(next)
304 _close.call(db, next)
305 function next (err) {
306 if(n < 0) return
307 if(err) return n = -1, cb(err)
308 if(--n) return
309 db && cb()
310 }
311 }
312
313 return db
314}
315
316

Built with git-ssb-web