git ssb

1+

Dominic / secure-scuttlebutt



Tree: 90ce5321ef13ea1e8baf2789adf6cc2e3beccf13

Files: 90ce5321ef13ea1e8baf2789adf6cc2e3beccf13 / minimal.js

2867 bytesRaw
1var path = require('path')
2var Flume = require('flumedb')
3var OffsetLog = require('flumelog-offset')
4var codec = require('flumecodec/json')
5var AsyncWrite = require('async-write')
6var V = require('ssb-validate')
7var timestamp = require('monotonic-timestamp')
8
9
10/*
11## queue (msg, cb)
12
13add a message to the log, buffering the write to make it as fast as
14possible, cb when the message is queued.
15
16## append (msg, cb)
17
18write a message, callback once it's definitely written.
19*/
20
21function toKeyValueTimestamp(msg) {
22 return {
23 key: V.id(msg),
24 value: msg,
25 timestamp: timestamp()
26 }
27}
28
29module.exports = function (dirname) {
30
31 var log = OffsetLog(path.join(dirname, 'log.offset'), 1024*16, codec)
32 //NOTE: must use db.ready.set(true) at when migration is complete
33
34 var db = Flume(log, false) //false says the database is not ready yet!
35 .use('last', require('./indexes/last')())
36
37 var state = V.initial(), ready = false
38 var waiting = [], flush = []
39
40 var append = db.rawAppend = db.append
41 var queue = AsyncWrite(function (_, cb) {
42 var batch = state.queue//.map(toKeyValueTimestamp)
43 state.queue = []
44 append(batch, function (err, v) {
45 cb(err, v)
46 })
47 }, function reduce(_, msg) {
48 state = V.append(state, msg)
49 state.queue[state.queue.length-1] = toKeyValueTimestamp(state.queue[state.queue.length-1])
50 return state
51 }, function (_state) {
52 return state.queue.length > 1000
53 }, function isEmpty (_state) {
54 return !state.queue.length
55 })
56
57 queue.onDrain = function () {
58 if(state.queue.length == 0) {
59 var l = flush.length
60 while(l--) flush.shift()()
61 }
62 }
63
64 db.last.get(function (err, last) {
65 //copy to so we avoid weirdness, because this object
66 //tracks the state coming in to the database.
67 for(var k in last)
68 state.feeds[k] = {
69 id: last[k].id,
70 timestamp: last[k].timestamp,
71 sequence: last[k].sequence
72 }
73 ready = true
74 while(waiting.length)
75 waiting.shift()()
76 })
77
78 function wait(fn) {
79 return function (value, cb) {
80 if(ready) fn(value, cb)
81 else waiting.push(function () {
82 fn(value, cb)
83 })
84 }
85 }
86
87 db.queue = wait(function (msg, cb) {
88 queue(msg, function (err) {
89 if(err) cb(err)
90 else cb(null, toKeyValueTimestamp(msg))
91 })
92 })
93 db.append = wait(function (opts, cb) {
94 var msg = V.create(
95 state.feeds[opts.keys.id],
96 opts.keys, opts.hmacKey,
97 opts.content,
98 timestamp()
99 )
100 queue(msg, function (err) {
101 if(err) return cb(err)
102 var data = state.queue[state.queue.length-1]
103 flush.push(function () {
104 cb(null, data)
105 })
106 })
107 })
108 db.buffer = function () {
109 return queue.buffer
110 }
111 db.flush = function (cb) {
112 //maybe need to check if there is anything currently writing?
113 flush.push(cb)
114 }
115
116 return db
117}
118
119

Built with git-ssb-web