Files: b4df1e66bea6275126d6d9c508356dc44e62b504 / index.js
5618 bytesRaw
1 | ; |
2 | |
3 | var join = require('path').join |
4 | var EventEmitter = require('events') |
5 | //var Obv = require('obv') |
6 | |
7 | var pull = require('pull-stream') |
8 | var timestamp = require('monotonic-timestamp') |
9 | var explain = require('explain-error') |
10 | //var createFeed = require('ssb-feed') |
11 | var ref = require('ssb-ref') |
12 | var ssbKeys = require('ssb-keys') |
13 | var Notify = require('pull-notify') |
14 | var Validator = require('ssb-feed/validator') |
15 | var Related = require('./related') |
16 | |
17 | var isFeedId = ref.isFeedId |
18 | var isMsgId = ref.isMsgId |
19 | var isBlobId = ref.isBlobId |
20 | |
21 | var u = require('./util') |
22 | var stdopts = u.options |
23 | var Format = u.formatStream |
24 | //53 bit integer |
25 | var MAX_INT = 0x1fffffffffffff |
26 | |
27 | function isNumber (n) { |
28 | return typeof n === 'number' |
29 | } |
30 | |
31 | function isString (s) { |
32 | return 'string' === typeof s |
33 | } |
34 | |
35 | function isFunction (f) { |
36 | return 'funciton' == typeof f |
37 | } |
38 | |
39 | var isArray = Array.isArray |
40 | |
41 | function isObject (o) { |
42 | return o && 'object' === typeof o && !Array.isArray(o) |
43 | } |
44 | |
45 | function getVMajor () { |
46 | var version = require('./package.json').version |
47 | return (version.split('.')[0])|0 |
48 | } |
49 | |
50 | module.exports = function (_db, opts, keys, path) { |
51 | path = path || _db.location |
52 | |
53 | keys = keys || ssbKeys.generate() |
54 | |
55 | var db = require('./db')(join(opts.path || path, 'flume'), keys) |
56 | |
57 | //legacy database |
58 | if(_db) require('./legacy')(_db, db) |
59 | else db.ready.set(true) |
60 | |
61 | db.sublevel = function (a, b) { |
62 | return _db.sublevel(a, b) |
63 | } |
64 | |
65 | //UGLY HACK, but... |
66 | //fairly sure that something up the stack expects ssb to be an event emitter. |
67 | db.__proto__ = new EventEmitter() |
68 | |
69 | db.opts = opts |
70 | |
71 | var _get = db.get |
72 | |
73 | db.get = function (key, cb) { |
74 | if(ref.isMsg(key)) |
75 | return db.keys.get(key, function (err, seq) { |
76 | if(err) cb(err) |
77 | else cb(null, seq && seq.value) |
78 | }) |
79 | else if(Number.isInteger(key)) |
80 | _get(key, cb) //seq |
81 | else |
82 | throw new Error('secure-scuttlebutt.get: key *must* be a ssb message id or a flume offset') |
83 | } |
84 | |
85 | db.add = function (msg, cb) { |
86 | db.queue(msg, function (err, data) { |
87 | if(err) cb(err) |
88 | else db.flush(function () { cb(null, data) }) |
89 | }) |
90 | } |
91 | |
92 | db.createFeed = function (keys) { |
93 | if(!keys) keys = ssbKeys.generate() |
94 | function add (content, cb) { |
95 | //LEGACY: hacks to support add as a continuable |
96 | if(!cb) |
97 | return function (cb) { add (content, cb) } |
98 | |
99 | db.append({content: content, keys: keys}, cb) |
100 | } |
101 | return { |
102 | add: add, publish: add, |
103 | id: keys.id, keys: keys |
104 | } |
105 | } |
106 | |
107 | var realtime = Notify() |
108 | |
109 | //TODO: eventually, this should filter out authors you do not follow. |
110 | db.createFeedStream = db.feed.createFeedStream |
111 | |
112 | //latest was stored as author: seq |
113 | //but for the purposes of replication back pressure |
114 | //we need to know when we last replicated with someone. |
115 | //instead store as: {sequence: seq, ts: localtime} |
116 | //then, peers can request a max number of posts per feed. |
117 | |
118 | function toSeq (latest) { |
119 | return isNumber(latest) ? latest : latest.sequence |
120 | } |
121 | |
122 | function lookup(keys, values) { |
123 | return paramap(function (key, cb) { |
124 | if(key.sync) return cb(null, key) |
125 | if(!values) return cb(null, key) |
126 | db.get(key, function (err, data) { |
127 | if (err) cb(err) |
128 | else cb(null, u.format(keys, values, data)) |
129 | }) |
130 | }) |
131 | } |
132 | |
133 | db.lookup = lookup |
134 | |
135 | db.createHistoryStream = db.clock.createHistoryStream |
136 | |
137 | db.createUserStream = db.clock.createUserStream |
138 | |
139 | //writeStream - used in replication. |
140 | db.createWriteStream = function (cb) { |
141 | return pull( |
142 | pull.asyncMap(function (data, cb) { |
143 | db.add(data, function (err, msg) { |
144 | if(err) { |
145 | db.emit('invalid', err, msg) |
146 | } |
147 | cb() |
148 | }) |
149 | }), |
150 | pull.drain(null, cb) |
151 | ) |
152 | } |
153 | |
154 | db.latest = db.last.latest |
155 | |
156 | //used by sbot replication plugin |
157 | db.latestSequence = function (id, cb) { |
158 | db.last.get(function (err, val) { |
159 | if(err) cb(err) |
160 | else if (!val || !val[id]) cb(new Error('not found:'+id)) |
161 | else cb(null, val[id].sequence) |
162 | }) |
163 | } |
164 | |
165 | db.getLatest = function (key, cb) { |
166 | db.last.get(function (err, value) { |
167 | if(err || !value || !value[key]) cb() |
168 | //Currently, this retrives the previous message. |
169 | //but, we could rewrite validation to only use |
170 | //data the reduce view, so that no disk read is necessary. |
171 | else db.get(value[key].id, function (err, msg) { |
172 | cb(err, {key: value[key].id, value: msg}) |
173 | }) |
174 | }) |
175 | } |
176 | |
177 | db.createLogStream = function (opts) { |
178 | opts = stdopts(opts) |
179 | if(opts.raw) |
180 | return db.stream() |
181 | |
182 | var keys = opts.keys; delete opts.keys |
183 | var values = opts.values; delete opts.values |
184 | return pull(db.time.read(opts), Format(keys, values)) |
185 | } |
186 | |
187 | db.messagesByType = db.links.messagesByType |
188 | |
189 | db.links = db.links.links |
190 | |
191 | var HI = undefined, LO = null |
192 | |
193 | //get all messages that link to a given message. |
194 | |
195 | db.relatedMessages = Related(db) |
196 | |
197 | //called with [id, seq] or "<id>:<seq>" |
198 | db.getAtSequence = function (seqid, cb) { |
199 | db.clock.get(isString(seqid) ? seqid.split(':') : seqid, cb) |
200 | } |
201 | |
202 | db.getVectorClock = function (_, cb) { |
203 | if(!cb) cb = _ |
204 | db.last.get(function (err, h) { |
205 | if(err) return cb(err) |
206 | var clock = {} |
207 | for(var k in h) |
208 | clock[k] = h[k].sequence |
209 | cb(null, clock) |
210 | }) |
211 | |
212 | } |
213 | |
214 | if(_db) { |
215 | var close = db.close |
216 | db.close = function (cb) { |
217 | var n = 2 |
218 | _db.close(next); close(next) |
219 | |
220 | function next (err) { |
221 | if(err && n>0) { |
222 | n = -1 |
223 | return cb(err) |
224 | } |
225 | if(--n) return |
226 | cb() |
227 | } |
228 | |
229 | } |
230 | |
231 | } |
232 | return db |
233 | } |
234 | |
235 |
Built with git-ssb-web