git ssb

1+

Dominic / secure-scuttlebutt



Tree: 1ccda7026b7b15509e3da63065dd0416077782ea

Files: 1ccda7026b7b15509e3da63065dd0416077782ea / index.js

5609 bytesRaw
1'use strict';
2
3var join = require('path').join
4var EventEmitter = require('events')
5var Obv = require('obv')
6
7var pull = require('pull-stream')
8var timestamp = require('monotonic-timestamp')
9var explain = require('explain-error')
10var createFeed = require('ssb-feed')
11var ref = require('ssb-ref')
12var ssbKeys = require('ssb-keys')
13var Notify = require('pull-notify')
14var Validator = require('ssb-feed/validator')
15var Related = require('./related')
16
17var isFeedId = ref.isFeedId
18var isMsgId = ref.isMsgId
19var isBlobId = ref.isBlobId
20
21var u = require('./util')
22var stdopts = u.options
23var Format = u.formatStream
24//53 bit integer
25var MAX_INT = 0x1fffffffffffff
26
/**
 * Type guard for primitive numbers.
 *
 * @param {*} n value to test
 * @returns {boolean} true when `typeof n` is 'number' (this includes NaN
 *   and the infinities, since only the type tag is checked).
 */
function isNumber (n) {
  return typeof n === 'number'
}
30
/**
 * Type guard for primitive strings.
 *
 * @param {*} s value to test
 * @returns {boolean} true when `typeof s` is 'string'.
 */
function isString (s) {
  return 'string' === typeof s
}
34
35var isArray = Array.isArray
36
/**
 * Test for a plain (non-array, non-null) object.
 *
 * @param {*} o value to test
 * @returns {boolean} true when `o` is truthy, has `typeof` 'object' and is
 *   not an array. Always a real boolean: falsy inputs such as `null`, `0`
 *   or `''` yield `false` rather than being handed back to the caller.
 */
function isObject (o) {
  return !!o && 'object' === typeof o && !Array.isArray(o)
}
40
/**
 * Read the major version number from this package's package.json.
 *
 * @returns {number} the first dot-separated component of `version`,
 *   coerced to a 32-bit integer with `|0` (so a non-numeric component
 *   becomes 0).
 */
function getVMajor () {
  var version = require('./package.json').version
  return (version.split('.')[0])|0
}
45
46module.exports = function (_db, opts, keys, path) {
47 path = path || _db.location
48
49 keys = keys || ssbKeys.generate()
50
51 var db = require('./db')(join(opts.path || path, 'flume'), keys)
52
53 //legacy database
54 if(_db) require('./legacy')(_db, db)
55 else db.ready.set(true)
56
57 db.sublevel = function (a, b) {
58 return _db.sublevel(a, b)
59 }
60
61 //UGLY HACK, but...
62 //fairly sure that something up the stack expects ssb to be an event emitter.
63 db.__proto__ = new EventEmitter()
64
65 db.opts = opts
66
67 db.post = Obv()
68 db.batch = function (batch, cb) {
69 db.append(batch.map(function (e) {
70 return {
71 key: e.key,
72 value: e.value,
73 timestamp: timestamp()
74 }
75 }), function (err, offsets) {
76 batch.forEach(function (msg, i) {
77 //trigger post immediately.
78 db.post.set(msg)
79 })
80 cb(err)
81 })
82 }
83
84 var _get = db.get
85
86 db.get = function (key, cb) {
87 if(ref.isMsg(key))
88 return db.keys.get(key, function (err, seq) {
89 if(err) cb(err)
90 else cb(null, seq && seq.value)
91 })
92 else _get(key, cb) //seq
93 }
94
95 var add = Validator(db, opts)
96 db.add = function (msg, cb) {
97 if(db.ready.value) next(true)
98 else db.ready.once(next, false)
99 function next (ready) {
100 add(msg, function (err, value) {
101 cb(err, value)
102 })
103 }
104 }
105
106 var realtime = Notify()
107
108 //TODO: eventually, this should filter out authors you do not follow.
109 db.createFeedStream = db.feed.createFeedStream
110
111 //latest was stored as author: seq
112 //but for the purposes of replication back pressure
113 //we need to know when we last replicated with someone.
114 //instead store as: {sequence: seq, ts: localtime}
115 //then, peers can request a max number of posts per feed.
116
117 function toSeq (latest) {
118 return isNumber(latest) ? latest : latest.sequence
119 }
120
121 function lookup(keys, values) {
122 return paramap(function (key, cb) {
123 if(key.sync) return cb(null, key)
124 if(!values) return cb(null, key)
125 db.get(key, function (err, data) {
126 if (err) cb(err)
127 else cb(null, u.format(keys, values, data))
128 })
129 })
130 }
131
132 db.lookup = lookup
133
134 db.createHistoryStream = db.clock.createHistoryStream
135
136 db.createUserStream = db.clock.createUserStream
137
138 //writeStream - used in replication.
139 db.createWriteStream = function (cb) {
140 return pull(
141 pull.asyncMap(function (data, cb) {
142 db.add(data, function (err, msg) {
143 if(err) {
144 db.emit('invalid', err, msg)
145 }
146 cb()
147 })
148 }),
149 pull.drain(null, cb)
150 )
151 }
152
153 db.createFeed = function (keys) {
154 if(!keys) keys = ssbKeys.generate()
155 return createFeed(db, keys, opts)
156 }
157
158 db.latest = db.last.latest
159
160 //used by sbot replication plugin
161 db.latestSequence = function (id, cb) {
162 db.last.get(function (err, val) {
163 if(err) cb(err)
164 else if (!val || !val[id]) cb(new Error('not found:'+id))
165 else cb(null, val[id].sequence)
166 })
167 }
168
169
170 db.getLatest = function (key, cb) {
171 db.last.get(function (err, value) {
172 if(err || !value || !value[key]) cb()
173 //Currently, this retrives the previous message.
174 //but, we could rewrite validation to only use
175 //data the reduce view, so that no disk read is necessary.
176 else db.get(value[key].id, function (err, msg) {
177 cb(err, {key: value[key].id, value: msg})
178 })
179 })
180 }
181
182
183 db.createLogStream = function (opts) {
184 opts = stdopts(opts)
185 if(opts.raw)
186 return db.stream()
187
188 var keys = opts.keys; delete opts.keys
189 var values = opts.values; delete opts.values
190 return pull(db.time.read(opts), Format(keys, values))
191 }
192
193 db.messagesByType = db.links.messagesByType
194
195 db.links = db.links.links
196
197 var HI = undefined, LO = null
198
199 //get all messages that link to a given message.
200
201 db.relatedMessages = Related(db)
202
203 //called with [id, seq] or "<id>:<seq>"
204 db.getAtSequence = function (seqid, cb) {
205 db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb)
206 }
207
208 db.getVectorClock = function (_, cb) {
209 if(!cb) cb = _
210 db.last.get(function (err, h) {
211 if(err) return cb(err)
212 var clock = {}
213 for(var k in h)
214 clock[k] = h[k].sequence
215 cb(null, clock)
216 })
217
218 }
219
220 if(_db) {
221 var close = db.close
222 db.close = function (cb) {
223 var n = 2
224 _db.close(next); close(next)
225
226 function next (err) {
227 if(err && n>0) {
228 n = -1
229 return cb(err)
230 }
231 if(--n) return
232 cb()
233 }
234
235 }
236
237 }
238 return db
239}
240
241

Built with git-ssb-web