git ssb

0+

Dominic / flumeview-bloom



Tree: ab7d772ebb34b2168f673af11889ea2efd68d3b0

Files: ab7d772ebb34b2168f673af11889ea2efd68d3b0 / index.js

1953 bytesRaw
1var Obv = require('obv')
2var Drain = require('pull-stream/sinks/drain')
3var AtomicFile = require('atomic-file')
4var path = require('path')
5var deepEqual = require('deep-equal')
6
7function isEmpty (o) {
8 for(var k in o) return false
9 return true
10}
11
12module.exports = function (reduce) {
13 return function (log, name) { //name is where this view is mounted
14 var acc, since = Obv()
15 var value = Obv(), _value, int
16
17 // if the log is persisted,
18 // then also save the reduce state.
19 // currently saving it every minute, if it's changed.
20 // I don't think this is the best way
21 // but it's the easiest...
22 if(log.filename) {
23 var dir = path.dirname(log.filename)
24 var state = AtomicFile(path.join(dir, name+'.json'))
25 state.get(function (err, data) {
26 if(err || isEmpty(data)) since.set(-1)
27 else {
28 value.set(_value = data.value)
29 since.set(data.seq)
30 }
31 })
32 ;(function next() {
33 int = setTimeout(function () {
34 if(since.value > 0 && !deepEqual(_value, value.value))
35 state.set({seq: since.value, value: _value = value.value}, function () {})
36 next()
37 }, 1000*60)
38 int.unref()
39 })()
40 }
41 else
42 since.set(-1)
43
44 return {
45 since: since,
46 value: value,
47 methods: {get: 'async'},
48 get: function (path, cb) {
49 cb(null, value.value)
50 },
51 createSink: function (cb) {
52 return Drain(function (data) {
53 value.set(reduce(value.value, data.value, data.seq))
54 since.set(data.seq)
55 }, cb)
56 },
57 destroy: function (cb) {
58 value.set(null); since.set(-1);
59 if(state) state.set(null, cb)
60 else cb()
61 },
62 close: function (cb) {
63 clearTimeout(int)
64 if(!since.value) return cb()
65 //force a write.
66 state.set({seq: since.value, value: _value = value.value}, cb)
67 }
68 }
69 }
70}
71
72
73
74
75
76

Built with git-ssb-web