git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: 52922a4cd9a0eb4dfa4e61033be7a2fb037818ac

Files: 52922a4cd9a0eb4dfa4e61033be7a2fb037818ac / modules / progress / obs.js

2072 bytesRaw
1var nest = require('depnest')
2var pull = require('pull-stream')
3var {Struct, Dict, Value, computed, watch, onceTrue} = require('mutant')
4
5exports.gives = nest({
6 'progress.obs': [
7 'global',
8 'indexes',
9 'replicate',
10 'migration',
11 'peer'
12 ]
13})
14
15exports.needs = nest({
16 'sbot.obs.connection': 'first'
17})
18
19exports.create = function (api) {
20 var syncStatus = null
21 var progress = null
22
23 setInterval(() => {
24 onceTrue(api.sbot.obs.connection(), sbot => {
25 sbot.progress
26 })
27 }, 1000)
28
29 return nest({
30 'progress.obs': {
31 replicate () {
32 load()
33 return syncStatus
34 },
35 peer (id) {
36 load()
37 var result = computed(syncStatus, (status) => {
38 return status.pendingPeers[id] || 0
39 })
40 return result
41 },
42 indexes () {
43 load()
44 return progress.indexes
45 },
46 migration () {
47 load()
48 return progress.migration
49 },
50 global () {
51 load()
52 return progress
53 }
54 }
55 })
56
57 function load () {
58 if (!syncStatus) {
59 syncStatus = ProgressStatus(x => x.replicate.changes(), {
60 incompleteFeeds: 0,
61 pendingPeers: Dict({}, {fixedIndexing: true}),
62 feeds: null,
63 rate: 0
64 })
65 }
66 if (!progress) {
67 progress = ProgressStatus(x => x.patchwork.progress(), {
68 indexes: Status(),
69 migration: Status()
70 })
71 }
72 }
73
74 function ProgressStatus (keyFn, attrs) {
75 var progress = Struct(attrs || {
76 pending: 0
77 })
78
79 watch(api.sbot.obs.connection, (sbot) => {
80 if (sbot) {
81 var source
82 try {
83 source = keyFn(sbot)
84 } catch (err) {
85 progress.set(err)
86 return progress
87 }
88 if (source) {
89 pull(
90 source,
91 pull.drain((event) => {
92 progress.set(event)
93 })
94 )
95 }
96 }
97 })
98
99 return progress
100 }
101}
102
103function Status () {
104 return Struct({
105 start: Value(),
106 current: Value(),
107 target: Value()
108 })
109}
110

Built with git-ssb-web