git ssb

0+

Dominic / flumeview-bloom



Commit c351bed00d2c0f504ff422b7256156c5b697b8f4

convert to a bloom filter

Dominic Tarr committed on 12/13/2016, 4:35:01 PM
Parent: 8150458b85a4cfd309ddb194f2650d37793b8b1d

Files changed

README.mdchanged
index.jschanged
package.jsonchanged
README.mdView
@@ -8,63 +8,62 @@
88 ``` js
99 var FlumeLog = require('flumelog-offset')
1010 var codec = require('flumecodec')
1111 var Flume = require('flumedb')
12-var Reduce = require('flumeview-reduce')
12 +var Bloom = require('flumeview-bloom')
1313
14-//statistics exports a reduce function that calculates
15-//mean, stdev, etc!
16-var statistics = require('statistics')
17-
1814 //initialize a flumelog with a codec.
1915 //this example uses flumelog-offset, but any flumelog is valid.
2016 var log = FlumeLog(file, 1024*16, codec.json) //use any flume log
2117
2218 //attach the reduce function.
23-var db = Flume(log).use('stats',
24- Reduce(1, statistics, function (data) {
25- return data.value
26- })
19 +var db = Flume(log).use('bloom', Bloom(1, 'key'))
2720
28-db.append({value: 1}, function (err) {
29-
30- db.stats.get(function (err, stats) {
31- console.log(stats) // => {mean: 1, stdev: 0, count: 1, sum: 1, ...}
21 +db.append({key: 1, value: 1}, function (err) {
22 + db.bloom.ready(function (err, stats) {
23 + console.log(db.bloom.has(1)) // ==> true
24 + console.log(db.bloom.has(2)) // ==> false
3225 })
3326 })
3427 ```
3528
36-## FlumeViewReduce(version, reduce, map?) => FlumeView
29 +## FlumeViewBloom(version, map, opts) => FlumeView
3730
3831 construct a flumeview from this reduce function. `version` should be a number,
39-and must be provided. If you make a breaking change to either `reduce` or `map`
40-then increment `version` and the view will be rebuilt.
32 +and must be provided. If you change `map`
33 +then increment `version` and the view will be rebuilt. Also, if any options change,
34 +the view will be rebuilt.
4135
42-`map` is optional. If map is applied, then each item in the log is passed to `map`
43-and then if the returned value is not null, it is passed to reduce.
36 +`opts` provides options to the [bloom filter](https://github.com/cry/jsbloom) `items` and `probability`.
37 +Default settings are `100,000` items and `0.001` probability of a collision.
4438
45-``` js
46-var _data = map(data)
47-if(_data != null)
48- state = reduce(state, map(data))
49-```
39 +`map` is the key that is used to identify each item. It can be a function that returns a string,
40 +or if it is a string then that property is taken from the item.
5041
51-using a `map` function is useful, because it enables efficiently streaming the realtime
52-changes in the state to a remote client.
42 +## db[name].has(key) => boolean
5343
54-then, pass the flumeview to `db.use(name, flumeview)`
55-and you'll have access to the flumeview methods on `db[name]...`
44 +check if an item with `key` is in the log.
45 +If the result is `false`, then the item is _not_ in the database, but if the result is `true`,
46 +then the item _might_ be in the database.
5647
57-## db[name].get(cb)
48 +## Uses
5849
59-get the current state of the reduce. This will wait until the view is up to date, if necessary.
50 +### cheaply enforce uniqueness
6051
61-## db[name].stream({live: boolean}) => PullSource
52 +Before adding something, check if you already have it.
53 +If the bloom filter does not have it, then we can add it without any other checks.
54 +But since bloom filters can give false positives, if it says yes, we need to check if it really
55 +is there. This will be a more expensive check, but we only need to do it when the bloom filter reports a match.
6256
63-Stream the changing reduce state. for this to work, a map function must be provided.
57 +### estimating the number of unique values
6458
65-If so, the same reduce function can be used to process the output.
59 +By measuring the probability of a bloom filter match, we can get an estimate of the number of
60 +unique values added to the bloom filter. For example, unique visits to your website.
61 +This could also be used to track how many possible values a field might have.
6662
63 +
6764 ## License
6865
6966 MIT
7067
68 +
69 +
index.jsView
@@ -2,10 +2,9 @@
22 var Drain = require('pull-stream/sinks/drain')
33 var Once = require('pull-stream/sources/once')
44 var AtomicFile = require('atomic-file')
55 var path = require('path')
6-var deepEqual = require('deep-equal')
7-var Notify = require('pull-notify')
6 +var jsbloom = require('jsbloom')
87
98 function isEmpty (o) {
109 for(var k in o) return false
1110 return true
@@ -14,34 +13,46 @@
1413 function isFunction (f) {
1514 return 'function' === typeof f
1615 }
1716
18-function id (e) { return e }
17 +function isString (s) {
18 + return 'string' === typeof s
19 +}
1920
20-module.exports = function (version, reduce, map) {
21 +var codec = require('./codec')
22 +
23 +function lookup(map) {
24 + if(isFunction(map)) return map
25 + else if(isString(map)) return function (obj) { return obj[map] }
26 + else return function (e) { return e }
27 +}
28 +
29 +module.exports = function (version, map, opts) {
30 + opts = opts || {}
31 + var items = opts.items || 100e3
32 + var probability = opts.probability || 0.001
33 +
2134 if(isFunction(version))
2235 throw new Error('version must be a number')
2336
24- map = map || id
25- var notify = Notify()
37 + map = lookup(map)
2638 return function (log, name) { //name is where this view is mounted
2739 var acc, since = Obv(), ts = 0
2840 var value = Obv(), _value, writing = false, state, int
2941
30- //if we are in sync, and have not written recently, then write the current state.
31-
32- // if the log is persisted,
33- // then also save the reduce state.
34- // save whenever the view gets in sync with the log,
35- // as long as it hasn't beet updated in 1 minute.
36-
3742 function write () {
3843 var _ts = Date.now()
3944 if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) {
4045 clearTimeout(int)
4146 int = setTimeout(function () {
4247 ts = _ts; writing = true
43- state.set({seq: since.value, version: version, value: _value = value.value}, function () {
48 + state.set({
49 + seq: since.value,
50 + version: version,
51 + items: items,
52 + probability: probability,
53 + value: bloom
54 + }, function () {
4455 writing = false
4556 })
4657 }, 200)
4758 }
@@ -54,16 +65,24 @@
5465 //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name`
5566
5667 if(log.filename) {
5768 var dir = path.dirname(log.filename)
58- state = AtomicFile(path.join(dir, name+'.json'))
69 + state = AtomicFile(path.join(dir, name+'.json'), codec)
5970 state.get(function (err, data) {
60- if(err || isEmpty(data)) since.set(-1)
61- else if(data.version !== version) {
71 + if(err || isEmpty(data)) {
72 + bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001)
73 + since.set(-1)
74 + }
75 + else if( //if any settings have changed, reinitialize the filter.
76 + data.version !== version
77 + || data.items !== opts.items
78 + || data.probabilty !== opts.probability
79 + ) {
80 + bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001)
6281 since.set(-1) //overwrite old data.
6382 }
6483 else {
65- value.set(_value = data.value)
84 + bloom = data.value
6685 since.set(data.seq)
6786 }
6887 })
6988 }
@@ -72,45 +91,47 @@
7291
7392 return {
7493 since: since,
7594 value: value,
76- methods: {get: 'async', stream: 'source'},
77- get: function (path, cb) {
78- if('function' === typeof path)
79- cb = path, path = null
80- cb(null, value.value)
95 + methods: {has: 'sync'},
96 + //has checks immediately, but if you want to wait
97 + //use db[name].ready(function () { db[name].has(key) })
98 + //ready is added by flumedb
99 + has: function (key) {
100 + return bloom.checkEntry(key)
81101 },
82- stream: function (opts) {
83- opts = opts || {}
84- //todo: send the HASH of the value, and only resend it if it is different!
85- if(opts.live !== true)
86- return Once(value.value)
87- var source = notify.listen()
88- //start by sending the current value...
89- source.push(value.value)
90- return source
91- },
92102 createSink: function (cb) {
93103 return Drain(function (data) {
94- var _data = map(data.value, data.seq)
95- if(_data != null) value.set(reduce(value.value, _data, data.seq))
96- since.set(data.seq)
97- notify(_data)
98- write()
104 + var key = map(data.value, data.seq)
105 + if(key) bloom.addEntry(key)
106 + since.set(data.seq)
107 + write()
99108 }, cb)
100109 },
101110 destroy: function (cb) {
102- value.set(null); since.set(-1);
111 + bloom = null; since.set(-1);
103112 if(state) state.set(null, cb)
104113 else cb()
105114 },
106115 close: function (cb) {
107116 clearTimeout(int)
108117 if(!since.value) return cb()
109118 //force a write.
110- state.set({seq: since.value, version: version, value: _value = value.value}, cb)
119 + state.set({
120 + seq: since.value,
121 + version: version,
122 + items: opts.items,
123 + probability: opts.probability,
124 + value: bloom
125 + }, cb)
111126 }
112127 }
113128 }
114129 }
115130
116131
132 +
133 +
134 +
135 +
136 +
137 +
package.jsonView
@@ -9,8 +9,9 @@
99 },
1010 "dependencies": {
1111 "atomic-file": "^0.1.0",
1212 "deep-equal": "^1.0.1",
13 + "jsbloom": "^1.0.3",
1314 "obv": "0.0.0",
1415 "pull-notify": "^0.1.1",
1516 "pull-stream": "^3.5.0"
1617 },

Built with git-ssb-web