Files: 1786f9c4c5f7c770e215e697546f92cbd098bc2c / lib / mutant-pull-collection.js
1239 bytesRaw
1 | const pull = require('pull-stream') |
2 | const Abortable = require('pull-abortable') |
3 | const LazyWatcher = require('mutant/lib/lazy-watcher') |
4 | |
5 | module.exports = createPullCollection |
6 | |
7 | function createPullCollection (getStream, opts) { |
8 | const state = { getStream } |
9 | const binder = LazyWatcher.call(state, update, listen, unlisten) |
10 | binder.value = [] |
11 | if (opts && opts.nextTick) binder.nextTick = true |
12 | if (opts && opts.idle) binder.idle = true |
13 | state.binder = binder |
14 | return MutantPullCollection.bind(state) |
15 | } |
16 | |
17 | function MutantPullCollection (listener) { |
18 | if (!listener) { |
19 | return this.binder.getValue() |
20 | } |
21 | return this.binder.addListener(listener) |
22 | } |
23 | |
24 | function update () { |
25 | // this is only run on the item when no one is listening (use a onceTrue instead of resolve) |
26 | // since we don't have an synchronous way to check, ignore |
27 | } |
28 | |
29 | function listen () { |
30 | const abortable = Abortable() |
31 | this.abort = abortable.abort |
32 | pull( |
33 | this.getStream(this.lastValue), |
34 | abortable, |
35 | pull.drain((value) => { |
36 | if (value && value.sync) return |
37 | this.lastValue = value |
38 | this.binder.value.push(value) |
39 | this.binder.broadcast() |
40 | }) |
41 | ) |
42 | } |
43 | |
44 | function unlisten () { |
45 | if (this.abort) { |
46 | this.abort() |
47 | this.abort = null |
48 | } |
49 | } |
50 |
Built with git-ssb-web