Files: 5f86bdb837a971c779cc9be81fd7c8e39090a252 / ftu / manageProgress.js
4838 bytesRaw
1 | const pull = require('pull-stream') |
2 | const Client = require('ssb-client') |
3 | const Path = require('path') |
4 | const get = require('lodash/get') |
5 | const map = require('lodash/map') |
6 | const { resolve, watch } = require('mutant') |
7 | const mapLimit = require('map-limit') |
8 | |
/**
 * Connects to sbot and, once the client is up, starts the sequence watchers
 * and asks sbot to connect to its known gossip peers.
 *
 * @param {Object} opts
 * @param {Object} opts.state - state object handed on to every watcher
 * @param {Object} opts.config - `config.path` is the directory holding
 *   `importing.json`; `config.keys` and `config` itself are passed to ssb-client
 */
function manageProgress ({ state, config }) {
  // importing.json is loaded with require, so a missing file throws right here
  const importingFile = Path.join(config.path, 'importing.json')
  const importing = require(importingFile)
  const peersLatestSequence = importing.peersLatestSequence

  function onClient (err, sbot) {
    if (err) {
      console.error('problem starting client', err)
      return
    }

    console.log('> sbot running!!!!')

    watchCurrentSequence({ sbot, state })
    watchLatestSequence({ sbot, period: 5000, state })
    watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state })

    // Debugging hooks, currently disabled:
    //   pull(sbot.replicate.changes(), pull.log())
    //   sbot.progress(console.log) // watch progress (db size) ??

    sbot.gossip.peers((peersErr, peers) => {
      if (peersErr) {
        console.error(peersErr)
        return
      }

      connectToPeers({ sbot, peers, state })
    })
  }

  Client(config.keys, config, onClient)
}
36 | |
/**
 * Asks sbot to connect to every given peer that has a complete address.
 *
 * When more than 10 peers are given, peers carrying an `error` are dropped,
 * but only if more than 10 peers remain afterwards; otherwise the full list
 * is used as-is.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client exposing `gossip.connect(address, cb)`
 * @param {Array<Object>} [opts.peers] - peer records with `host`, `port`, `key`
 *   (and optionally `error`); a missing or non-array value is treated as empty
 * @param {Object} [opts.state] - accepted but not used by this function
 */
function connectToPeers ({ sbot, peers, state }) {
  // work on a local list instead of reassigning the caller-supplied parameter
  let candidates = Array.isArray(peers) ? peers : []

  if (candidates.length > 10) {
    const healthy = candidates.filter(peer => peer && !peer.error)
    if (healthy.length > 10) candidates = healthy
    console.log('CONNECTING TO PEERS:', candidates.length)
  }

  candidates.forEach(peer => {
    // tolerate null / malformed entries rather than throwing on destructure
    const { host, port, key } = peer || {}
    if (!(host && port && key)) return

    sbot.gossip.connect({ host, port, key }, err => {
      if (err) console.log('error connecting to ', host, err)
      else console.log('connected to ', host)
    })
  })
}
53 | |
/**
 * Drains a live stream of our own feed (`sbot.id`) and writes each message's
 * `value.sequence` into `state.mySequence.current`. The drain is aborted once
 * `state.importComplete` is observed truthy.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client providing `id` and `createUserStream`
 * @param {Object} opts.state - provides `mySequence.current` and `importComplete`
 */
function watchCurrentSequence ({ sbot, state }) {
  const recordSequence = (msg) => {
    const sequence = get(msg, 'value.sequence', false)
    if (!sequence) return

    state.mySequence.current.set(sequence)
  }

  const drain = pull.drain(recordSequence)
  const ownFeed = sbot.createUserStream({ live: true, id: sbot.id })

  pull(ownFeed, drain)

  watch(state.importComplete, (isComplete) => {
    if (!isComplete) return

    drain.abort(() => console.log('stopping watchCurrentSequence'))
  })
}
69 | |
/**
 * Polls `sbot.ebt.peerStatus` for our own feed and raises
 * `state.mySequence.latest` to the highest sequence any peer reports for it.
 * Re-runs itself every `period` ms until `state.importComplete` resolves truthy.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client providing `id` and `ebt.peerStatus(id, cb)`
 * @param {number} opts.period - delay in ms between polls
 * @param {Object} opts.state - provides `mySequence.latest`,
 *   `mySequence.latestConfirmed` and `importComplete`
 */
function watchLatestSequence ({ sbot, period, state }) {
  const pollAgain = () => setTimeout(() => watchLatestSequence({ sbot, period, state }), period)

  sbot.ebt.peerStatus(sbot.id, (err, data) => {
    if (err) {
      // once the import is done there is nothing left to wait for, so do not
      // keep retrying forever (the success path below stops the same way)
      if (resolve(state.importComplete)) return
      pollAgain()
      return
    }

    const currentLatest = resolve(state.mySequence.latest)

    // `get` tolerates a missing `data` / null peer entry; lodash `map` treats
    // an undefined collection as empty
    const remoteSeqs = map(get(data, 'peers'), (peer) => get(peer, 'seq'))
      .filter(seq => typeof seq === 'number' && seq >= currentLatest) // only keep remote seq that confirm or update backup seq
      .sort((a, b) => b - a) // highest first; consistent for equal values

    console.log('mySequence.latest', currentLatest, remoteSeqs)

    // every surviving seq is already >= currentLatest, so only emptiness / 0 needs checking
    const newLatest = remoteSeqs[0]
    if (newLatest) {
      state.mySequence.latest.set(newLatest)

      // if this value is confirmed remotely twice, assume safe
      const confirmations = remoteSeqs.filter(seq => seq === newLatest).length
      if (confirmations >= 2) state.mySequence.latestConfirmed.set(true)
    }

    if (resolve(state.importComplete)) return

    pollAgain()
  })
}
97 | |
/**
 * Polls the local sequence of every feed listed in `peersLatestSequence` and
 * stores two totals in `state.peerSequences`: `current` (sum of the sequences
 * sbot reports) and `latest` (sum of the expected values from
 * `peersLatestSequence`). Feeds sbot reports as 'not found' are left out of
 * both totals. Re-runs itself every `period` ms until `state.importComplete`
 * resolves truthy.
 *
 * @param {Object} opts
 * @param {Object} opts.sbot - client providing `latestSequence(id, cb)`
 * @param {Object} [opts.peersLatestSequence] - map of feed id -> expected
 *   sequence; a missing map is treated as empty
 * @param {number} opts.period - delay in ms between polls
 * @param {Object} opts.state - provides `peerSequences` and `importComplete`
 */
function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) {
  const expected = peersLatestSequence || {}

  const pollAgain = () => setTimeout(
    () => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }),
    period
  )

  const lookupSequence = (id, cb) => sbot.latestSequence(id, (err, seq) => {
    if (err && err.message && err.message.includes('not found')) {
      return cb(null, null) // don't include this user
    }
    if (err) return cb(err)
    cb(null, [id, seq])
  })

  const onSequences = (err, data) => {
    if (err) {
      // do not keep retrying forever once the import has finished
      // (the success path below stops the same way)
      if (resolve(state.importComplete)) return
      pollAgain()
      return
    }

    const totals = data
      .filter(Boolean) // drop the 'not found' placeholders
      .reduce((soFar, [id, seq]) => {
        soFar.current = soFar.current + seq
        soFar.latest = soFar.latest + expected[id]
        return soFar
      }, { current: 0, latest: 0 })

    state.peerSequences.set(totals) // original author's note: NOT WORKING yet

    // Alternative shape noted by the original author (per-peer instead of totals):
    //   results[id] = { progress: seq, total: peersLatestSequence[id] }

    if (resolve(state.importComplete)) return

    pollAgain()
  }

  // at most 5 lookups in flight at a time
  mapLimit(Object.keys(expected), 5, lookupSequence, onSequences)

  // NOTE (original author): the remote state of peers could also be inspected, e.g.
  //   sbot.ebt.peerStatus(peerId, (err, data) => { ... })
  // taking the highest `seq` in data.peers that is >= peersLatestSequence[peerId]
}
154 | |
155 | module.exports = manageProgress |
156 |
Built with git-ssb-web