Files: c754513e3ef69c37599ddc007f12abe5dceabaa1 / index.js
3267 bytesRaw
1 | var Obv = require('obv') |
2 | var Drain = require('pull-stream/sinks/drain') |
3 | var Once = require('pull-stream/sources/once') |
4 | var AtomicFile = require('atomic-file') |
5 | var path = require('path') |
6 | var deepEqual = require('deep-equal') |
7 | var Notify = require('pull-notify') |
8 | |
/**
 * Report whether `o` has no enumerable properties.
 *
 * Uses `for...in`, so inherited enumerable keys count as content, and
 * `null` / `undefined` (which `for...in` never iterates) count as empty.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true when no enumerable key was found
 */
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    //one key is enough to know the answer
    empty = false
    break
  }
  return empty
}
13 | |
/**
 * Identity function: hands back its argument untouched.
 * Used as the default `map` when the caller does not supply one.
 *
 * @param {*} e - any value
 * @returns {*} the same value
 */
function id (e) {
  return e
}
15 | |
16 | module.exports = function (reduce, map) { |
17 | map = map || id |
18 | var notify = Notify() |
19 | return function (log, name) { //name is where this view is mounted |
20 | var acc, since = Obv(), ts = 0 |
21 | var value = Obv(), _value, writing = false, state, int |
22 | |
23 | //if we are in sync, and have not written recently, then write the current state. |
24 | |
25 | // if the log is persisted, |
26 | // then also save the reduce state. |
27 | // save whenever the view gets in sync with the log, |
28 | // as long as it hasn't beet updated in 1 minute. |
29 | |
30 | function write () { |
31 | var _ts = Date.now() |
32 | if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) { |
33 | clearTimeout(int) |
34 | int = setTimeout(function () { |
35 | ts = _ts; writing = true |
36 | state.set({seq: since.value, value: _value = value.value}, function () { |
37 | writing = false |
38 | }) |
39 | }, 200) |
40 | } |
41 | } |
42 | |
43 | //depending on the function, the reduction may not change on every update. |
44 | //but currently, we still need to rewrite the file to reflect that. |
45 | //(or accept that we'll have to reprocess some items) |
46 | //might be good to have a cheap way to update the seq. maybe put it in the filename, |
47 | //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name` |
48 | |
49 | if(log.filename) { |
50 | var dir = path.dirname(log.filename) |
51 | state = AtomicFile(path.join(dir, name+'.json')) |
52 | state.get(function (err, data) { |
53 | if(err || isEmpty(data)) since.set(-1) |
54 | else { |
55 | value.set(_value = data.value) |
56 | since.set(data.seq) |
57 | } |
58 | }) |
59 | } |
60 | else |
61 | since.set(-1) |
62 | |
63 | return { |
64 | since: since, |
65 | value: value, |
66 | methods: {get: 'async', stream: 'source'}, |
67 | get: function (path, cb) { |
68 | if('function' === typeof path) |
69 | cb = path, path = null |
70 | cb(null, value.value) |
71 | }, |
72 | stream: function (opts) { |
73 | opts = opts || {} |
74 | //todo: send the HASH of the value, and only resend it if it is different! |
75 | if(opts.live !== true) |
76 | return Once(value.value) |
77 | var source = notify.listen() |
78 | //start by sending the current value... |
79 | source.push(value.value) |
80 | return source |
81 | }, |
82 | createSink: function (cb) { |
83 | return Drain(function (data) { |
84 | var _data = map(data.value, data.seq) |
85 | if(_data != null) value.set(reduce(value.value, _data, data.seq)) |
86 | since.set(data.seq) |
87 | notify(_data) |
88 | write() |
89 | }, cb) |
90 | }, |
91 | destroy: function (cb) { |
92 | value.set(null); since.set(-1); |
93 | if(state) state.set(null, cb) |
94 | else cb() |
95 | }, |
96 | close: function (cb) { |
97 | clearTimeout(int) |
98 | if(!since.value) return cb() |
99 | //force a write. |
100 | state.set({seq: since.value, value: _value = value.value}, cb) |
101 | } |
102 | } |
103 | } |
104 | } |
105 | |
106 |
Built with git-ssb-web