Files: 4f85a663f11b42d3b209019fcf598a738e7e43ee / ftu / observeSequence.js
2589 bytesRaw
1 | const get = require('lodash/get') |
2 | const pull = require('pull-stream') |
3 | const Client = require('ssb-client') |
4 | const { resolve } = require('mutant') |
5 | |
6 | function observeSequence ({ state }) { |
7 | const config = require('../config').create().config.sync.load() |
8 | |
9 | Client(config.keys, config, (err, ssbServer) => { |
10 | if (err) return console.error('problem starting client', err) |
11 | |
12 | console.log('> sbot running!!!!') |
13 | |
14 | ssbServer.gossip.peers((err, peers) => { |
15 | if (err) return console.error(err) |
16 | |
17 | connectToPeers(peers) |
18 | checkPeers() |
19 | }) |
20 | |
21 | // start listening to the my seq, and update the state |
22 | pull( |
23 | ssbServer.createUserStream({ live: true, id: ssbServer.id }), |
24 | pull.drain((msg) => { |
25 | let seq = get(msg, 'value.sequence', false) |
26 | if (seq) { |
27 | state.currentSequence.set(seq) |
28 | } |
29 | }) |
30 | ) |
31 | |
32 | function connectToPeers (peers) { |
33 | if (peers.length > 10) { |
34 | const lessPeers = peers.filter(p => !p.error) |
35 | if (lessPeers.length > 10) peers = lessPeers |
36 | console.log('CONNECTING TO PEERS:', peers.length) |
37 | } |
38 | |
39 | peers.forEach(({ host, port, key }) => { |
40 | if (host && port && key) { |
41 | ssbServer.gossip.connect({ host, port, key }, (err, v) => { |
42 | if (err) console.log('error connecting to ', host, err) |
43 | else console.log('connected to ', host) |
44 | }) |
45 | } |
46 | }) |
47 | } |
48 | |
49 | function checkPeers () { |
50 | ssbServer.ebt.peerStatus(ssbServer.id, (err, data) => { |
51 | if (err) { |
52 | setTimeout(checkPeers, 5000) |
53 | return |
54 | } |
55 | |
56 | const latest = resolve(state.latestSequence) |
57 | |
58 | const remoteSeqs = Object.keys(data.peers) |
59 | .map(p => data.peers[p].seq) // get my seq reported by each peer |
60 | .filter(s => s >= latest) // only keep remote seq that confirm or update backup seq |
61 | .sort((a, b) => a > b ? -1 : 1) // order them |
62 | |
63 | // console.log(remoteSeqs) |
64 | |
65 | const newLatest = remoteSeqs[0] |
66 | if (newLatest) { |
67 | state.latestSequence.set(newLatest) |
68 | |
69 | // if this value is confirmed remotely twice, assume safe |
70 | if (remoteSeqs.filter(s => s === newLatest).length >= 2) { |
71 | state.confirmedRemotely.set(true) |
72 | } |
73 | } |
74 | |
75 | var s = resolve(state) |
76 | // NOTE - this 'isDone' logic is repeated in ftu/app.js |
77 | if (s.currentSequence >= s.latestSequence && s.confirmedRemotely) return |
78 | |
79 | setTimeout(checkPeers, 5000) |
80 | }) |
81 | |
82 | ssbServer.progress(console.log) |
83 | } |
84 | }) |
85 | } |
86 | |
87 | module.exports = observeSequence |
88 |
Built with git-ssb-web