Files: 008629bad0826f698889a686293256b806ad2db9 / index.js
3550 bytesRaw
1 | var Obv = require('obv') |
2 | var Drain = require('pull-stream/sinks/drain') |
3 | var Once = require('pull-stream/sources/once') |
4 | var AtomicFile = require('atomic-file') |
5 | var path = require('path') |
6 | var deepEqual = require('deep-equal') |
7 | var Notify = require('pull-notify') |
8 | |
// Report whether `o` exposes no enumerable keys at all (own or inherited).
// null / undefined are treated as empty because for-in simply skips them.
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    // the first key we meet is enough to decide
    empty = false
    break
  }
  return empty
}
13 | |
// True only when `f` is callable according to `typeof`.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
17 | |
// Identity function: hands its argument back untouched.
// Used as the default `map` when the caller does not supply one.
function id (e) {
  return e
}
19 | |
20 | module.exports = function (version, reduce, map) { |
21 | if(isFunction(version)) |
22 | throw new Error('version must be a number') |
23 | |
24 | map = map || id |
25 | var notify = Notify() |
26 | return function (log, name) { //name is where this view is mounted |
27 | var acc, since = Obv(), ts = 0 |
28 | var value = Obv(), _value, writing = false, state, int |
29 | |
30 | //if we are in sync, and have not written recently, then write the current state. |
31 | |
32 | // if the log is persisted, |
33 | // then also save the reduce state. |
34 | // save whenever the view gets in sync with the log, |
35 | // as long as it hasn't beet updated in 1 minute. |
36 | |
37 | function write () { |
38 | var _ts = Date.now() |
39 | if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) { |
40 | clearTimeout(int) |
41 | int = setTimeout(function () { |
42 | ts = _ts; writing = true |
43 | state.set({seq: since.value, version: version, value: _value = value.value}, function () { |
44 | writing = false |
45 | }) |
46 | }, 200) |
47 | } |
48 | } |
49 | |
50 | //depending on the function, the reduction may not change on every update. |
51 | //but currently, we still need to rewrite the file to reflect that. |
52 | //(or accept that we'll have to reprocess some items) |
53 | //might be good to have a cheap way to update the seq. maybe put it in the filename, |
54 | //so filenames monotonically increase, instead of write to `name~` and then `mv name~ name` |
55 | |
56 | if(log.filename) { |
57 | var dir = path.dirname(log.filename) |
58 | state = AtomicFile(path.join(dir, name+'.json')) |
59 | state.get(function (err, data) { |
60 | if(err || isEmpty(data)) since.set(-1) |
61 | else if(data.version !== version) { |
62 | since.set(-1) //overwrite old data. |
63 | } |
64 | else { |
65 | value.set(_value = data.value) |
66 | since.set(data.seq) |
67 | } |
68 | }) |
69 | } |
70 | else |
71 | since.set(-1) |
72 | |
73 | return { |
74 | since: since, |
75 | value: value, |
76 | methods: {get: 'async', stream: 'source'}, |
77 | get: function (path, cb) { |
78 | if('function' === typeof path) |
79 | cb = path, path = null |
80 | cb(null, value.value) |
81 | }, |
82 | stream: function (opts) { |
83 | opts = opts || {} |
84 | //todo: send the HASH of the value, and only resend it if it is different! |
85 | if(opts.live !== true) |
86 | return Once(value.value) |
87 | var source = notify.listen() |
88 | //start by sending the current value... |
89 | source.push(value.value) |
90 | return source |
91 | }, |
92 | createSink: function (cb) { |
93 | return Drain(function (data) { |
94 | var _data = map(data.value, data.seq) |
95 | if(_data != null) value.set(reduce(value.value, _data, data.seq)) |
96 | since.set(data.seq) |
97 | notify(_data) |
98 | write() |
99 | }, cb) |
100 | }, |
101 | destroy: function (cb) { |
102 | value.set(null); since.set(-1); |
103 | if(state) state.set(null, cb) |
104 | else cb() |
105 | }, |
106 | close: function (cb) { |
107 | clearTimeout(int) |
108 | if(!since.value) return cb() |
109 | //force a write. |
110 | state.set({seq: since.value, version: version, value: _value = value.value}, cb) |
111 | } |
112 | } |
113 | } |
114 | } |
115 | |
116 | |
117 |
Built with git-ssb-web