Files: 90ce5321ef13ea1e8baf2789adf6cc2e3beccf13 / minimal.js
2867 bytesRaw
1 | var path = require('path') |
2 | var Flume = require('flumedb') |
3 | var OffsetLog = require('flumelog-offset') |
4 | var codec = require('flumecodec/json') |
5 | var AsyncWrite = require('async-write') |
6 | var V = require('ssb-validate') |
7 | var timestamp = require('monotonic-timestamp') |
8 | |
9 | |
/*
## queue (msg, cb)

Add an existing message to the log, buffering the write to make it as
fast as possible. cb is called once the message is queued.

## append (opts, cb)

Create a new message from opts (keys, hmacKey, content) and write it.
cb is called once the message is definitely written.
*/
20 | |
/**
 * Wrap a message in the {key, value, timestamp} record shape that this
 * module places on the write queue.
 *
 * @param {object} msg - the message to wrap; stored by reference, not copied.
 * @returns {{key: *, value: object, timestamp: *}} `key` is `V.id(msg)`,
 *   `value` is `msg` itself, and `timestamp` is a fresh value from
 *   monotonic-timestamp, so every call yields a different timestamp.
 */
function toKeyValueTimestamp (msg) {
  return {
    key: V.id(msg),
    value: msg,
    timestamp: timestamp()
  }
}
28 | |
29 | module.exports = function (dirname) { |
30 | |
31 | var log = OffsetLog(path.join(dirname, 'log.offset'), 1024*16, codec) |
32 | //NOTE: must use db.ready.set(true) at when migration is complete |
33 | |
34 | var db = Flume(log, false) //false says the database is not ready yet! |
35 | .use('last', require('./indexes/last')()) |
36 | |
37 | var state = V.initial(), ready = false |
38 | var waiting = [], flush = [] |
39 | |
40 | var append = db.rawAppend = db.append |
41 | var queue = AsyncWrite(function (_, cb) { |
42 | var batch = state.queue//.map(toKeyValueTimestamp) |
43 | state.queue = [] |
44 | append(batch, function (err, v) { |
45 | cb(err, v) |
46 | }) |
47 | }, function reduce(_, msg) { |
48 | state = V.append(state, msg) |
49 | state.queue[state.queue.length-1] = toKeyValueTimestamp(state.queue[state.queue.length-1]) |
50 | return state |
51 | }, function (_state) { |
52 | return state.queue.length > 1000 |
53 | }, function isEmpty (_state) { |
54 | return !state.queue.length |
55 | }) |
56 | |
57 | queue.onDrain = function () { |
58 | if(state.queue.length == 0) { |
59 | var l = flush.length |
60 | while(l--) flush.shift()() |
61 | } |
62 | } |
63 | |
64 | db.last.get(function (err, last) { |
65 | //copy to so we avoid weirdness, because this object |
66 | //tracks the state coming in to the database. |
67 | for(var k in last) |
68 | state.feeds[k] = { |
69 | id: last[k].id, |
70 | timestamp: last[k].timestamp, |
71 | sequence: last[k].sequence |
72 | } |
73 | ready = true |
74 | while(waiting.length) |
75 | waiting.shift()() |
76 | }) |
77 | |
78 | function wait(fn) { |
79 | return function (value, cb) { |
80 | if(ready) fn(value, cb) |
81 | else waiting.push(function () { |
82 | fn(value, cb) |
83 | }) |
84 | } |
85 | } |
86 | |
87 | db.queue = wait(function (msg, cb) { |
88 | queue(msg, function (err) { |
89 | if(err) cb(err) |
90 | else cb(null, toKeyValueTimestamp(msg)) |
91 | }) |
92 | }) |
93 | db.append = wait(function (opts, cb) { |
94 | var msg = V.create( |
95 | state.feeds[opts.keys.id], |
96 | opts.keys, opts.hmacKey, |
97 | opts.content, |
98 | timestamp() |
99 | ) |
100 | queue(msg, function (err) { |
101 | if(err) return cb(err) |
102 | var data = state.queue[state.queue.length-1] |
103 | flush.push(function () { |
104 | cb(null, data) |
105 | }) |
106 | }) |
107 | }) |
108 | db.buffer = function () { |
109 | return queue.buffer |
110 | } |
111 | db.flush = function (cb) { |
112 | //maybe need to check if there is anything currently writing? |
113 | flush.push(cb) |
114 | } |
115 | |
116 | return db |
117 | } |
118 | |
119 |
Built with git-ssb-web