Files: f80524e4e4d9ee8b39654228746a0afe58778971 / ftu / manageProgress.js
5448 bytesRaw
1 | const pull = require('pull-stream') |
2 | const Client = require('ssb-client') |
3 | const Path = require('path') |
4 | const get = require('lodash/get') |
5 | const map = require('lodash/map') |
6 | const { resolve, watch } = require('mutant') |
7 | const mapLimit = require('map-limit') |
8 | |
9 | function manageProgress ({ state, config }) { |
10 | const { peersLatestSequence } = require(Path.join(config.path, 'importing.json')) |
11 | |
12 | Client(config.keys, config, (err, sbot) => { |
13 | if (err) return console.error('problem starting client', err) |
14 | |
15 | console.log('> sbot running!!!!') |
16 | |
17 | watchCurrentSequence({ sbot, state }) |
18 | watchLatestSequence({ sbot, period: 5000, state }) |
19 | watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state }) |
20 | |
21 | // pull( |
22 | // sbot.replicate.changes(), |
23 | // pull.log() |
24 | // ) |
25 | |
26 | // watch progress (db size) ?? |
27 | // sbot.progress(console.log) |
28 | |
29 | sbot.gossip.peers((err, peers) => { |
30 | if (err) return console.error(err) |
31 | |
32 | connectToPeers({ sbot, peers, state }) |
33 | reconnectToPeers({ sbot, peers, state, period: 7000 }) |
34 | }) |
35 | }) |
36 | } |
37 | |
38 | function connectToPeers ({ sbot, peers, state }) { |
39 | if (peers.length > 10) { |
40 | const lessPeers = peers.filter(p => !p.error) |
41 | if (lessPeers.length > 10) peers = lessPeers |
42 | console.log('CONNECTING TO PEERS:', peers.length) |
43 | } |
44 | |
45 | peers.forEach(({ host, port, key }) => { |
46 | if (host && port && key) { |
47 | sbot.gossip.connect({ host, port, key }, (err, v) => { |
48 | if (err) console.log('error connecting to ', host, err) |
49 | else console.log('connected to ', host) |
50 | }) |
51 | } |
52 | }) |
53 | } |
54 | |
55 | function reconnectToPeers ({ sbot, peers, state, period }) { |
56 | sbot.status((err, data) => { |
57 | if (err) return setTimeout(() => reconnectToPeers({ sbot, peers, period, state }), period) |
58 | |
59 | if (data.gossip.length < 5) { |
60 | peers |
61 | .sort((a, b) => Math.random() > 0.5 ? -1 : 1) |
62 | .slice(0, 5) |
63 | .forEach(p => sbot.gossip.connect(p, console.log)) |
64 | } |
65 | |
66 | if (resolve(state.importComplete)) return |
67 | |
68 | setTimeout(() => reconnectToPeers({ sbot, peers, period, state }), period) |
69 | }) |
70 | } |
71 | |
72 | function watchCurrentSequence ({ sbot, state }) { |
73 | var sink = pull.drain((msg) => { |
74 | let seq = get(msg, 'value.sequence', false) |
75 | if (seq) state.mySequence.current.set(seq) |
76 | }) |
77 | |
78 | pull( |
79 | sbot.createUserStream({ live: true, id: sbot.id }), |
80 | sink |
81 | ) |
82 | |
83 | watch(state.importComplete, importComplete => { |
84 | if (importComplete) sink.abort(() => console.log('stopping watchCurrentSequence')) |
85 | }) |
86 | } |
87 | |
88 | var cache = {} |
89 | function watchLatestSequence ({ sbot, period, state }) { |
90 | const feedId = sbot.id |
91 | sbot.ebt.peerStatus(feedId, (err, data) => { |
92 | if (err) return setTimeout(() => watchLatestSequence({ sbot, period, state }), period) |
93 | |
94 | Object.assign(cache, data.peers) |
95 | const currentLatest = resolve(state.mySequence.latest) |
96 | |
97 | const remoteSeqs = map(cache, (val) => val.seq) |
98 | .filter(s => s >= currentLatest) // only keep remote seq that confirm or update backup seq |
99 | .sort((a, b) => a > b ? -1 : 1) // order them |
100 | |
101 | console.log('mySequence.latest', resolve(state.mySequence.latest), remoteSeqs) |
102 | const newLatest = remoteSeqs[0] |
103 | if (newLatest && newLatest >= resolve(state.mySequence.latest)) { |
104 | state.mySequence.latest.set(newLatest) |
105 | |
106 | // if this value is confirmed remotely twice, assume safe |
107 | if (remoteSeqs.filter(s => s === newLatest).length >= 2) { |
108 | state.mySequence.latestConfirmed.set(true) |
109 | } |
110 | } |
111 | |
112 | if (resolve(state.importComplete)) return |
113 | |
114 | setTimeout(() => watchLatestSequence({ sbot, period, state }), period) |
115 | }) |
116 | } |
117 | |
118 | function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) { |
119 | mapLimit(Object.keys(peersLatestSequence), 5, |
120 | (id, cb) => sbot.latestSequence(id, (err, seq) => { |
121 | if (err && err.message && err.message.indexOf('not found') > -1) { |
122 | return cb(null, null) // don't include this user |
123 | } |
124 | if (err) return cb(err) |
125 | cb(null, [id, seq]) |
126 | }), |
127 | (err, data) => { |
128 | if (err) return setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period) |
129 | |
130 | const results = data |
131 | .filter(Boolean) |
132 | .reduce((soFar, d) => { |
133 | soFar.current = soFar.current + d[1] |
134 | soFar.latest = soFar.latest + peersLatestSequence[d[0]] |
135 | |
136 | return soFar |
137 | }, { current: 0, latest: 0 }) |
138 | |
139 | state.peerSequences.set(results) // NOT WORKING yet |
140 | |
141 | // const results = data |
142 | // .filter(Boolean) |
143 | // .reduce((soFar, d) => { |
144 | // soFar[d[0]] = { |
145 | // progress: d[1], |
146 | // total: peersLatestSequence[d[0]] |
147 | // } |
148 | // return soFar |
149 | // }, {}) |
150 | // console.log('progress', results) |
151 | |
152 | if (resolve(state.importComplete)) return |
153 | |
154 | setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period) |
155 | } |
156 | ) |
157 | |
158 | // NOTE this would be to do something with remote state of peers |
159 | // Object.keys(peersLatestSequence).forEach(peerId => { |
160 | // sbot.ebt.peerStatus(peerId, (err, data) => { |
161 | // if (err) return |
162 | // const currentLatest = peersLatestSequence[peerId] |
163 | |
164 | // const remoteSeq = map(data.peers, (val) => val.seq) |
165 | // .filter(s => s >= currentLatest) |
166 | // .sort((a, b) => a > b ? -1 : 1) |
167 | // .shift() |
168 | |
169 | // console.log(peerId, currentLatest, remoteSeq) |
170 | // }) |
171 | // }) |
172 | } |
173 | |
174 | module.exports = manageProgress |
175 |
Built with git-ssb-web