// minimal.js
1 | |
2 | var path = require('path') |
3 | var Flume = require('flumedb') |
4 | var codec = require('./codec') |
5 | var AsyncWrite = require('async-write') |
6 | var V = require('ssb-validate') |
7 | var timestamp = require('monotonic-timestamp') |
8 | var Obv = require('obv') |
9 | var ssbKeys = require('ssb-keys') |
10 | var box = ssbKeys.box |
11 | var u = require('./util') |
12 | var isFeed = require('ssb-ref').isFeed |
13 | |
14 | /* |
15 | var Compat = require('flumelog-aligned-offset/compat') |
16 | var FlumeLogAligned = require('flumelog-aligned-offset') |
17 | function OffsetLog(file, opts) { |
18 | return Compat(FlumeLogAligned(file, opts)) |
19 | } |
20 | */ |
21 | var OffsetLog = require('flumelog-memory') |
22 | /* |
23 | this file provides the flumelog, |
24 | message append (and validation) |
25 | and decrypting - as that is part of loading the messages. |
26 | |
27 | */ |
28 | |
var isArray = Array.isArray
// Predicate: is the given value callable?
function isFunction (fn) {
  return 'function' === typeof fn
}
31 | |
32 | /* |
33 | ## queue (msg, cb) |
34 | |
35 | add a message to the log, buffering the write to make it as fast as |
36 | possible, cb when the message is queued. |
37 | |
38 | ## append (msg, cb) |
39 | |
40 | write a message, callback once it's definitely written. |
41 | */ |
42 | |
// Predicate: is the given value a primitive string?
function isString (value) {
  return 'string' === typeof value
}
46 | |
/**
 * Build the minimal database: a flume log plus validated, batched append.
 *
 * @param {string} dirname - directory that would hold the log file
 *   (path is still computed even though the in-memory log ignores it)
 * @param {object} keys    - default ssb-keys keypair used by db.publish
 * @param {object} opts    - options; opts.caps.sign (if present) becomes the
 *   HMAC key passed to ssb-validate
 * @param {function} map   - flumedb map/transform applied on read
 * @returns {object} the flume db, extended with queue/append/publish/
 *   buffer/flush and a `post` observable
 */
module.exports = function (dirname, keys, opts, map) {
  var hmacKey = opts && opts.caps && opts.caps.sign

  var log = OffsetLog(path.join(dirname, 'log.offset'), { blockSize: 1024 * 16, codec })

  // NOTE: must use db.ready.set(true) at when migration is complete
  // false says the database is not ready yet!
  var db = Flume(log, true, map)
    .use('last', require('./indexes/last')())

  // Validation state (per-feed sequence/timestamp) maintained by ssb-validate.
  var state = V.initial()
  // `ready` flips true once db.last has been copied into `state` below;
  // until then queue/append callers are parked in `waiting`.
  var ready = false
  var waiting = []
  // Callbacks to run once the write queue fully drains (see onDrain / db.flush).
  var flush = []

  // Keep the raw (unvalidated) flume append reachable; db.append is replaced
  // below with the validating version.
  var append = db.rawAppend = db.append
  db.post = Obv()
  // AsyncWrite batches messages: the first fn writes the accumulated
  // state.queue, the `reduce` fn validates/accumulates each incoming msg,
  // the third fn forces a write when the batch exceeds 1000, `isEmpty`
  // reports whether anything is pending, and 100 is the max delay (ms).
  var queue = AsyncWrite(function (_, cb) {
    var batch = state.queue
    state.queue = []
    append(batch, function (err, v) {
      // Notify post-observers for every message in the batch, even on error;
      // NOTE(review): errors are not filtered here — presumably append is
      // all-or-nothing, but confirm before relying on db.post on failure.
      batch.forEach(function (data) {
        db.post.set(u.originalData(data))
      })
      cb(err, v)
    })
  }, function reduce (_, msg) {
    // Validates msg against per-feed state; throws/errors on bad signatures,
    // sequence gaps, etc. Appends to state.queue on success.
    return V.append(state, hmacKey, msg)
  }, function (_state) {
    return state.queue.length > 1000
  }, function isEmpty (_state) {
    return !state.queue.length
  }, 100)

  // When the writer drains and nothing new was queued meanwhile, fire the
  // callbacks registered up to this point (db.append / db.flush waiters).
  queue.onDrain = function () {
    if (state.queue.length === 0) {
      var l = flush.length
      // Snapshot the length first: callbacks may push new flush entries,
      // which must wait for the next drain.
      for (var i = 0; i < l; ++i) { flush[i]() }
      flush = flush.slice(l)
    }
  }

  //load the map of the latest items, copy into validation state.
  db.last.get(function (_, last) {
    // copy to so we avoid weirdness, because this object
    // tracks the state coming in to the database.
    for (var k in last) {
      state.feeds[k] = {
        id: last[k].id,
        // older indexes stored `ts`, newer ones `timestamp` — accept both
        timestamp: last[k].ts || last[k].timestamp,
        sequence: last[k].sequence,
        queue: []
      }
    }
    ready = true

    // Release every caller parked by wait(); same snapshot pattern as above.
    var l = waiting.length
    for (var i = 0; i < l; ++i) { waiting[i]() }
    waiting = waiting.slice(l)
  })

  // Wrap a (value, cb) function so calls made before `ready` are deferred
  // until the db.last state has been loaded.
  function wait (fn) {
    return function (value, cb) {
      if (ready) fn(value, cb)
      else {
        waiting.push(function () {
          fn(value, cb)
        })
      }
    }
  }

  // queue: validate + enqueue an already-signed message; cb fires when the
  // message is accepted into the batch, NOT when it is durably written.
  db.queue = wait(function (msg, cb) {
    queue(msg, function (err) {
      // NOTE(review): reads the last element of state.queue, but if the
      // batch was already flushed state.queue may be empty here — confirm
      // async-write guarantees this cb runs before the write fn.
      var data = state.queue[state.queue.length - 1]
      if (err) cb(err)
      else cb(null, data)
    })
  })

  // append: create (sign) a new message from opts.content with opts.keys,
  // optionally boxing it for opts.content.recps, then queue it.
  // cb fires only after the message has been written (via the flush list).
  db.append = wait(function (opts, cb) {
    try {
      var content = opts.content
      var recps = opts.content.recps
      if (recps) {
        const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
        if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
          recps = opts.content.recps = [].concat(recps) // force to array
          // Encrypt: content becomes an opaque .box string addressed to recps.
          content = opts.content = box(opts.content, recps)
        } else {
          const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
          throw new Error(errMsg)
        }
      }

      // Sign the message against the feed's current validation state.
      // Throws on invalid input; hence the try/catch around creation only.
      var msg = V.create(
        state.feeds[opts.keys.id],
        opts.keys, opts.hmacKey || hmacKey,
        content,
        timestamp()
      )
    } catch (err) {
      cb(err)
      return
    }

    queue(msg, function (err) {
      if (err) return cb(err)
      var data = state.queue[state.queue.length - 1]
      // Defer the success callback until the batch is actually flushed.
      flush.push(function () {
        cb(null, data)
      })
    })
  })

  // Convenience: append content signed with the db's own keys.
  db.publish = function (content, cb) {
    return db.append({content: content, keys: keys}, cb)
  }

  // Expose the async-write buffer (pending batch) for inspection.
  db.buffer = function () {
    return queue.buffer
  }

  // flush: cb once all queued writes have hit the log.
  db.flush = function (cb) {
    // maybe need to check if there is anything currently writing?
    // NOTE(review): && binds tighter than ||, so this reads as
    // `!buffer || (!buffer.queue.length && !writing)` — a flush requested
    // while a write is in flight but the buffer exists-and-is-empty waits,
    // yet a missing buffer short-circuits regardless of `writing`.
    // Confirm against async-write's invariants before "fixing".
    if (!queue.buffer || !queue.buffer.queue.length && !queue.writing) cb()
    else flush.push(cb)
  }

  return db
}
178 | |
179 | |
180 | |
181 |