git ssb

1+

Dominic / secure-scuttlebutt



Tree: 8e8434f441e3e179672053fbde29570a4fcf6981

Files: 8e8434f441e3e179672053fbde29570a4fcf6981 / minimal.js

4557 bytesRaw
1'use strict'
2var path = require('path')
3var Flume = require('flumedb')
4var codec = require('./codec')
5var AsyncWrite = require('async-write')
6var V = require('ssb-validate')
7var timestamp = require('monotonic-timestamp')
8var Obv = require('obv')
9var ssbKeys = require('ssb-keys')
10var box = ssbKeys.box
11var u = require('./util')
12var isFeed = require('ssb-ref').isFeed
13
14/*
15var Compat = require('flumelog-aligned-offset/compat')
16var FlumeLogAligned = require('flumelog-aligned-offset')
17function OffsetLog(file, opts) {
18 return Compat(FlumeLogAligned(file, opts))
19}
20*/
21var OffsetLog = require('flumelog-memory')
22/*
23 this file provides the flumelog,
24 message append (and validation)
25 and decrypting - as that is part of loading the messages.
26
27*/
28
29var isArray = Array.isArray
30function isFunction (f) { return typeof f === 'function' }
31
32/*
33## queue (msg, cb)
34
35add a message to the log, buffering the write to make it as fast as
36possible, cb when the message is queued.
37
38## append (msg, cb)
39
40write a message, callback once it's definitely written.
41*/
42
43function isString (s) {
44 return typeof s === 'string'
45}
46
47module.exports = function (dirname, keys, opts, map) {
48 var hmacKey = opts && opts.caps && opts.caps.sign
49
50 var log = OffsetLog(path.join(dirname, 'log.offset'), { blockSize: 1024 * 16, codec })
51
52 // NOTE: must use db.ready.set(true) at when migration is complete
53 // false says the database is not ready yet!
54 var db = Flume(log, true, map)
55 .use('last', require('./indexes/last')())
56
57 var state = V.initial()
58 var ready = false
59 var waiting = []
60 var flush = []
61
62 var append = db.rawAppend = db.append
63 db.post = Obv()
64 var queue = AsyncWrite(function (_, cb) {
65 var batch = state.queue
66 state.queue = []
67 append(batch, function (err, v) {
68 batch.forEach(function (data) {
69 db.post.set(u.originalData(data))
70 })
71 cb(err, v)
72 })
73 }, function reduce (_, msg) {
74 return V.append(state, hmacKey, msg)
75 }, function (_state) {
76 return state.queue.length > 1000
77 }, function isEmpty (_state) {
78 return !state.queue.length
79 }, 100)
80
81 queue.onDrain = function () {
82 if (state.queue.length === 0) {
83 var l = flush.length
84 for (var i = 0; i < l; ++i) { flush[i]() }
85 flush = flush.slice(l)
86 }
87 }
88
89 //load the map of the latest items, copy into validation state.
90 db.last.get(function (_, last) {
91 // copy to so we avoid weirdness, because this object
92 // tracks the state coming in to the database.
93 for (var k in last) {
94 state.feeds[k] = {
95 id: last[k].id,
96 timestamp: last[k].ts || last[k].timestamp,
97 sequence: last[k].sequence,
98 queue: []
99 }
100 }
101 ready = true
102
103 var l = waiting.length
104 for (var i = 0; i < l; ++i) { waiting[i]() }
105 waiting = waiting.slice(l)
106 })
107
108 function wait (fn) {
109 return function (value, cb) {
110 if (ready) fn(value, cb)
111 else {
112 waiting.push(function () {
113 fn(value, cb)
114 })
115 }
116 }
117 }
118
119 db.queue = wait(function (msg, cb) {
120 queue(msg, function (err) {
121 var data = state.queue[state.queue.length - 1]
122 if (err) cb(err)
123 else cb(null, data)
124 })
125 })
126
127 db.append = wait(function (opts, cb) {
128 try {
129 var content = opts.content
130 var recps = opts.content.recps
131 if (recps) {
132 const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
133 if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
134 recps = opts.content.recps = [].concat(recps) // force to array
135 content = opts.content = box(opts.content, recps)
136 } else {
137 const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
138 throw new Error(errMsg)
139 }
140 }
141
142 var msg = V.create(
143 state.feeds[opts.keys.id],
144 opts.keys, opts.hmacKey || hmacKey,
145 content,
146 timestamp()
147 )
148 } catch (err) {
149 cb(err)
150 return
151 }
152
153 queue(msg, function (err) {
154 if (err) return cb(err)
155 var data = state.queue[state.queue.length - 1]
156 flush.push(function () {
157 cb(null, data)
158 })
159 })
160 })
161
162 db.publish = function (content, cb) {
163 return db.append({content: content, keys: keys}, cb)
164 }
165
166 db.buffer = function () {
167 return queue.buffer
168 }
169
170 db.flush = function (cb) {
171 // maybe need to check if there is anything currently writing?
172 if (!queue.buffer || !queue.buffer.queue.length && !queue.writing) cb()
173 else flush.push(cb)
174 }
175
176 return db
177}
178
179
180
181

Built with git-ssb-web