git ssb

1+

Dominic / secure-scuttlebutt



Commit 3a87ab0f145263cab848f87d3125febdc5561b9a

tidy up flume, and move it to beside the db not inside it

Dominic Tarr committed on 12/22/2016, 2:03:22 PM
Parent: ebd8581d6cea19558023b851fbdf599a5044b446

Files changed

index.jschanged
legacy.jschanged
related.jsadded
index.jsView
@@ -1,12 +1,15 @@
11 'use strict';
22
3 +var join = require('path').join
4 +var assert = require('assert')
5 +var EventEmitter = require('events')
6 +
37 var contpara = require('cont').para
48 var pull = require('pull-stream')
59 var pl = require('pull-level')
610 var paramap = require('pull-paramap')
711 var timestamp = require('monotonic-timestamp')
8-var assert = require('assert')
912 var ltgt = require('ltgt')
1013 var mlib = require('ssb-msgs')
1114 var explain = require('explain-error')
1215 var pdotjson = require('./package.json')
@@ -15,14 +18,12 @@
1518 var ref = require('ssb-ref')
1619 var ssbKeys = require('ssb-keys')
1720 var Live = require('pull-live')
1821 var Notify = require('pull-notify')
19-var compare = require('typewiselite')
2022 var peek = require('level-peek')
2123 var Validator = require('ssb-feed/validator')
24 +var Related = require('./related')
2225
23-var EventEmitter = require('events')
24-
2526 var isFeedId = ref.isFeedId
2627 var isMsgId = ref.isMsgId
2728 var isBlobId = ref.isBlobId
2829
@@ -45,14 +46,8 @@
4546 function isObject (o) {
4647 return o && 'object' === typeof o && !Array.isArray(o)
4748 }
4849
49-function all (stream) {
50- return function (cb) {
51- pull(stream, pull.collect(cb))
52- }
53-}
54-
5550 function getVMajor () {
5651 var version = require('./package.json').version
5752 return (version.split('.')[0])|0
5853 }
@@ -61,60 +56,34 @@
6156 path = path || _db.location
6257
6358 keys = keys || ssbKeys.generate()
6459
65- var db = require('./db')(path, keys)
60 + var db = require('./db')(join(opts.path || path, 'flume'), keys)
6661
67- if(_db) { //legacy database
68- require('./legacy')(_db)
69- db.since.once(function (v) {
70- if(v === -1) load(null)
71- else db.get(v, function (err, data) {
72- if(err) throw err
73- load(data.timestamp)
74- })
75- })
62 + //legacy database
63 + if(_db) require('./legacy')(_db, db)
7664
77- function load(since) {
78- pull(
79- _db.createLogStream({gt: since}),
80- paramap(function (data, cb) {
81- if(Math.random() < 0.001)
82- console.log(data.timestamp)
83- db.append(data, cb)
84- }),
85- pull.drain(null, function () {
86- console.log('loaded!')
87- })
88- )
89-
90- }
91-
92-// pull(db.time.read(), pull.drain(console.log))
93- }
9465 db.sublevel = function (a, b) {
9566 return _db.sublevel(a, b)
9667 }
68 +
69 + //UGLY HACK, but...
9770 //fairly sure that something up the stack expects ssb to be an event emitter.
9871 db.__proto__ = new EventEmitter()
9972
100- function get (db, key) {
101- return function (cb) { db.get(key, cb) }
102- }
103-
10473 db.opts = opts
10574
10675 db.batch = function (batch, cb) {
107- db.append(batch.map(function (e) {
108- return {
109- key: e.key,
110- value: e.value,
111- timestamp: timestamp()
112- }
113- }), function (err, offsets) {
114- cb(err)
115- })
116- }
76 + db.append(batch.map(function (e) {
77 + return {
78 + key: e.key,
79 + value: e.value,
80 + timestamp: timestamp()
81 + }
82 + }), function (err, offsets) {
83 + cb(err)
84 + })
85 + }
11786
11887 var _get = db.get
11988
12089 db.get = function (key, cb) {
@@ -131,26 +100,8 @@
131100 db.add = Validator(db, opts)
132101
133102 var realtime = Notify()
134103
135- function Limit (fn) {
136- return function (opts) {
137- if(opts && opts.limit && opts.limit > 0) {
138- var limit = opts.limit
139- var read = fn(opts)
140- return function (abort, cb) {
141- if(limit--) return read(abort, function (err, data) {
142- if(data && data.sync) limit ++
143- cb(err, data)
144- })
145- else read(true, cb)
146- }
147- }
148- else
149- return fn(opts)
150- }
151- }
152-
153104 //TODO: eventually, this should filter out authors you do not follow.
154105 db.createFeedStream = db.feed.createFeedStream
155106
156107 //latest was stored as author: seq
@@ -218,9 +169,8 @@
218169 if(err || !value || !value[key]) cb()
219170 //Currently, this retrives the previous message.
220171 //but, we could rewrite validation to only use
221172 //data the reduce view, so that no disk read is necessary.
222-// else cb(null, {key: value.id, value: {sequence: value.sequence, timestamp: value.ts}})
223173 else db.get(value[key].id, function (err, msg) {
224174 cb(err, {key: value[key].id, value: msg})
225175 })
226176 })
@@ -243,73 +193,10 @@
243193
244194 var HI = undefined, LO = null
245195
246196 //get all messages that link to a given message.
247- db.relatedMessages = function (opts, cb) {
248- if(isString(opts)) opts = {key: opts}
249- if(!opts) throw new Error('opts *must* be object')
250- var key = opts.id || opts.key
251- var depth = opts.depth || Infinity
252- var seen = {}
253197
254- //filter a list of rel, used to avoid 'branch' rel in patchwork,
255- //which causes messages to be queried twice.
256- var n = 1
257- var msgs = {key: key, value: null}
258- db.get(key, function (err, msg) {
259- msgs.value = msg
260- if (err && err.notFound)
261- err = null // ignore not found
262- done(err)
263- })
198 + db.relatedMessages = Related(db)
264199
265- related(msgs, depth)
266-
267- function related (msg, depth) {
268- if(depth <= 0) return
269- if (n<0) return
270- n++
271- all(db.links({dest: msg.key, rel: opts.rel, keys: true, values:true, meta: false, type:'msg'}))
272- (function (err, ary) {
273- if(ary && ary.length) {
274- msg.related = ary = ary.sort(function (a, b) {
275- return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key)
276- }).filter(function (msg) {
277- if(seen[msg.key]) return
278- return seen[msg.key] = true
279- })
280- ary.forEach(function (msg) { related (msg, depth - 1) })
281- }
282- done(err)
283- })
284- }
285-
286- function count (msg) {
287- if(!msg.related)
288- return msg
289- var c = 0
290- msg.related.forEach(function (_msg) {
291- if(opts.parent) _msg.parent = msg.key
292- c += 1 + (count(_msg).count || 0)
293- })
294- if(opts.count) msg.count = c
295- return msg
296- }
297-
298- function done (err) {
299- if(err && n > 0) {
300- n = -1
301- return cb(err)
302- }
303- if(--n) return
304- cb(null, count(msgs))
305- }
306- }
307-
308200 return db
309201 }
310202
311-
312-
313-
314-
315-
legacy.jsView
@@ -6,11 +6,11 @@
66 var stdopts = u.options
77 var Format = u.formatStream
88 var msgFmt = u.format
99
10-module.exports = function (db) {
10 +module.exports = function (db, flumedb) {
1111
12- var logDB = db.sublevel('log')
12 + var logDB = db.sublevel('log')
1313 db.pre(function (op, _add, _batch) {
1414 var msg = op.value
1515 var id = op.key
1616 // index by sequence number
@@ -29,9 +29,8 @@
2929 })
3030
3131 })
3232
33-
3433 function Limit (fn) {
3534 return function (opts) {
3635 if(opts && opts.limit && opts.limit > 0) {
3736 var limit = opts.limit
@@ -68,6 +67,30 @@
6867 }, function (opts) {
6968 return pl.live(db, stdopts(opts))
7069 }))
7170
71 + if(flumedb) {
72 + flumedb.since.once(function (v) {
73 + if(v === -1) load(null)
74 + else flumedb.get(v, function (err, data) {
75 + if(err) throw err
76 + load(data.timestamp)
77 + })
78 + })
79 +
80 + function load(since) {
81 + pull(
82 + db.createLogStream({gt: since}),
83 + paramap(function (data, cb) {
84 + if(Math.random() < 0.001)
85 + console.log(data.timestamp)
86 + flumedb.append(data, cb)
87 + }),
88 + pull.drain(null, function () {
89 + console.log('loaded!')
90 + })
91 + )
92 + }
93 + }
7294 }
7395
96 +
related.jsView
@@ -1,0 +1,76 @@
1 +var compare = require('typewiselite')
2 +var pull = require('pull-stream')
3 +
4 +function isString (s) {
5 + return 'string' === typeof s
6 +}
7 +
8 +function all (stream) {
9 + return function (cb) {
10 + pull(stream, pull.collect(cb))
11 + }
12 +}
13 +
14 +module.exports = function (db) {
15 + return function (opts, cb) {
16 + if(isString(opts)) opts = {key: opts}
17 + if(!opts) throw new Error('opts *must* be object')
18 + var key = opts.id || opts.key
19 + var depth = opts.depth || Infinity
20 + var seen = {}
21 +
22 + //filter a list of rel, used to avoid 'branch' rel in patchwork,
23 + //which causes messages to be queried twice.
24 + var n = 1
25 + var msgs = {key: key, value: null}
26 + db.get(key, function (err, msg) {
27 + msgs.value = msg
28 + if (err && err.notFound)
29 + err = null // ignore not found
30 + done(err)
31 + })
32 +
33 + related(msgs, depth)
34 +
35 + function related (msg, depth) {
36 + if(depth <= 0) return
37 + if (n<0) return
38 + n++
39 + all(db.links({dest: msg.key, rel: opts.rel, keys: true, values:true, meta: false, type:'msg'}))
40 + (function (err, ary) {
41 + if(ary && ary.length) {
42 + msg.related = ary = ary.sort(function (a, b) {
43 + return compare(a.value.timestamp, b.value.timestamp) || compare(a.key, b.key)
44 + }).filter(function (msg) {
45 + if(seen[msg.key]) return
46 + return seen[msg.key] = true
47 + })
48 + ary.forEach(function (msg) { related (msg, depth - 1) })
49 + }
50 + done(err)
51 + })
52 + }
53 +
54 + function count (msg) {
55 + if(!msg.related)
56 + return msg
57 + var c = 0
58 + msg.related.forEach(function (_msg) {
59 + if(opts.parent) _msg.parent = msg.key
60 + c += 1 + (count(_msg).count || 0)
61 + })
62 + if(opts.count) msg.count = c
63 + return msg
64 + }
65 +
66 + function done (err) {
67 + if(err && n > 0) {
68 + n = -1
69 + return cb(err)
70 + }
71 + if(--n) return
72 + cb(null, count(msgs))
73 + }
74 + }
75 +}
76 +

Built with git-ssb-web