Files: 58ab0241031aa549a35cce1e678c27065ae66221 / lib / mutant-pull-value.js
1580 bytes · Raw
1 | const pull = require('pull-stream') |
2 | const Abortable = require('pull-abortable') |
3 | const LazyWatcher = require('mutant/lib/lazy-watcher') |
4 | const Value = require('mutant/value') |
5 | |
6 | module.exports = createPullValue |
7 | |
/**
 * Build an observable-style function whose value is fed by a pull-stream.
 *
 * A private state object carrying `getStream`, `onListen` and `onUnlisten` is
 * handed to `LazyWatcher` as its `this`, together with the module's `update`,
 * `listen` and `unlisten` callbacks. The function that is returned is
 * `MutantPullValue` bound to that same state object.
 *
 * @param {Function} getStream - called with no arguments by `listen` to obtain
 *   the stream that is piped into the drain sink.
 * @param {Object} [options]
 * @param {boolean} [options.nextTick=false] - when truthy, sets `binder.nextTick = true`.
 * @param {boolean} [options.idle=false] - when truthy, sets `binder.idle = true`.
 * @param {Function|null} [options.onListen=null] - stored on the state; invoked by `listen`.
 * @param {Function|null} [options.onUnlisten=null] - stored on the state; invoked by `unlisten`.
 * @param {*} [options.defaultValue=null] - initial `binder.value`.
 * @param {*} [options.sync=null] - when truthy, a `Value(false)` is created and
 *   exposed both as `state.sync` and as `.sync` on the returned function.
 * @returns {Function} `MutantPullValue` bound to the internal state.
 */
function createPullValue (getStream, options = {}) {
  const {
    nextTick = false,
    idle = false,
    onListen = null,
    onUnlisten = null,
    defaultValue = null,
    sync = null
  } = options

  const state = { getStream, onListen, onUnlisten }

  const watcher = LazyWatcher.call(state, update, listen, unlisten)
  watcher.value = defaultValue
  if (nextTick) {
    watcher.nextTick = true
  }
  if (idle) {
    watcher.idle = true
  }
  state.binder = watcher

  const observable = MutantPullValue.bind(state)
  if (sync) {
    // One shared Value instance, reachable from both the state and the caller.
    const syncFlag = Value(false)
    observable.sync = syncFlag
    state.sync = syncFlag
  }
  return observable
}
28 | |
/**
 * Body of the function returned by `createPullValue`; it is bound so that
 * `this` is the internal state object holding `binder`.
 *
 * @param {Function} [listener] - when truthy, it is registered through
 *   `binder.addListener`; when omitted or falsy, the value is read instead.
 * @returns {*} the result of `binder.addListener(listener)` when a listener
 *   is supplied, otherwise the result of `binder.getValue()`.
 */
function MutantPullValue (listener) {
  const { binder } = this
  return listener ? binder.addListener(listener) : binder.getValue()
}
35 | |
/**
 * Update callback handed to `LazyWatcher` by `createPullValue`.
 *
 * Intentionally a no-op. Per the original author's note, this only runs on
 * the item while no one is listening, and there is no synchronous way to
 * check the value at that point, so the call is ignored (the note suggests
 * using a `onceTrue` instead of `resolve`).
 */
function update () {
  // deliberately empty
}
40 | |
/**
 * Listen callback handed to `LazyWatcher`; runs with `this` set to the state
 * object created in `createPullValue`.
 *
 * Creates a fresh `Abortable()`, stores its `abort` function on the state so
 * that `unlisten` can call it later, then pipes `this.getStream()` through
 * the abortable into a drain sink. Each drained value flips `this.sync` to
 * true (only when a sync observable exists and is currently falsy), becomes
 * the new `binder.value`, and is broadcast. Finally `this.onListen` is
 * invoked when it is a function.
 */
function listen () {
  const abortable = Abortable()
  this.abort = abortable.abort

  // Arrow function: `this` stays the state object inside the sink callback.
  const handleValue = (value) => {
    if (this.sync) {
      if (!this.sync()) this.sync.set(true)
    }
    this.binder.value = value
    this.binder.broadcast()
  }

  pull(this.getStream(), abortable, pull.drain(handleValue))

  if (typeof this.onListen !== 'function') return
  this.onListen()
}
57 | |
/**
 * Unlisten callback handed to `LazyWatcher`; runs with `this` set to the
 * state object created in `createPullValue`.
 *
 * If an `abort` function is stored on the state (set by `listen`), it is
 * called and then cleared to `null`. Afterwards `this.onUnlisten` is invoked
 * when it is a function.
 */
function unlisten () {
  const hasActiveStream = Boolean(this.abort)
  if (hasActiveStream) {
    this.abort()
    this.abort = null
  }

  if (typeof this.onUnlisten !== 'function') return
  this.onUnlisten()
}
67 |
Built with git-ssb-web