Files: 04f70232848d0eb769abc879ff4af69e21fb677b / index.js
5749 bytesRaw
1 | ; |
2 | |
3 | var join = require('path').join |
4 | var EventEmitter = require('events') |
5 | var Obv = require('obv') |
6 | |
7 | var pull = require('pull-stream') |
8 | var timestamp = require('monotonic-timestamp') |
9 | var explain = require('explain-error') |
10 | var createFeed = require('ssb-feed') |
11 | var ref = require('ssb-ref') |
12 | var ssbKeys = require('ssb-keys') |
13 | var Notify = require('pull-notify') |
14 | var Validator = require('ssb-feed/validator') |
15 | var Related = require('./related') |
16 | |
17 | var isFeedId = ref.isFeedId |
18 | var isMsgId = ref.isMsgId |
19 | var isBlobId = ref.isBlobId |
20 | |
21 | var u = require('./util') |
22 | var stdopts = u.options |
23 | var Format = u.formatStream |
//largest integer a double can represent exactly (2^53 - 1)
var MAX_INT = Number.MAX_SAFE_INTEGER
26 | |
//true for primitive numbers only; NaN and Infinity count because
//typeof reports them as 'number', boxed Number objects do not.
function isNumber (n) {
  var kind = typeof n
  return 'number' === kind
}
30 | |
//true for primitive strings only (boxed String objects are rejected).
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
34 | |
var isArray = Array.isArray

/**
 * Test whether a value is a non-null, non-array object.
 *
 * Always returns a real boolean. The previous `o && ...` form leaked the
 * falsy input itself (null, 0, '', undefined) to the caller.
 *
 * @param {*} o value to test
 * @returns {boolean} true when `o` is an object that is neither null nor an array
 */
function isObject (o) {
  return o !== null && 'object' === typeof o && !isArray(o)
}
40 | |
//read the major component of the version declared in ./package.json.
//`| 0` coerces the text before the first '.' to a 32-bit integer,
//so a non-numeric major component yields 0.
function getVMajor () {
  var pkg = require('./package.json')
  var major = pkg.version.split('.')[0]
  return major | 0
}
45 | |
46 | module.exports = function (_db, opts, keys, path) { |
47 | path = path || _db.location |
48 | |
49 | keys = keys || ssbKeys.generate() |
50 | |
51 | var db = require('./db')(join(opts.path || path, 'flume'), keys) |
52 | |
53 | //legacy database |
54 | if(_db) require('./legacy')(_db, db) |
55 | else db.ready.set(true) |
56 | |
57 | db.sublevel = function (a, b) { |
58 | return _db.sublevel(a, b) |
59 | } |
60 | |
61 | //UGLY HACK, but... |
62 | //fairly sure that something up the stack expects ssb to be an event emitter. |
63 | db.__proto__ = new EventEmitter() |
64 | |
65 | db.opts = opts |
66 | |
67 | db.post = Obv() |
68 | db.batch = function (batch, cb) { |
69 | db.append(batch.map(function (e) { |
70 | return { |
71 | key: e.key, |
72 | value: e.value, |
73 | timestamp: timestamp() |
74 | } |
75 | }), function (err, offsets) { |
76 | batch.forEach(function (msg, i) { |
77 | //trigger post immediately. |
78 | db.post.set(msg) |
79 | }) |
80 | cb(err) |
81 | }) |
82 | } |
83 | |
84 | var _get = db.get |
85 | |
86 | db.get = function (key, cb) { |
87 | if(ref.isMsg(key)) |
88 | return db.keys.get(key, function (err, seq) { |
89 | if(err) cb(err) |
90 | else cb(null, seq && seq.value) |
91 | }) |
92 | else if(Number.isInteger(key)) |
93 | _get(key, cb) //seq |
94 | else |
95 | throw new Error('secure-scuttlebutt.get: key *must* be a ssb message id or a flume offset') |
96 | } |
97 | |
98 | var add = Validator(db, opts) |
99 | db.add = function (msg, cb) { |
100 | if(db.ready.value) next(true) |
101 | else db.ready.once(next, false) |
102 | function next (ready) { |
103 | add(msg, function (err, value) { |
104 | cb(err, value) |
105 | }) |
106 | } |
107 | } |
108 | |
109 | var realtime = Notify() |
110 | |
111 | //TODO: eventually, this should filter out authors you do not follow. |
112 | db.createFeedStream = db.feed.createFeedStream |
113 | |
114 | //latest was stored as author: seq |
115 | //but for the purposes of replication back pressure |
116 | //we need to know when we last replicated with someone. |
117 | //instead store as: {sequence: seq, ts: localtime} |
118 | //then, peers can request a max number of posts per feed. |
119 | |
120 | function toSeq (latest) { |
121 | return isNumber(latest) ? latest : latest.sequence |
122 | } |
123 | |
124 | function lookup(keys, values) { |
125 | return paramap(function (key, cb) { |
126 | if(key.sync) return cb(null, key) |
127 | if(!values) return cb(null, key) |
128 | db.get(key, function (err, data) { |
129 | if (err) cb(err) |
130 | else cb(null, u.format(keys, values, data)) |
131 | }) |
132 | }) |
133 | } |
134 | |
135 | db.lookup = lookup |
136 | |
137 | db.createHistoryStream = db.clock.createHistoryStream |
138 | |
139 | db.createUserStream = db.clock.createUserStream |
140 | |
141 | //writeStream - used in replication. |
142 | db.createWriteStream = function (cb) { |
143 | return pull( |
144 | pull.asyncMap(function (data, cb) { |
145 | db.add(data, function (err, msg) { |
146 | if(err) { |
147 | db.emit('invalid', err, msg) |
148 | } |
149 | cb() |
150 | }) |
151 | }), |
152 | pull.drain(null, cb) |
153 | ) |
154 | } |
155 | |
156 | db.createFeed = function (keys) { |
157 | if(!keys) keys = ssbKeys.generate() |
158 | return createFeed(db, keys, opts) |
159 | } |
160 | |
161 | db.latest = db.last.latest |
162 | |
163 | //used by sbot replication plugin |
164 | db.latestSequence = function (id, cb) { |
165 | db.last.get(function (err, val) { |
166 | if(err) cb(err) |
167 | else if (!val || !val[id]) cb(new Error('not found:'+id)) |
168 | else cb(null, val[id].sequence) |
169 | }) |
170 | } |
171 | |
172 | |
173 | db.getLatest = function (key, cb) { |
174 | db.last.get(function (err, value) { |
175 | if(err || !value || !value[key]) cb() |
176 | //Currently, this retrives the previous message. |
177 | //but, we could rewrite validation to only use |
178 | //data the reduce view, so that no disk read is necessary. |
179 | else db.get(value[key].id, function (err, msg) { |
180 | cb(err, {key: value[key].id, value: msg}) |
181 | }) |
182 | }) |
183 | } |
184 | |
185 | |
186 | db.createLogStream = function (opts) { |
187 | opts = stdopts(opts) |
188 | if(opts.raw) |
189 | return db.stream() |
190 | |
191 | var keys = opts.keys; delete opts.keys |
192 | var values = opts.values; delete opts.values |
193 | return pull(db.time.read(opts), Format(keys, values)) |
194 | } |
195 | |
196 | db.messagesByType = db.links.messagesByType |
197 | |
198 | db.links = db.links.links |
199 | |
200 | var HI = undefined, LO = null |
201 | |
202 | //get all messages that link to a given message. |
203 | |
204 | db.relatedMessages = Related(db) |
205 | |
206 | //called with [id, seq] or "<id>:<seq>" |
207 | db.getAtSequence = function (seqid, cb) { |
208 | db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb) |
209 | } |
210 | |
211 | db.getVectorClock = function (_, cb) { |
212 | if(!cb) cb = _ |
213 | db.last.get(function (err, h) { |
214 | if(err) return cb(err) |
215 | var clock = {} |
216 | for(var k in h) |
217 | clock[k] = h[k].sequence |
218 | cb(null, clock) |
219 | }) |
220 | |
221 | } |
222 | |
223 | if(_db) { |
224 | var close = db.close |
225 | db.close = function (cb) { |
226 | var n = 2 |
227 | _db.close(next); close(next) |
228 | |
229 | function next (err) { |
230 | if(err && n>0) { |
231 | n = -1 |
232 | return cb(err) |
233 | } |
234 | if(--n) return |
235 | cb() |
236 | } |
237 | |
238 | } |
239 | |
240 | } |
241 | return db |
242 | } |
243 | |
244 |
Built with git-ssb-web