Files: 7d39f00f5d8cc46d121b4197b45fa1dc145ec3da / modules / progress / obs.js
2073 bytesRaw
1 | var nest = require('depnest') |
2 | var pull = require('pull-stream') |
3 | var {Struct, Dict, Value, computed, watch, onceTrue} = require('mutant') |
4 | |
5 | exports.gives = nest({ |
6 | 'progress.obs': [ |
7 | 'global', |
8 | 'indexes', |
9 | 'replicate', |
10 | 'migration', |
11 | 'peer' |
12 | ] |
13 | }) |
14 | |
15 | exports.needs = nest({ |
16 | 'sbot.obs.connection': 'first' |
17 | }) |
18 | |
19 | exports.create = function (api) { |
20 | var syncStatus = null |
21 | var progress = null |
22 | |
23 | setInterval(() => { |
24 | onceTrue(api.sbot.obs.connection(), sbot => { |
25 | sbot.progress |
26 | }) |
27 | }, 1000) |
28 | |
29 | return nest({ |
30 | 'progress.obs': { |
31 | replicate () { |
32 | load() |
33 | return syncStatus |
34 | }, |
35 | peer (id) { |
36 | load() |
37 | var result = computed(syncStatus, (status) => { |
38 | return status.pendingPeers[id] || 0 |
39 | }) |
40 | return result |
41 | }, |
42 | indexes () { |
43 | load() |
44 | return progress.indexes |
45 | }, |
46 | migration () { |
47 | load() |
48 | return progress.migration |
49 | }, |
50 | global () { |
51 | load() |
52 | return progress |
53 | } |
54 | } |
55 | }) |
56 | |
57 | function load () { |
58 | if (!syncStatus) { |
59 | syncStatus = ProgressStatus(x => x.replicate.changes(), { |
60 | incompleteFeeds: 0, |
61 | pendingPeers: Dict({}, {fixedIndexing: true}), |
62 | feeds: null, |
63 | rate: 0 |
64 | }) |
65 | } |
66 | if (!progress) { |
67 | progress = ProgressStatus(x => x.progressStream.read(), { |
68 | indexes: Status(), |
69 | migration: Status() |
70 | }) |
71 | } |
72 | } |
73 | |
74 | function ProgressStatus (keyFn, attrs) { |
75 | var progress = Struct(attrs || { |
76 | pending: 0 |
77 | }) |
78 | |
79 | watch(api.sbot.obs.connection, (sbot) => { |
80 | if (sbot) { |
81 | var source |
82 | try { |
83 | source = keyFn(sbot) |
84 | } catch (err) { |
85 | progress.set(err) |
86 | return progress |
87 | } |
88 | if (source) { |
89 | pull( |
90 | source, |
91 | pull.drain((event) => { |
92 | progress.set(event) |
93 | }) |
94 | ) |
95 | } |
96 | } |
97 | }) |
98 | |
99 | return progress |
100 | } |
101 | } |
102 | |
103 | function Status () { |
104 | return Struct({ |
105 | start: Value(), |
106 | current: Value(), |
107 | target: Value() |
108 | }) |
109 | } |
110 |
Built with git-ssb-web