git ssb

10+

Matt McKegg / patchwork



Tree: 4c3279a881a3983b1e7b4f4bc960633975fe77d6

Files: 4c3279a881a3983b1e7b4f4bc960633975fe77d6 / lib / mutant-pull-reduce.js

1134 bytesRaw
1var pullPause = require('pull-pause')
2var Value = require('@mmckegg/mutant/value')
3var LazyWatcher = require('@mmckegg/mutant/lib/lazy-watcher')
4
5var pull = require('pull-stream')
6
7module.exports = function (stream, reducer, opts) {
8 var pauser = pullPause((paused) => {})
9 var seq = 0
10 var lastSeq = -1
11 pauser.pause()
12
13 var binder = LazyWatcher(update, pauser.resume, pauser.pause)
14 var result = function MutantPullReduce (listener) {
15 if (!listener) {
16 return binder.getValue()
17 }
18 return binder.addListener(listener)
19 }
20
21 binder.value = opts.startValue
22 binder.nextTick = opts.nextTick
23 result.sync = Value(false)
24
25 pull(
26 stream,
27 pauser,
28 pull.drain((item) => {
29 if (item.sync) {
30 result.sync.set(true)
31 } else {
32 seq += 1
33 binder.value = reducer(binder.value, item)
34 binder.onUpdate()
35 }
36 })
37 )
38
39 return result
40
41 // scoped
42
43 function update () {
44 if (!binder.live) {
45 // attempt to push through sync changes
46 pauser.resume()
47 pauser.pause()
48 }
49
50 if (lastSeq !== seq) {
51 seq = lastSeq
52 return true
53 }
54 }
55}
56

Built with git-ssb-web