Files: 770fe5e2a3d5973a5c9b7d0945b214c3b2ea6793 / lib / depject / progress / obs.js
2229 bytesRaw
1 | const nest = require('depnest') |
2 | const pull = require('pull-stream') |
3 | const { Struct, Dict, Value, computed, watch } = require('mutant') |
4 | |
5 | exports.gives = nest({ |
6 | 'progress.obs': [ |
7 | 'global', |
8 | 'indexes', |
9 | 'plugins', |
10 | 'replicate', |
11 | 'migration', |
12 | 'peer' |
13 | ] |
14 | }) |
15 | |
16 | exports.needs = nest({ |
17 | 'sbot.obs.connection': 'first' |
18 | }) |
19 | |
20 | exports.create = function (api) { |
21 | let syncStatus = null |
22 | let progress = null |
23 | let pluginProgress = null |
24 | |
25 | return nest({ |
26 | 'progress.obs': { |
27 | replicate () { |
28 | load() |
29 | return syncStatus |
30 | }, |
31 | peer (id) { |
32 | load() |
33 | const result = computed(syncStatus, (status) => { |
34 | return status.pendingPeers[id] || 0 |
35 | }) |
36 | return result |
37 | }, |
38 | indexes () { |
39 | load() |
40 | return progress.indexes |
41 | }, |
42 | plugins () { |
43 | load() |
44 | return pluginProgress.plugins |
45 | }, |
46 | migration () { |
47 | load() |
48 | return progress.migration |
49 | }, |
50 | global () { |
51 | load() |
52 | return progress |
53 | } |
54 | } |
55 | }) |
56 | |
57 | function load () { |
58 | if (!syncStatus) { |
59 | syncStatus = ProgressStatus(x => x.replicate.changes(), { |
60 | incompleteFeeds: 0, |
61 | pendingPeers: Dict({}, { fixedIndexing: true }), |
62 | feeds: null, |
63 | rate: 0 |
64 | }) |
65 | } |
66 | if (!progress) { |
67 | progress = ProgressStatus(x => x.patchwork.progress(), { |
68 | indexes: Status(), |
69 | migration: Status() |
70 | }) |
71 | } |
72 | if (!pluginProgress) { |
73 | pluginProgress = ProgressStatus(x => x.patchwork.progress(), { |
74 | plugins: Struct({}), |
75 | }) |
76 | } |
77 | } |
78 | |
79 | function ProgressStatus (keyFn, attrs) { |
80 | const progress = Struct(attrs || { |
81 | pending: 0 |
82 | }) |
83 | |
84 | watch(api.sbot.obs.connection, (sbot) => { |
85 | if (sbot) { |
86 | let source |
87 | try { |
88 | source = keyFn(sbot) |
89 | } catch (err) { |
90 | progress.set(err) |
91 | return progress |
92 | } |
93 | if (source) { |
94 | pull( |
95 | source, |
96 | pull.drain((event) => { |
97 | progress.set(event) |
98 | }) |
99 | ) |
100 | } |
101 | } |
102 | }) |
103 | |
104 | return progress |
105 | } |
106 | } |
107 | |
108 | function Status () { |
109 | return Struct({ |
110 | start: Value(), |
111 | current: Value(), |
112 | target: Value() |
113 | }) |
114 | } |
115 |
Built with git-ssb-web