Files: a821b0ce884300e8ff3901678c19c6827e1189e9 / lib / mutant-pull-reduce.js
1134 bytesRaw
1 | var pullPause = require('pull-pause') |
2 | var Value = require('@mmckegg/mutant/value') |
3 | var LazyWatcher = require('@mmckegg/mutant/lib/lazy-watcher') |
4 | |
5 | var pull = require('pull-stream') |
6 | |
7 | module.exports = function (stream, reducer, opts) { |
8 | var pauser = pullPause((paused) => {}) |
9 | var seq = 0 |
10 | var lastSeq = -1 |
11 | pauser.pause() |
12 | |
13 | var binder = LazyWatcher(update, pauser.resume, pauser.pause) |
14 | var result = function MutantPullReduce (listener) { |
15 | if (!listener) { |
16 | return binder.getValue() |
17 | } |
18 | return binder.addListener(listener) |
19 | } |
20 | |
21 | binder.value = opts.startValue |
22 | binder.nextTick = opts.nextTick |
23 | result.sync = Value(false) |
24 | |
25 | pull( |
26 | stream, |
27 | pauser, |
28 | pull.drain((item) => { |
29 | if (item.sync) { |
30 | result.sync.set(true) |
31 | } else { |
32 | seq += 1 |
33 | binder.value = reducer(binder.value, item) |
34 | binder.onUpdate() |
35 | } |
36 | }) |
37 | ) |
38 | |
39 | return result |
40 | |
41 | // scoped |
42 | |
43 | function update () { |
44 | if (!binder.live) { |
45 | // attempt to push through sync changes |
46 | pauser.resume() |
47 | pauser.pause() |
48 | } |
49 | |
50 | if (lastSeq !== seq) { |
51 | seq = lastSeq |
52 | return true |
53 | } |
54 | } |
55 | } |
56 |
Built with git-ssb-web