git ssb

1+

Dominic / secure-scuttlebutt



Tree: 04f70232848d0eb769abc879ff4af69e21fb677b

Files: 04f70232848d0eb769abc879ff4af69e21fb677b / index.js

5749 bytesRaw
1'use strict';
2
3var join = require('path').join
4var EventEmitter = require('events')
5var Obv = require('obv')
6
7var pull = require('pull-stream')
8var timestamp = require('monotonic-timestamp')
9var explain = require('explain-error')
10var createFeed = require('ssb-feed')
11var ref = require('ssb-ref')
12var ssbKeys = require('ssb-keys')
13var Notify = require('pull-notify')
14var Validator = require('ssb-feed/validator')
15var Related = require('./related')
16
17var isFeedId = ref.isFeedId
18var isMsgId = ref.isMsgId
19var isBlobId = ref.isBlobId
20
21var u = require('./util')
22var stdopts = u.options
23var Format = u.formatStream
24//53 bit integer
25var MAX_INT = 0x1fffffffffffff
26
27function isNumber (n) {
28 return typeof n === 'number'
29}
30
31function isString (s) {
32 return 'string' === typeof s
33}
34
35var isArray = Array.isArray
36
37function isObject (o) {
38 return o && 'object' === typeof o && !Array.isArray(o)
39}
40
41function getVMajor () {
42 var version = require('./package.json').version
43 return (version.split('.')[0])|0
44}
45
46module.exports = function (_db, opts, keys, path) {
47 path = path || _db.location
48
49 keys = keys || ssbKeys.generate()
50
51 var db = require('./db')(join(opts.path || path, 'flume'), keys)
52
53 //legacy database
54 if(_db) require('./legacy')(_db, db)
55 else db.ready.set(true)
56
57 db.sublevel = function (a, b) {
58 return _db.sublevel(a, b)
59 }
60
61 //UGLY HACK, but...
62 //fairly sure that something up the stack expects ssb to be an event emitter.
63 db.__proto__ = new EventEmitter()
64
65 db.opts = opts
66
67 db.post = Obv()
68 db.batch = function (batch, cb) {
69 db.append(batch.map(function (e) {
70 return {
71 key: e.key,
72 value: e.value,
73 timestamp: timestamp()
74 }
75 }), function (err, offsets) {
76 batch.forEach(function (msg, i) {
77 //trigger post immediately.
78 db.post.set(msg)
79 })
80 cb(err)
81 })
82 }
83
84 var _get = db.get
85
86 db.get = function (key, cb) {
87 if(ref.isMsg(key))
88 return db.keys.get(key, function (err, seq) {
89 if(err) cb(err)
90 else cb(null, seq && seq.value)
91 })
92 else if(Number.isInteger(key))
93 _get(key, cb) //seq
94 else
95 throw new Error('secure-scuttlebutt.get: key *must* be a ssb message id or a flume offset')
96 }
97
98 var add = Validator(db, opts)
99 db.add = function (msg, cb) {
100 if(db.ready.value) next(true)
101 else db.ready.once(next, false)
102 function next (ready) {
103 add(msg, function (err, value) {
104 cb(err, value)
105 })
106 }
107 }
108
109 var realtime = Notify()
110
111 //TODO: eventually, this should filter out authors you do not follow.
112 db.createFeedStream = db.feed.createFeedStream
113
114 //latest was stored as author: seq
115 //but for the purposes of replication back pressure
116 //we need to know when we last replicated with someone.
117 //instead store as: {sequence: seq, ts: localtime}
118 //then, peers can request a max number of posts per feed.
119
120 function toSeq (latest) {
121 return isNumber(latest) ? latest : latest.sequence
122 }
123
124 function lookup(keys, values) {
125 return paramap(function (key, cb) {
126 if(key.sync) return cb(null, key)
127 if(!values) return cb(null, key)
128 db.get(key, function (err, data) {
129 if (err) cb(err)
130 else cb(null, u.format(keys, values, data))
131 })
132 })
133 }
134
135 db.lookup = lookup
136
137 db.createHistoryStream = db.clock.createHistoryStream
138
139 db.createUserStream = db.clock.createUserStream
140
141 //writeStream - used in replication.
142 db.createWriteStream = function (cb) {
143 return pull(
144 pull.asyncMap(function (data, cb) {
145 db.add(data, function (err, msg) {
146 if(err) {
147 db.emit('invalid', err, msg)
148 }
149 cb()
150 })
151 }),
152 pull.drain(null, cb)
153 )
154 }
155
156 db.createFeed = function (keys) {
157 if(!keys) keys = ssbKeys.generate()
158 return createFeed(db, keys, opts)
159 }
160
161 db.latest = db.last.latest
162
163 //used by sbot replication plugin
164 db.latestSequence = function (id, cb) {
165 db.last.get(function (err, val) {
166 if(err) cb(err)
167 else if (!val || !val[id]) cb(new Error('not found:'+id))
168 else cb(null, val[id].sequence)
169 })
170 }
171
172
173 db.getLatest = function (key, cb) {
174 db.last.get(function (err, value) {
175 if(err || !value || !value[key]) cb()
176 //Currently, this retrives the previous message.
177 //but, we could rewrite validation to only use
178 //data the reduce view, so that no disk read is necessary.
179 else db.get(value[key].id, function (err, msg) {
180 cb(err, {key: value[key].id, value: msg})
181 })
182 })
183 }
184
185
186 db.createLogStream = function (opts) {
187 opts = stdopts(opts)
188 if(opts.raw)
189 return db.stream()
190
191 var keys = opts.keys; delete opts.keys
192 var values = opts.values; delete opts.values
193 return pull(db.time.read(opts), Format(keys, values))
194 }
195
196 db.messagesByType = db.links.messagesByType
197
198 db.links = db.links.links
199
200 var HI = undefined, LO = null
201
202 //get all messages that link to a given message.
203
204 db.relatedMessages = Related(db)
205
206 //called with [id, seq] or "<id>:<seq>"
207 db.getAtSequence = function (seqid, cb) {
208 db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb)
209 }
210
211 db.getVectorClock = function (_, cb) {
212 if(!cb) cb = _
213 db.last.get(function (err, h) {
214 if(err) return cb(err)
215 var clock = {}
216 for(var k in h)
217 clock[k] = h[k].sequence
218 cb(null, clock)
219 })
220
221 }
222
223 if(_db) {
224 var close = db.close
225 db.close = function (cb) {
226 var n = 2
227 _db.close(next); close(next)
228
229 function next (err) {
230 if(err && n>0) {
231 n = -1
232 return cb(err)
233 }
234 if(--n) return
235 cb()
236 }
237
238 }
239
240 }
241 return db
242}
243
244

Built with git-ssb-web