Commit 9060926fc8d0a4acaadba88858dfaefc5a8e1d16
use flumeview-reduce@1
Dominic Tarr committed on 11/29/2016, 2:15:35 PMParent: 46c6d9361cb9a399be820c8d6b756e1ea0928390
Files changed
index.js | changed |
indexes/last.js | changed |
index.js | ||
---|---|---|
@@ -182,11 +182,14 @@ | ||
182 | 182 … | |
183 | 183 … | //writeStream - used in replication. |
184 | 184 … | db.createWriteStream = function (cb) { |
185 | 185 … | return pull( |
186 | - paramap(function (data, cb) { | |
186 … | + pull.asyncMap(function (data, cb) { | |
187 | 187 … | db.add(data, function (err, msg) { |
188 | - if(err) db.emit('invalid', err, msg) | |
188 … | + if(err) { | |
189 … | + console.error(err.message, data) | |
190 … | + db.emit('invalid', err, msg) | |
191 … | + } | |
189 | 192 … | cb() |
190 | 193 … | }) |
191 | 194 … | }), |
192 | 195 … | pull.drain(null, cb) |
@@ -201,26 +204,32 @@ | ||
201 | 204 … | db.latest = db.last.latest |
202 | 205 … | |
203 | 206 … | //used by sbot replication plugin |
204 | 207 … | db.latestSequence = function (id, cb) { |
205 | - db.getLatest(id, function (err, data) { | |
206 | - if(err || !data) cb(null, 0) | |
207 | - else cb(null, data.value.sequence) | |
208 … | + db.last.get(function (err, val) { | |
209 … | + if(err) cb(err) | |
210 … | + else if(!val[id]) cb(new Error('not found:'+id)) | |
211 … | + else cb(null, val[id].sequence) | |
208 | 212 … | }) |
209 | 213 … | } |
210 | 214 … | |
211 | 215 … | |
212 | 216 … | db.getLatest = function (key, cb) { |
213 | - db.last.get(key, function (err, seq) { | |
214 | - if(err) return cb() | |
215 | - db.get(seq, cb) | |
217 … | + db.last.get(function (err, value) { | |
218 … | + if(err || !value || !value[key]) cb() | |
219 … | + //Currently, this retrives the previous message. | |
220 … | + //but, we could rewrite validation to only use | |
221 … | + //data the reduce view, so that no disk read is necessary. | |
222 … | +// else cb(null, {key: value.id, value: {sequence: value.sequence, timestamp: value.ts}}) | |
223 … | + else db.get(value[key].id, function (err, msg) { | |
224 … | + cb(err, {key: value[key].id, value: msg}) | |
225 … | + }) | |
216 | 226 … | }) |
217 | 227 … | } |
218 | 228 … | |
219 | 229 … | |
220 | 230 … | db.createLogStream = function (opts) { |
221 | 231 … | opts = stdopts(opts) |
222 | - console.log('createLogStream', opts) | |
223 | 232 … | if(opts.raw) |
224 | 233 … | return db.stream() |
225 | 234 … | |
226 | 235 … | var keys = opts.keys; delete opts.keys |
@@ -303,9 +312,4 @@ | ||
303 | 312 … | |
304 | 313 … | |
305 | 314 … | |
306 | 315 … | |
307 | - | |
308 | - | |
309 | - | |
310 | - | |
311 | - |
indexes/last.js | ||
---|---|---|
@@ -1,11 +1,11 @@ | ||
1 | 1 … | var pull = require('pull-stream') |
2 | 2 … | var path = require('path') |
3 | 3 … | var ltgt = require('ltgt') |
4 | 4 … | var u = require('../util') |
5 | - | |
6 | -var ViewLevel = require('flumeview-level') | |
7 | - | |
5 … | +var pCont = require('pull-cont') | |
6 … | +//var ViewLevel = require('flumeview-level') | |
7 … | +var Reduce = require('flumeview-reduce') | |
8 | 8 … | function isNumber (n) { |
9 | 9 … | return typeof n === 'number' |
10 | 10 … | } |
11 | 11 … | |
@@ -15,25 +15,27 @@ | ||
15 | 15 … | |
16 | 16 … | module.exports = function () { |
17 | 17 … | |
18 | 18 … | //TODO: rewrite as a flumeview-reduce |
19 | - var createIndex = ViewLevel(1, function (data) { | |
20 | - return [data.value.author] | |
19 … | + var createIndex = Reduce(1, sfunction (acc, data) { | |
20 … | + if(!acc) acc = {} | |
21 … | + acc[data.value.author] = {id: data.key, sequence: data.value.sequence, ts: data.value.timestamp} | |
22 … | + return acc | |
21 | 23 … | }) |
22 | 24 … | |
23 | 25 … | return function (log, name) { |
24 | 26 … | var index = createIndex(log, name) |
25 | 27 … | index.methods.latest = 'source' |
26 | 28 … | |
27 | 29 … | index.latest = function (opts) { |
28 | - opts = opts || {} | |
29 | - return pull( | |
30 | - index.read(opts), | |
31 | - pull.map(function (data) { | |
32 | - var d = {id: data.key, sequence: toSeq(data.value.value), ts: data.value.timestamp } | |
33 | - return d | |
30 … | + return pCont(function (cb) { | |
31 … | + index.get([], function (err, val) { | |
32 … | + if(err) return cb(err) | |
33 … | + cb(null, pull.values(Object.keys(val).map(function (author) { | |
34 … | + return {id: author, sequence: val[author].sequence, ts: val[author].ts} | |
35 … | + }))) | |
34 | 36 … | }) |
35 | - ) | |
37 … | + }) | |
36 | 38 … | } |
37 | 39 … | |
38 | 40 … | return index |
39 | 41 … | |
@@ -41,4 +43,7 @@ | ||
41 | 43 … | } |
42 | 44 … | |
43 | 45 … | |
44 | 46 … | |
47 … | + | |
48 … | + | |
49 … | + |
Built with git-ssb-web