git ssb

1+

Dominic / secure-scuttlebutt



Tree: b4df1e66bea6275126d6d9c508356dc44e62b504

Files: b4df1e66bea6275126d6d9c508356dc44e62b504 / index.js

5618 bytesRaw
1'use strict';
2
3var join = require('path').join
4var EventEmitter = require('events')
5//var Obv = require('obv')
6
7var pull = require('pull-stream')
8var timestamp = require('monotonic-timestamp')
9var explain = require('explain-error')
10//var createFeed = require('ssb-feed')
11var ref = require('ssb-ref')
12var ssbKeys = require('ssb-keys')
13var Notify = require('pull-notify')
14var Validator = require('ssb-feed/validator')
15var Related = require('./related')
16
17var isFeedId = ref.isFeedId
18var isMsgId = ref.isMsgId
19var isBlobId = ref.isBlobId
20
21var u = require('./util')
22var stdopts = u.options
23var Format = u.formatStream
24//53 bit integer
25var MAX_INT = 0x1fffffffffffff
26
// True when `n` is a primitive number; typeof also reports 'number' for NaN
// and +/-Infinity, so those pass. Boxed Number objects do not.
function isNumber (n) {
  var kind = typeof n
  return kind === 'number'
}
30
// True when `s` is a primitive string (boxed String objects do not count).
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
34
/**
 * Test whether a value is callable.
 *
 * The previous implementation compared against the misspelled literal
 * 'funciton', so it returned false for every input.
 *
 * @param {*} f - value to test
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  return 'function' === typeof f
}
38
39var isArray = Array.isArray
40
// Truthy only for non-array objects.
// Note: a falsy input is handed back unchanged (null, undefined, 0, '' ...),
// so the result is "falsy", not strictly `false`, in that case.
function isObject (o) {
  if (!o) return o
  if (typeof o !== 'object') return false
  return !Array.isArray(o)
}
44
// Major version of this package: the first dot-separated component of the
// `version` field in ./package.json, coerced to a 32-bit integer with `|0`.
function getVMajor () {
  var parts = require('./package.json').version.split('.')
  var major = parts[0]
  return major | 0
}
49
50module.exports = function (_db, opts, keys, path) {
51 path = path || _db.location
52
53 keys = keys || ssbKeys.generate()
54
55 var db = require('./db')(join(opts.path || path, 'flume'), keys)
56
57 //legacy database
58 if(_db) require('./legacy')(_db, db)
59 else db.ready.set(true)
60
61 db.sublevel = function (a, b) {
62 return _db.sublevel(a, b)
63 }
64
65 //UGLY HACK, but...
66 //fairly sure that something up the stack expects ssb to be an event emitter.
67 db.__proto__ = new EventEmitter()
68
69 db.opts = opts
70
71 var _get = db.get
72
73 db.get = function (key, cb) {
74 if(ref.isMsg(key))
75 return db.keys.get(key, function (err, seq) {
76 if(err) cb(err)
77 else cb(null, seq && seq.value)
78 })
79 else if(Number.isInteger(key))
80 _get(key, cb) //seq
81 else
82 throw new Error('secure-scuttlebutt.get: key *must* be a ssb message id or a flume offset')
83 }
84
85 db.add = function (msg, cb) {
86 db.queue(msg, function (err, data) {
87 if(err) cb(err)
88 else db.flush(function () { cb(null, data) })
89 })
90 }
91
92 db.createFeed = function (keys) {
93 if(!keys) keys = ssbKeys.generate()
94 function add (content, cb) {
95 //LEGACY: hacks to support add as a continuable
96 if(!cb)
97 return function (cb) { add (content, cb) }
98
99 db.append({content: content, keys: keys}, cb)
100 }
101 return {
102 add: add, publish: add,
103 id: keys.id, keys: keys
104 }
105 }
106
107 var realtime = Notify()
108
109 //TODO: eventually, this should filter out authors you do not follow.
110 db.createFeedStream = db.feed.createFeedStream
111
112 //latest was stored as author: seq
113 //but for the purposes of replication back pressure
114 //we need to know when we last replicated with someone.
115 //instead store as: {sequence: seq, ts: localtime}
116 //then, peers can request a max number of posts per feed.
117
118 function toSeq (latest) {
119 return isNumber(latest) ? latest : latest.sequence
120 }
121
122 function lookup(keys, values) {
123 return paramap(function (key, cb) {
124 if(key.sync) return cb(null, key)
125 if(!values) return cb(null, key)
126 db.get(key, function (err, data) {
127 if (err) cb(err)
128 else cb(null, u.format(keys, values, data))
129 })
130 })
131 }
132
133 db.lookup = lookup
134
135 db.createHistoryStream = db.clock.createHistoryStream
136
137 db.createUserStream = db.clock.createUserStream
138
139 //writeStream - used in replication.
140 db.createWriteStream = function (cb) {
141 return pull(
142 pull.asyncMap(function (data, cb) {
143 db.add(data, function (err, msg) {
144 if(err) {
145 db.emit('invalid', err, msg)
146 }
147 cb()
148 })
149 }),
150 pull.drain(null, cb)
151 )
152 }
153
154 db.latest = db.last.latest
155
156 //used by sbot replication plugin
157 db.latestSequence = function (id, cb) {
158 db.last.get(function (err, val) {
159 if(err) cb(err)
160 else if (!val || !val[id]) cb(new Error('not found:'+id))
161 else cb(null, val[id].sequence)
162 })
163 }
164
165 db.getLatest = function (key, cb) {
166 db.last.get(function (err, value) {
167 if(err || !value || !value[key]) cb()
168 //Currently, this retrives the previous message.
169 //but, we could rewrite validation to only use
170 //data the reduce view, so that no disk read is necessary.
171 else db.get(value[key].id, function (err, msg) {
172 cb(err, {key: value[key].id, value: msg})
173 })
174 })
175 }
176
177 db.createLogStream = function (opts) {
178 opts = stdopts(opts)
179 if(opts.raw)
180 return db.stream()
181
182 var keys = opts.keys; delete opts.keys
183 var values = opts.values; delete opts.values
184 return pull(db.time.read(opts), Format(keys, values))
185 }
186
187 db.messagesByType = db.links.messagesByType
188
189 db.links = db.links.links
190
191 var HI = undefined, LO = null
192
193 //get all messages that link to a given message.
194
195 db.relatedMessages = Related(db)
196
197 //called with [id, seq] or "<id>:<seq>"
198 db.getAtSequence = function (seqid, cb) {
199 db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb)
200 }
201
202 db.getVectorClock = function (_, cb) {
203 if(!cb) cb = _
204 db.last.get(function (err, h) {
205 if(err) return cb(err)
206 var clock = {}
207 for(var k in h)
208 clock[k] = h[k].sequence
209 cb(null, clock)
210 })
211
212 }
213
214 if(_db) {
215 var close = db.close
216 db.close = function (cb) {
217 var n = 2
218 _db.close(next); close(next)
219
220 function next (err) {
221 if(err && n>0) {
222 n = -1
223 return cb(err)
224 }
225 if(--n) return
226 cb()
227 }
228
229 }
230
231 }
232 return db
233}
234
235

Built with git-ssb-web