Commit f854b637eba6ce4b8832080a3c9f86771a161db7
initial
Dominic Tarr committed on 12/13/2016, 3:36:45 PM — Parent: 8150458b85a4cfd309ddb194f2650d37793b8b1d
Files changed
README.md | changed |
index.js | changed |
package.json | changed |
codec.js | added |
README.md | ||
---|---|---|
@@ -8,63 +8,62 @@ | ||
8 | 8 … | ``` js |
9 | 9 … | var FlumeLog = require('flumelog-offset') |
10 | 10 … | var codec = require('flumecodec') |
11 | 11 … | var Flume = require('flumedb') |
12 | -var Reduce = require('flumeview-reduce') | |
12 … | +var Bloom = require('flumeview-bloom') | |
13 | 13 … | |
14 | -//statistics exports a reduce function that calculates | |
15 | -//mean, stdev, etc! | |
16 | -var statistics = require('statistics') | |
17 | - | |
18 | 14 … | //initialize a flumelog with a codec. |
19 | 15 … | //this example uses flumelog-offset, but any flumelog is valid. |
20 | 16 … | var log = FlumeLog(file, 1024*16, codec.json) //use any flume log |
21 | 17 … | |
22 | 18 … | //attach the reduce function. |
23 | -var db = Flume(log).use('stats', | |
24 | - Reduce(1, statistics, function (data) { | |
25 | - return data.value | |
26 | - }) | |
19 … | +var db = Flume(log).use('bloom', Bloom(1, 'key')) | |
27 | 20 … | |
28 | -db.append({value: 1}, function (err) { | |
29 | - | |
30 | - db.stats.get(function (err, stats) { | |
31 | - console.log(stats) // => {mean: 1, stdev: 0, count: 1, sum: 1, ...} | |
21 … | +db.append({key: 1, value: 1}, function (err) { | |
22 … | + db.bloom.ready(function (err, stats) { | |
23 … | + console.log(db.bloom.has(1)) // ==> true | |
24 … | + console.log(db.bloom.has(2)) // ==> false | |
32 | 25 … | }) |
33 | 26 … | }) |
34 | 27 … | ``` |
35 | 28 … | |
36 | -## FlumeViewReduce(version, reduce, map?) => FlumeView | |
29 … | +## FlumeViewBloom(version, map, opts) => FlumeView | |
37 | 30 … | |
38 | 31 … | construct a flumeview from this reduce function. `version` should be a number, |
39 | -and must be provided. If you make a breaking change to either `reduce` or `map` | |
40 | -then increment `version` and the view will be rebuilt. | |
32 … | +and must be provided. If you change `map` | |
33 … | +then increment `version` and the view will be rebuilt. Also, if any options change, | |
34 … | +the view will be rebuilt. | |
41 | 35 … | |
42 | -`map` is optional. If map is applied, then each item in the log is passed to `map` | |
43 | -and then if the returned value is not null, it is passed to reduce. | |
36 … | +`opts` provides options to the [bloom filter](https://github.com/cry/jsbloom) `items` and `probability`. | |
37 … | +default settings are `100,000` items and `0.001` probability of a collision. | |
44 | 38 … | |
45 | -``` js | |
46 | -var _data = map(data) | |
47 | -if(_data != null) | |
48 | - state = reduce(state, map(data)) | |
49 | -``` | |
39 … | +`map` is the key that is used to id each item. it can be a function that returns a string, | |
40 … | +or if it is a string then that property is taken from the item. | |
50 | 41 … | |
51 | -using a `map` function is useful, because it enables efficiently streaming the realtime | |
52 | -changes in the state to a remote client. | |
42 … | +## db[name].has(key) => boolean | |
53 | 43 … | |
54 | -then, pass the flumeview to `db.use(name, flumeview)` | |
55 | -and you'll have access to the flumeview methods on `db[name]...` | |
44 … | +check if an item with `key` is in the log. | |
45 … | +If the result is `false`, then the item is _not_ in the database, but if the result is `true`, | |
46 … | +then the item _might_ be in the database. | |
56 | 47 … | |
57 | -## db[name].get(cb) | |
48 … | +## Uses | |
58 | 49 … | |
59 | -get the current state of the reduce. This will wait until the view is up to date, if necessary. | |
50 … | +### cheaply enforce uniqueness | |
60 | 51 … | |
61 | -## db[name].stream({live: boolean}) => PullSource | |
52 … | +Before adding something, check if you already have it. | |
53 … | +If the bloom filter does not have it, then we can add it without any other checks. | |
54 … | +But since bloom filters can give false positives, if it says yes, we need to check if it really | |
55 … | +is there. This will be a more expensive check, but we only need to do it if the bloom check fails. | |
62 | 56 … | |
63 | -Stream the changing reduce state. for this to work, a map function must be provided. | |
57 … | +### estimating the number of unique values | |
64 | 58 … | |
65 | -If so, the same reduce function can be used to process the output. | |
59 … | +By measuring the probability of a bloom filter match, we can get an estimate of the number of | |
60 … | +unique values added to the bloom filter. For example, unique visits to your website. | |
61 … | +This could also be used to track the how many possible values a field might have. | |
66 | 62 … | |
63 … | + | |
67 | 64 … | ## License |
68 | 65 … | |
69 | 66 … | MIT |
70 | 67 … | |
68 … | + | |
69 … | + |
index.js | ||
---|---|---|
@@ -2,10 +2,9 @@ | ||
2 | 2 … | var Drain = require('pull-stream/sinks/drain') |
3 | 3 … | var Once = require('pull-stream/sources/once') |
4 | 4 … | var AtomicFile = require('atomic-file') |
5 | 5 … | var path = require('path') |
6 | -var deepEqual = require('deep-equal') | |
7 | -var Notify = require('pull-notify') | |
6 … | +var jsbloom = require('jsbloom') | |
8 | 7 … | |
9 | 8 … | function isEmpty (o) { |
10 | 9 … | for(var k in o) return false |
11 | 10 … | return true |
@@ -14,34 +13,46 @@ | ||
14 | 13 … | function isFunction (f) { |
15 | 14 … | return 'function' === typeof f |
16 | 15 … | } |
17 | 16 … | |
18 | -function id (e) { return e } | |
17 … | +function isString (s) { | |
18 … | + return 'string' === typeof s | |
19 … | +} | |
19 | 20 … | |
20 | -module.exports = function (version, reduce, map) { | |
21 … | +var codec = require('./codec') | |
22 … | + | |
23 … | +function lookup(map) { | |
24 … | + if(isFunction(map)) return map | |
25 … | + else if(isString(map)) return function (obj) { return obj[map] } | |
26 … | + else return function (e) { return e } | |
27 … | +} | |
28 … | + | |
29 … | +module.exports = function (version, map, opts) { | |
30 … | + opts = opts || {} | |
31 … | + var items = opts.items || 100e3 | |
32 … | + var probability = opts.probability || 0.001 | |
33 … | + | |
21 | 34 … | if(isFunction(version)) |
22 | 35 … | throw new Error('version must be a number') |
23 | 36 … | |
24 | - map = map || id | |
25 | - var notify = Notify() | |
37 … | + map = lookup(map) | |
26 | 38 … | return function (log, name) { //name is where this view is mounted |
27 | 39 … | var acc, since = Obv(), ts = 0 |
28 | 40 … | var value = Obv(), _value, writing = false, state, int |
29 | 41 … | |
30 | - //if we are in sync, and have not written recently, then write the current state. | |
31 | - | |
32 | - // if the log is persisted, | |
33 | - // then also save the reduce state. | |
34 | - // save whenever the view gets in sync with the log, | |
35 | - // as long as it hasn't beet updated in 1 minute. | |
36 | - | |
37 | 42 … | function write () { |
38 | 43 … | var _ts = Date.now() |
39 | 44 … | if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) { |
40 | 45 … | clearTimeout(int) |
41 | 46 … | int = setTimeout(function () { |
42 | 47 … | ts = _ts; writing = true |
43 | - state.set({seq: since.value, version: version, value: _value = value.value}, function () { | |
48 … | + state.set({ | |
49 … | + seq: since.value, | |
50 … | + version: version, | |
51 … | + items: items, | |
52 … | + probability: probability, | |
53 … | + value: bloom | |
54 … | + }, function () { | |
44 | 55 … | writing = false |
45 | 56 … | }) |
46 | 57 … | }, 200) |
47 | 58 … | } |
@@ -54,16 +65,24 @@ | ||
54 | 65 … | //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name` |
55 | 66 … | |
56 | 67 … | if(log.filename) { |
57 | 68 … | var dir = path.dirname(log.filename) |
58 | - state = AtomicFile(path.join(dir, name+'.json')) | |
69 … | + state = AtomicFile(path.join(dir, name+'.json'), codec) | |
59 | 70 … | state.get(function (err, data) { |
60 | - if(err || isEmpty(data)) since.set(-1) | |
61 | - else if(data.version !== version) { | |
71 … | + if(err || isEmpty(data)) { | |
72 … | + bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001) | |
73 … | + since.set(-1) | |
74 … | + } | |
75 … | + else if( //if any settings have changed, reinitialize the filter. | |
76 … | + data.version !== version | |
77 … | + || data.items !== opts.items | |
78 … | + || data.probability !== opts.probability | |
79 … | + ) { | |
80 … | + bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001) | |
62 | 81 … | since.set(-1) //overwrite old data. |
63 | 82 … | } |
64 | 83 … | else { |
65 | - value.set(_value = data.value) | |
84 … | + bloom = data.value | |
66 | 85 … | since.set(data.seq) |
67 | 86 … | } |
68 | 87 … | }) |
69 | 88 … | } |
@@ -72,45 +91,47 @@ | ||
72 | 91 … | |
73 | 92 … | return { |
74 | 93 … | since: since, |
75 | 94 … | value: value, |
76 | - methods: {get: 'async', stream: 'source'}, | |
77 | - get: function (path, cb) { | |
78 | - if('function' === typeof path) | |
79 | - cb = path, path = null | |
80 | - cb(null, value.value) | |
95 … | + methods: {has: 'sync'}, | |
96 … | + //has checks immediately, but if you want to wait | |
97 … | + //use db[name].ready(function () { db[name].has(key) }) | |
98 … | + //ready is added by flumedb | |
99 … | + has: function (key) { | |
100 … | + return bloom.checkEntry(key) | |
81 | 101 … | }, |
82 | - stream: function (opts) { | |
83 | - opts = opts || {} | |
84 | - //todo: send the HASH of the value, and only resend it if it is different! | |
85 | - if(opts.live !== true) | |
86 | - return Once(value.value) | |
87 | - var source = notify.listen() | |
88 | - //start by sending the current value... | |
89 | - source.push(value.value) | |
90 | - return source | |
91 | - }, | |
92 | 102 … | createSink: function (cb) { |
93 | 103 … | return Drain(function (data) { |
94 | - var _data = map(data.value, data.seq) | |
95 | - if(_data != null) value.set(reduce(value.value, _data, data.seq)) | |
96 | - since.set(data.seq) | |
97 | - notify(_data) | |
98 | - write() | |
104 … | + var key = map(data.value, data.seq) | |
105 … | + if(key) bloom.addEntry(key) | |
106 … | + since.set(data.seq) | |
107 … | + write() | |
99 | 108 … | }, cb) |
100 | 109 … | }, |
101 | 110 … | destroy: function (cb) { |
102 | - value.set(null); since.set(-1); | |
111 … | + bloom = null; since.set(-1); | |
103 | 112 … | if(state) state.set(null, cb) |
104 | 113 … | else cb() |
105 | 114 … | }, |
106 | 115 … | close: function (cb) { |
107 | 116 … | clearTimeout(int) |
108 | 117 … | if(!since.value) return cb() |
109 | 118 … | //force a write. |
110 | - state.set({seq: since.value, version: version, value: _value = value.value}, cb) | |
119 … | + state.set({ | |
120 … | + seq: since.value, | |
121 … | + version: version, | |
122 … | + items: opts.items, | |
123 … | + probability: opts.probability, | |
124 … | + value: bloom | |
125 … | + }, cb) | |
111 | 126 … | } |
112 | 127 … | } |
113 | 128 … | } |
114 | 129 … | } |
115 | 130 … | |
116 | 131 … | |
132 … | + | |
133 … | + | |
134 … | + | |
135 … | + | |
136 … | + | |
137 … | + |
package.json | ||
---|---|---|
@@ -9,8 +9,9 @@ | ||
9 | 9 … | }, |
10 | 10 … | "dependencies": { |
11 | 11 … | "atomic-file": "^0.1.0", |
12 | 12 … | "deep-equal": "^1.0.1", |
13 … | + "jsbloom": "^1.0.3", | |
13 | 14 … | "obv": "0.0.0", |
14 | 15 … | "pull-notify": "^0.1.1", |
15 | 16 … | "pull-stream": "^3.5.0" |
16 | 17 … | }, |
codec.js | ||
---|---|---|
@@ -1,0 +1,43 @@ | ||
1 … | +var createHash = require('sha.js/sha256') | |
2 … | +var deepEqual = require('deep-equal') | |
3 … | +var jsbloom = require('jsbloom') | |
4 … | + | |
5 … | +exports.encode = function (value) { | |
6 … | + if(null == value) return new Buffer(0) | |
7 … | + | |
8 … | + var header = Buffer.concat([ | |
9 … | + data.seq, value.version, | |
10 … | + value.items, | |
11 … | + value.probability | |
12 … | + ].map(function (e) { | |
13 … | + var b = new Buffer(4) | |
14 … | + b.writeUint32BE(0, e) | |
15 … | + return b | |
16 … | + })) | |
17 … | + var lzBuffer = new Buffer(data.value.exportData(), 'base64') | |
18 … | + return Buffer.concat([createHash().update(data).update(raw).digest(), data, lzBuffer]) | |
19 … | +} | |
20 … | + | |
21 … | +exports.decode = function (buffer) { | |
22 … | + if(buffer.length == 0) | |
23 … | + throw new Error('empty buffer') | |
24 … | + | |
25 … | + var hash = buffer.slice(0, 32) | |
26 … | + if(!deepEqual(createHash().update(buffer.slice(32)).digest(), hash)) | |
27 … | + throw new Error('flumeview-bloom.decode: hash is incorrect') | |
28 … | + | |
29 … | + var seq = buffer.readUInt32BE(32+0) | |
30 … | + var version = buffer.readUInt32BE(32+4) | |
31 … | + var items = buffer.readUInt32BE(32+8) | |
32 … | + var probability = buffer.readUInt32BE(32+12) | |
33 … | + | |
34 … | + var bloom = new jsbloom.filter(items, probability) | |
35 … | + bloom.importData(data.splice(32+16).toString('base64') | |
36 … | + return { | |
37 … | + seq: seq, version: version, | |
38 … | + items: items, | |
39 … | + probability: probability, | |
40 … | + value: bloom | |
41 … | + } | |
42 … | +} | |
43 … | + |
Built with git-ssb-web