git ssb

0+

Dominic / flumeview-bloom



Tree: c754513e3ef69c37599ddc007f12abe5dceabaa1

Files: c754513e3ef69c37599ddc007f12abe5dceabaa1 / index.js

3267 bytesRaw
1var Obv = require('obv')
2var Drain = require('pull-stream/sinks/drain')
3var Once = require('pull-stream/sources/once')
4var AtomicFile = require('atomic-file')
5var path = require('path')
6var deepEqual = require('deep-equal')
7var Notify = require('pull-notify')
8
// Returns true when `o` has no enumerable keys (own or inherited).
// null and undefined count as empty because for...in simply does not
// iterate over them.
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    // a single enumerable key is enough to decide
    empty = false
    break
  }
  return empty
}
13
// Identity: hands back its argument untouched.
function id (e) {
  var same = e
  return same
}
15
16module.exports = function (reduce, map) {
17 map = map || id
18 var notify = Notify()
19 return function (log, name) { //name is where this view is mounted
20 var acc, since = Obv(), ts = 0
21 var value = Obv(), _value, writing = false, state, int
22
23 //if we are in sync, and have not written recently, then write the current state.
24
25 // if the log is persisted,
26 // then also save the reduce state.
27 // save whenever the view gets in sync with the log,
28 // as long as it hasn't beet updated in 1 minute.
29
30 function write () {
31 var _ts = Date.now()
32 if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) {
33 clearTimeout(int)
34 int = setTimeout(function () {
35 ts = _ts; writing = true
36 state.set({seq: since.value, value: _value = value.value}, function () {
37 writing = false
38 })
39 }, 200)
40 }
41 }
42
43 //depending on the function, the reduction may not change on every update.
44 //but currently, we still need to rewrite the file to reflect that.
45 //(or accept that we'll have to reprocess some items)
46 //might be good to have a cheap way to update the seq. maybe put it in the filename,
47 //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name`
48
49 if(log.filename) {
50 var dir = path.dirname(log.filename)
51 state = AtomicFile(path.join(dir, name+'.json'))
52 state.get(function (err, data) {
53 if(err || isEmpty(data)) since.set(-1)
54 else {
55 value.set(_value = data.value)
56 since.set(data.seq)
57 }
58 })
59 }
60 else
61 since.set(-1)
62
63 return {
64 since: since,
65 value: value,
66 methods: {get: 'async', stream: 'source'},
67 get: function (path, cb) {
68 if('function' === typeof path)
69 cb = path, path = null
70 cb(null, value.value)
71 },
72 stream: function (opts) {
73 opts = opts || {}
74 //todo: send the HASH of the value, and only resend it if it is different!
75 if(opts.live !== true)
76 return Once(value.value)
77 var source = notify.listen()
78 //start by sending the current value...
79 source.push(value.value)
80 return source
81 },
82 createSink: function (cb) {
83 return Drain(function (data) {
84 var _data = map(data.value, data.seq)
85 if(_data != null) value.set(reduce(value.value, _data, data.seq))
86 since.set(data.seq)
87 notify(_data)
88 write()
89 }, cb)
90 },
91 destroy: function (cb) {
92 value.set(null); since.set(-1);
93 if(state) state.set(null, cb)
94 else cb()
95 },
96 close: function (cb) {
97 clearTimeout(int)
98 if(!since.value) return cb()
99 //force a write.
100 state.set({seq: since.value, value: _value = value.value}, cb)
101 }
102 }
103 }
104}
105
106

Built with git-ssb-web