git ssb

0+

Dominic / flumeview-bloom



Tree: 8150458b85a4cfd309ddb194f2650d37793b8b1d

Files: 8150458b85a4cfd309ddb194f2650d37793b8b1d / index.js

3550 bytesRaw
1var Obv = require('obv')
2var Drain = require('pull-stream/sinks/drain')
3var Once = require('pull-stream/sources/once')
4var AtomicFile = require('atomic-file')
5var path = require('path')
6var deepEqual = require('deep-equal')
7var Notify = require('pull-notify')
8
/**
 * Report whether `o` exposes no enumerable keys.
 *
 * Uses `for...in`, so inherited enumerable keys count as content, and
 * `null` / `undefined` are treated as empty rather than throwing.
 */
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    // the first enumerable key is enough to decide
    empty = false
    break
  }
  return empty
}
13
/**
 * True when `f` is callable according to `typeof`.
 */
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
17
/**
 * Identity function: hands back its argument untouched.
 * Serves as the default `map` when the caller supplies none.
 */
function id (e) {
  return e
}
19
20module.exports = function (version, reduce, map) {
21 if(isFunction(version))
22 throw new Error('version must be a number')
23
24 map = map || id
25 var notify = Notify()
26 return function (log, name) { //name is where this view is mounted
27 var acc, since = Obv(), ts = 0
28 var value = Obv(), _value, writing = false, state, int
29
30 //if we are in sync, and have not written recently, then write the current state.
31
32 // if the log is persisted,
33 // then also save the reduce state.
34 // save whenever the view gets in sync with the log,
35 // as long as it hasn't beet updated in 1 minute.
36
37 function write () {
38 var _ts = Date.now()
39 if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) {
40 clearTimeout(int)
41 int = setTimeout(function () {
42 ts = _ts; writing = true
43 state.set({seq: since.value, version: version, value: _value = value.value}, function () {
44 writing = false
45 })
46 }, 200)
47 }
48 }
49
50 //depending on the function, the reduction may not change on every update.
51 //but currently, we still need to rewrite the file to reflect that.
52 //(or accept that we'll have to reprocess some items)
53 //might be good to have a cheap way to update the seq. maybe put it in the filename,
54 //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name`
55
56 if(log.filename) {
57 var dir = path.dirname(log.filename)
58 state = AtomicFile(path.join(dir, name+'.json'))
59 state.get(function (err, data) {
60 if(err || isEmpty(data)) since.set(-1)
61 else if(data.version !== version) {
62 since.set(-1) //overwrite old data.
63 }
64 else {
65 value.set(_value = data.value)
66 since.set(data.seq)
67 }
68 })
69 }
70 else
71 since.set(-1)
72
73 return {
74 since: since,
75 value: value,
76 methods: {get: 'async', stream: 'source'},
77 get: function (path, cb) {
78 if('function' === typeof path)
79 cb = path, path = null
80 cb(null, value.value)
81 },
82 stream: function (opts) {
83 opts = opts || {}
84 //todo: send the HASH of the value, and only resend it if it is different!
85 if(opts.live !== true)
86 return Once(value.value)
87 var source = notify.listen()
88 //start by sending the current value...
89 source.push(value.value)
90 return source
91 },
92 createSink: function (cb) {
93 return Drain(function (data) {
94 var _data = map(data.value, data.seq)
95 if(_data != null) value.set(reduce(value.value, _data, data.seq))
96 since.set(data.seq)
97 notify(_data)
98 write()
99 }, cb)
100 },
101 destroy: function (cb) {
102 value.set(null); since.set(-1);
103 if(state) state.set(null, cb)
104 else cb()
105 },
106 close: function (cb) {
107 clearTimeout(int)
108 if(!since.value) return cb()
109 //force a write.
110 state.set({seq: since.value, version: version, value: _value = value.value}, cb)
111 }
112 }
113 }
114}
115
116
117

Built with git-ssb-web