Files: 9a4121d28f7b5ad414632abaf7e2528b0d978bdc / index.js
7530 bytesRaw
1 | ; |
2 | |
3 | var contpara = require('cont').para |
4 | var pull = require('pull-stream') |
5 | var pl = require('pull-level') |
6 | var paramap = require('pull-paramap') |
7 | var timestamp = require('monotonic-timestamp') |
8 | var assert = require('assert') |
9 | var ltgt = require('ltgt') |
10 | var mlib = require('ssb-msgs') |
11 | var explain = require('explain-error') |
12 | var pdotjson = require('./package.json') |
13 | var createFeed = require('ssb-feed') |
14 | var cat = require('pull-cat') |
15 | var ref = require('ssb-ref') |
16 | var ssbKeys = require('ssb-keys') |
17 | var Live = require('pull-live') |
18 | var Notify = require('pull-notify') |
19 | var compare = require('typewiselite') |
20 | var peek = require('level-peek') |
21 | var Validator = require('ssb-feed/validator') |
22 | |
23 | var isFeedId = ref.isFeedId |
24 | var isMsgId = ref.isMsgId |
25 | var isBlobId = ref.isBlobId |
26 | |
27 | var u = require('./util') |
28 | var stdopts = u.options |
29 | var msgFmt = u.format |
30 | |
// Largest integer with 53 significant bits: 2^53 - 1.
var MAX_INT = Math.pow(2, 53) - 1
33 | |
// True when `n` is a primitive number (this includes NaN and +/-Infinity).
function isNumber (n) {
  var kind = typeof n
  return kind === 'number'
}
37 | |
// True when `s` is a primitive string (the empty string counts).
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
41 | |
// True when `o` is a non-null, non-array object.
// Always returns a real boolean, like isNumber/isString; falsy inputs such as
// null, 0 or '' yield `false` rather than being passed through as the result.
function isObject (o) {
  return o !== null && 'object' === typeof o && !Array.isArray(o)
}
45 | |
// Turn a pull-stream source into a function that takes only a callback:
// invoking the result pipes `stream` into pull.collect and hands `cb`
// to the collector.
function all (stream) {
  function drainInto (cb) {
    var sink = pull.collect(cb)
    pull(stream, sink)
  }
  return drainInto
}
51 | |
// Major version of this package as an integer: the text before the first '.'
// in package.json's "version", coerced with |0.
function getVMajor () {
  var pkg = require('./package.json')
  var major = pkg.version.split('.')[0]
  return major | 0
}
56 | |
57 | module.exports = function (db, opts, keys, path) { |
58 | |
// Sublevels carved out of `db`, and the index modules attached to it.
// The statement order is kept exactly as it was.
var sysDB = db.sublevel('sys') // holds the 'vmajor' record used by needsRebuild/rebuildIndex
var logDB = db.sublevel('log') // written by the pre-hook: local timestamp -> message id
var feedDB = require('./indexes/feed')(db)
var clockDB = require('./indexes/clock')(db)
var lastDB = require('./indexes/last')(db)
var indexDB = require('./indexes/links')(db, keys)
var appsDB = db.sublevel('app')

// Wrap `store.get(key, ...)` as a function that takes only the callback.
function get (store, key) {
  return function (done) {
    store.get(key, done)
  }
}
70 | |
db.opts = opts

// db.add is produced by ssb-feed's Validator, bound to this database.
db.add = Validator(db)

// Notifier that the pre-write hook feeds with every log entry it queues.
var realtime = Notify()

// `seen` holds the highest local timestamp observed so far and is exposed as
// db.seen. (It was previously a local named `await`, which is a reserved word
// in ES modules and async functions.)
var seen = u.await()
// Keep the setter private to this closure: holders of db.seen get `set: null`.
var setSeen = seen.set
seen.set = null
db.seen = seen

// After each write, advance `seen` to the operation's timestamp if it is newer.
db.post(function (op) {
  setSeen(Math.max(op.ts || op.timestamp, seen.get() || 0))
})

// On startup, seed `seen` from the last key in the log sublevel.
// `err` is not inspected: when no key is supplied the value falls back to 0.
peek.last(logDB, {keys: true}, function (err, key) {
  setSeen(Math.max(key || 0, seen.get() || 0))
})
89 | |
// Pre-write hook: stamp every incoming operation with a fresh monotonic local
// timestamp and queue a matching entry (timestamp -> message id) in logDB.
db.pre(function (op, _add, _batch) {
  var id = op.key

  var localtime = timestamp()
  op.timestamp = localtime

  var entry = {
    key: localtime, value: id,
    type: 'put', prefix: logDB
  }

  _add(entry)
  // _value is attached only after the entry has been queued, and the same
  // object is then handed to the realtime notifier.
  entry._value = op.value
  realtime(entry)
})
109 | |
110 | |
// Reports through `cb(null, boolean)` whether the major version recorded under
// 'vmajor' in sysDB is lower than this package's major version.
db.needsRebuild = function (cb) {
  sysDB.get('vmajor', function (err, dbvmajor) {
    // err is not inspected; when no value is supplied, `|0` coerces it to 0
    var stored = dbvmajor | 0
    var current = getVMajor()
    cb(null, stored < current)
  })
}
117 | |
// Rebuild the four indexes (feed, clock, last, links).
//
// Every index is asked to rebuild, then each one's `await` is used to learn
// when it is ready. Once all four have reported, the current major version is
// written to sysDB under 'vmajor' and `cb` is handed to that put.
// The first rebuild error is passed to `cb(err)`; after that, later errors and
// ready notifications are ignored.
db.rebuildIndex = function (cb) {
  var indexes = [feedDB, clockDB, lastDB, indexDB]
  var pending = indexes.length
  var ended

  // start every rebuild before registering any ready listener
  indexes.forEach(function (index) { index.rebuild(onRebuilt) })
  indexes.forEach(function (index) { index.await(onReady) })

  function onRebuilt (err) {
    // `ended` doubles as the "already reported" flag
    if (err && !ended) cb(ended = err)
  }

  function onReady () {
    if (ended) return
    if (--pending) return
    ended = true
    sysDB.put('vmajor', getVMajor(), cb)
  }
}
142 | |
// Exposed unchanged from the feed index.
// TODO: eventually, this should filter out authors you do not follow.
db.createFeedStream = feedDB.createFeedStream
// "latest" used to be stored per author as a bare sequence number. For
// replication back pressure we also need to know when we last replicated with
// someone, so it is stored as {sequence: seq, ts: localtime} instead; peers
// can then request a max number of posts per feed.
//
// toSeq accepts either shape and returns the sequence number.
function toSeq (latest) {
  if (isNumber(latest)) return latest
  return latest.sequence
}
154 | |
// Through-stream that resolves message keys to stored messages.
// Items with a truthy `sync` property, and every item when `values` is falsy,
// pass through untouched. Otherwise the key is fetched from `db` and the
// {key, value} pair is shaped by u.format(keys, values, ...).
// A failed fetch is passed to the stream as an error.
function lookup (keys, values) {
  return paramap(function (key, cb) {
    if (key.sync || !values) return cb(null, key)

    db.get(key, function (err, msg) {
      if (err) {
        cb(err)
        return
      }
      var pair = { key: key, value: msg }
      cb(null, u.format(keys, values, pair))
    })
  })
}

db.lookup = lookup
169 | |
// Both streams are exposed unchanged from the clock index.
db.createHistoryStream = clockDB.createHistoryStream

db.createUserStream = clockDB.createUserStream
173 | |
174 | |
// writeStream - used in replication.
// Returns a sink that passes every incoming message to db.add.
// A message that fails to add does not abort the stream: the failure is
// reported by emitting 'invalid' (err, msg) on `db` and the stream moves on.
// 'invalid' is emitted only when db.add reports an error, never for
// successfully added messages.
// `cb` is passed to pull.drain as its completion callback.
db.createWriteStream = function (cb) {
  return pull(
    paramap(function (data, next) {
      db.add(data, function (err, msg) {
        if (err) db.emit('invalid', err, msg)
        // deliberately called without the error so the stream keeps going
        next()
      })
    }),
    pull.drain(null, cb)
  )
}
187 | |
// Create a feed on this database via ssb-feed's createFeed.
// When `keys` is falsy a fresh key pair is produced with opts.keys.generate().
db.createFeed = function (keys) {
  var feedKeys = keys ? keys : opts.keys.generate()
  return createFeed(db, feedKeys, opts)
}
193 | |
// Exposed unchanged from the last index.
db.latest = lastDB.latest

// Look up the "latest" record stored for feed `id` in the last index;
// `cb` is handed straight to lastDB.get.
db.latestSequence = function (id, cb) {
  lastDB.get(id, cb)
}
199 | |
// Fetch the newest stored message of feed `id` as {key, value}.
// Three lookups are chained: last index -> clock index ([id, seq] -> hash)
// -> message. If any of them fails, `cb` is called with no arguments,
// meaning "there is no latest"; the error itself is not forwarded.
db.getLatest = function (id, cb) {
  lastDB.get(id, onLatest)

  function onLatest (err, latest) {
    if (err) return cb()
    clockDB.get([id, toSeq(latest)], onHash)
  }

  function onHash (err, hash) {
    if (err) return cb()
    db.get(hash, function (err, msg) {
      if (err) cb()
      else cb(null, {key: hash, value: msg})
    })
  }
}
213 | |
// Stream of messages in the order they were written to the local log.
// Live() is given two factories: the first reads stored entries from logDB,
// the second opens a live stream on `db`.
db.createLogStream = Live(function (opts) {
  opts = stdopts(opts)

  // keys/values only steer output formatting here, so they are removed from
  // the options before those are passed on to pull-level
  var wantKeys = opts.keys
  var wantValues = opts.values
  delete opts.keys
  delete opts.values

  // a log entry is {key: local timestamp, value: message id}
  function resolve (entry, cb) {
    var id = entry.value
    var localtime = entry.key
    db.get(id, function (err, msg) {
      if (err) {
        cb(err)
        return
      }
      cb(null, msgFmt(wantKeys, wantValues, {key: id, value: msg, timestamp: localtime}))
    })
  }

  return pull(
    pl.old(logDB, stdopts(opts)),
    paramap(resolve)
  )
}, function (opts) {
  return pl.live(db, stdopts(opts))
})
233 | |
var HI = undefined
var LO = null

// Both queries are exposed unchanged from the links index.
db.messagesByType = indexDB.messagesByType

db.links = indexDB.links
239 | |
// Get all messages that link to a given message, as a tree.
//
// `opts` is either a message key (string) or an object:
//   id / key - key of the root message
//   rel      - passed through to db.links as the `rel` filter
//   depth    - how many levels to follow; any falsy value (0 included) means unlimited
//   parent   - when truthy, each related message gets `parent` set to its parent's key
//   count    - when truthy, each message with related messages gets `count`,
//              the number of messages below it
// `cb(err, root)` receives {key, value, related: [...]}. A not-found root is
// not an error: its `value` is simply left unset.
// Throws synchronously when `opts` is falsy.
db.relatedMessages = function (opts, cb) {
  if (isString(opts)) opts = {key: opts}
  if (!opts) throw new Error('opts *must* be object')

  var rootKey = opts.id || opts.key
  var maxDepth = opts.depth || Infinity
  // number of outstanding async operations; -1 once an error has been reported
  var pending = 1
  var root = {key: rootKey, value: null}

  db.get(rootKey, function (err, msg) {
    root.value = msg
    // ignore not found
    finish(err && err.notFound ? null : err)
  })

  expand(root, maxDepth)

  // collect the messages linking to `node`, then recurse into each of them
  function expand (node, depth) {
    if (depth <= 0 || pending < 0) return
    pending++
    var query = {dest: node.key, rel: opts.rel, keys: true, values: true, meta: false, type: 'msg'}
    var collectLinks = all(db.links(query))
    collectLinks(function (err, linked) {
      if (linked && linked.length) {
        linked.sort(byTimestampThenKey)
        node.related = linked
        linked.forEach(function (child) {
          expand(child, depth - 1)
        })
      }
      finish(err)
    })
  }

  function byTimestampThenKey (a, b) {
    return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key)
  }

  // walk the finished tree, applying the `parent` and `count` options
  function tally (node) {
    if (!node.related) return node
    var total = 0
    node.related.forEach(function (child) {
      if (opts.parent) child.parent = node.key
      total += 1 + (tally(child).count || 0)
    })
    if (opts.count) node.count = total
    return node
  }

  function finish (err) {
    if (err && pending > 0) {
      pending = -1
      return cb(err)
    }
    pending -= 1
    if (pending !== 0) return
    cb(null, tally(root))
  }
}
295 | |
var _close = db.close

// Close the four indexes and then the database itself (via the original
// db.close captured above).
// `cb` is optional. When given, it is called once: with the first error
// reported by any close, or with no arguments after all five have finished.
// Without `cb`, a close error is thrown instead of being dropped.
db.close = function (cb) {
  // outstanding closes; -1 once an error has been reported
  var pending = 5
  clockDB.close(next)
  feedDB.close(next)
  lastDB.close(next)
  indexDB.close(next)
  _close.call(db, next)

  function next (err) {
    if (pending < 0) return
    if (err) {
      pending = -1
      if (cb) return cb(err)
      throw err
    }
    if (--pending) return
    if (cb) cb()
  }
}
312 | |
313 | return db |
314 | } |
315 | |
316 |
Built with git-ssb-web