Files: 3627ef5bb2d0af90d48c88e2897c15df3cd2dd11 / ftu / manageProgress.js
4897 bytes · Raw
1 | const pull = require('pull-stream') |
2 | const Client = require('ssb-client') |
3 | const Path = require('path') |
4 | const get = require('lodash/get') |
5 | const map = require('lodash/map') |
6 | const { resolve, watch } = require('mutant') |
7 | const mapLimit = require('map-limit') |
8 | |
/**
 * Connect to the ssb server and start every progress watcher.
 *
 * Loads `peersLatestSequence` from `importing.json` under `config.path`, opens
 * a client with `config.keys` / `config`, then starts the three sequence
 * watchers and asks the gossip plugin for its peer list so those peers can be
 * dialled.
 *
 * @param {Object} opts
 * @param {Object} opts.state - handed unchanged to each watcher
 * @param {Object} opts.config - client config; `path` and `keys` are read here
 */
function manageProgress ({ state, config }) {
  const importingFile = Path.join(config.path, 'importing.json')
  const { peersLatestSequence } = require(importingFile)

  // Dial whatever peers the gossip plugin reports.
  const onPeers = sbot => (err, peers) => {
    if (err) return console.error(err)

    connectToPeers({ sbot, peers, state })
  }

  const onClient = (err, sbot) => {
    if (err) return console.error('problem starting client', err)

    console.log('> sbot running!!!!')

    watchCurrentSequence({ sbot, state })
    watchLatestSequence({ sbot, period: 5000, state })
    watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state })

    // Ideas left disabled by the original author:
    //   pull(sbot.replicate.changes(), pull.log())
    //   sbot.progress(console.log) // watch progress (db size) ??

    sbot.gossip.peers(onPeers(sbot))
  }

  Client(config.keys, config, onClient)
}
36 | |
/**
 * Ask the gossip plugin to connect to each peer that has a full address.
 *
 * When more than 10 peers are supplied, peers carrying an `error` field are
 * dropped, but only if more than 10 peers remain afterwards; otherwise the
 * full list is used. The number of peers chosen is logged in that case.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client exposing `gossip.connect(addr, cb)`
 * @param {Array<Object>} opts.peers - peer records (`host`, `port`, `key`, optional `error`)
 * @param {Object} opts.state - accepted for signature parity; not used here
 */
function connectToPeers ({ sbot, peers, state }) {
  let targets = peers

  if (peers.length > 10) {
    const withoutErrors = peers.filter(peer => !peer.error)
    if (withoutErrors.length > 10) targets = withoutErrors
    console.log('CONNECTING TO PEERS:', targets.length)
  }

  const dial = ({ host, port, key }) => {
    // Skip records that lack any part of the address.
    if (!(host && port && key)) return

    sbot.gossip.connect({ host, port, key }, (err, v) => {
      if (err) {
        console.log('error connecting to ', host, err)
      } else {
        console.log('connected to ', host)
      }
    })
  }

  targets.forEach(peer => dial(peer))
}
53 | |
/**
 * Mirror the sequence of the local feed's messages into
 * `state.mySequence.current` until the import is complete.
 *
 * A live user stream for `sbot.id` is drained; every message with a truthy
 * `value.sequence` updates the observable. Once `state.importComplete` becomes
 * truthy the drain is aborted exactly once and the watcher on
 * `state.importComplete` is released, so no listener is left behind and
 * `abort` is not re-issued if the observable fires again.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client exposing `createUserStream` and `id`
 * @param {Object} opts.state - reads `importComplete`, writes `mySequence.current`
 */
function watchCurrentSequence ({ sbot, state }) {
  const sink = pull.drain((msg) => {
    const seq = get(msg, 'value.sequence', false)
    if (seq) state.mySequence.current.set(seq)
  })

  pull(
    sbot.createUserStream({ live: true, id: sbot.id }),
    sink
  )

  let stopped = false
  let unwatch = null

  const release = () => {
    if (typeof unwatch === 'function') unwatch()
    unwatch = null
  }

  unwatch = watch(state.importComplete, (importComplete) => {
    if (!importComplete || stopped) return

    stopped = true
    sink.abort(() => console.log('stopping watchCurrentSequence'))
    release()
  })

  // `watch` may invoke the listener synchronously, before its return value
  // has been assigned to `unwatch`; release here in that case.
  if (stopped) release()
}
69 | |
// Most recent merged `peerStatus` response. Module-level, so it is shared by
// every poll (and every caller) of watchLatestSequence. The merge is shallow:
// a new response's top-level keys overwrite the cached ones.
let cache = {}

/**
 * Poll `sbot.ebt.peerStatus` for the local feed every `period` ms and raise
 * `state.mySequence.latest` to the highest sequence any peer reports.
 *
 * Only remote sequences that confirm or exceed the currently known latest are
 * considered. When at least two peers report the same highest value,
 * `state.mySequence.latestConfirmed` is set to true. Polling stops once
 * `state.importComplete` resolves truthy, on the success path and on the
 * error path alike.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client exposing `id` and `ebt.peerStatus(id, cb)`
 * @param {number} opts.period - delay between polls, in milliseconds
 * @param {Object} opts.state - reads `importComplete` and `mySequence.latest`;
 *   writes `mySequence.latest` and `mySequence.latestConfirmed`
 */
function watchLatestSequence ({ sbot, period, state }) {
  const feedId = sbot.id

  // Schedule the next poll unless the import has finished.
  const pollAgain = () => {
    if (resolve(state.importComplete)) return
    setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
  }

  sbot.ebt.peerStatus(feedId, (err, data) => {
    if (err) return pollAgain()

    cache = data = Object.assign({}, cache, data)
    const currentLatest = resolve(state.mySequence.latest)

    const remoteSeqs = map(data.peers, (val) => val.seq)
      .filter(s => s >= currentLatest) // only keep remote seq that confirm or update backup seq
      .sort((a, b) => b - a) // highest first

    console.log('mySequence.latest', currentLatest, remoteSeqs)

    // Every surviving entry is already >= currentLatest, so the head of the
    // list is the new latest whenever it is truthy.
    const newLatest = remoteSeqs[0]
    if (newLatest) {
      state.mySequence.latest.set(newLatest)

      // if this value is confirmed remotely twice, assume safe
      const confirmations = remoteSeqs.filter(s => s === newLatest).length
      if (confirmations >= 2) state.mySequence.latestConfirmed.set(true)
    }

    pollAgain()
  })
}
99 | |
/**
 * Poll the locally known latest sequence of every feed listed in
 * `peersLatestSequence` and publish aggregate totals to `state.peerSequences`.
 *
 * At most 5 `sbot.latestSequence` lookups run at a time. Feeds whose lookup
 * fails with a "not found" message are left out of the totals; any other
 * lookup error abandons the round and retries after `period` ms. The published
 * value is `{ current, latest }`: `current` sums the sequences returned by
 * `sbot.latestSequence`, `latest` sums the matching entries of
 * `peersLatestSequence`. Polling stops once `state.importComplete` resolves
 * truthy, on the success path and on the error path alike.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client exposing `latestSequence(id, cb)`
 * @param {Object<string, number>} opts.peersLatestSequence - feed id -> sequence
 *   (presumably the target sequence recorded in importing.json; confirm against caller)
 * @param {number} opts.period - delay between polls, in milliseconds
 * @param {Object} opts.state - reads `importComplete`, writes `peerSequences`
 */
function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) {
  // Schedule the next poll unless the import has finished.
  const pollAgain = () => {
    if (resolve(state.importComplete)) return
    setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
  }

  // Resolve one feed to an [id, seq] pair, or null when the feed is unknown.
  const lookup = (id, cb) => sbot.latestSequence(id, (err, seq) => {
    if (err && err.message && err.message.includes('not found')) {
      return cb(null, null) // don't include this user
    }
    if (err) return cb(err)
    cb(null, [id, seq])
  })

  mapLimit(Object.keys(peersLatestSequence), 5, lookup, (err, data) => {
    if (err) return pollAgain()

    const results = data
      .filter(Boolean)
      .reduce((soFar, [id, seq]) => {
        soFar.current += seq
        soFar.latest += peersLatestSequence[id]
        return soFar
      }, { current: 0, latest: 0 })

    // Flagged by the original author as "NOT WORKING yet".
    state.peerSequences.set(results)

    pollAgain()
  })

  // NOTE: two alternatives were sketched here and left disabled: publishing a
  // per-feed `{ progress, total }` map instead of the aggregate, and querying
  // `sbot.ebt.peerStatus(peerId, ...)` to inspect each peer's remote state.
}
156 | |
157 | module.exports = manageProgress |
158 |
Built with git-ssb-web