git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: dbe444113f2ca34fc436778abd48372dd514941d

Files: dbe444113f2ca34fc436778abd48372dd514941d / modules / progress / obs.js

1951 bytesRaw
1var nest = require('depnest')
2var pull = require('pull-stream')
3var {Struct, Dict, Value, computed, watch} = require('mutant')
4
5exports.gives = nest({
6 'progress.obs': [
7 'global',
8 'indexes',
9 'replicate',
10 'migration',
11 'peer'
12 ]
13})
14
15exports.needs = nest({
16 'sbot.obs.connection': 'first'
17})
18
19exports.create = function (api) {
20 var syncStatus = null
21 var progress = null
22
23 return nest({
24 'progress.obs': {
25 replicate () {
26 load()
27 return syncStatus
28 },
29 peer (id) {
30 load()
31 var result = computed(syncStatus, (status) => {
32 return status.pendingPeers[id] || 0
33 })
34 return result
35 },
36 indexes () {
37 load()
38 return progress.indexes
39 },
40 migration () {
41 load()
42 return progress.migration
43 },
44 global () {
45 load()
46 return progress
47 }
48 }
49 })
50
51 function load () {
52 if (!syncStatus) {
53 syncStatus = ProgressStatus(x => x.replicate.changes(), {
54 incompleteFeeds: 0,
55 pendingPeers: Dict({}, {fixedIndexing: true}),
56 feeds: null,
57 rate: 0
58 })
59 }
60 if (!progress) {
61 progress = ProgressStatus(x => x.patchwork.progress(), {
62 indexes: Status(),
63 migration: Status()
64 })
65 }
66 }
67
68 function ProgressStatus (keyFn, attrs) {
69 var progress = Struct(attrs || {
70 pending: 0
71 })
72
73 watch(api.sbot.obs.connection, (sbot) => {
74 if (sbot) {
75 var source
76 try {
77 source = keyFn(sbot)
78 } catch (err) {
79 progress.set(err)
80 return progress
81 }
82 if (source) {
83 pull(
84 source,
85 pull.drain((event) => {
86 progress.set(event)
87 })
88 )
89 }
90 }
91 })
92
93 return progress
94 }
95}
96
97function Status () {
98 return Struct({
99 start: Value(),
100 current: Value(),
101 target: Value()
102 })
103}
104

Built with git-ssb-web