Files: 3d4eca03f309655578710ab4453f309c514268d1 / ftu / observeSequence.js
2344 bytesRaw
1 | const get = require('lodash/get') |
2 | const pull = require('pull-stream') |
3 | const Client = require('ssb-client') |
4 | const { resolve } = require('mutant') |
5 | |
// Upper bound on the peer list before peers with a recorded error are dropped.
const MAX_PEERS_BEFORE_FILTER = 10
// Delay (ms) between two consecutive ebt peer-status polls.
const PEER_CHECK_INTERVAL = 5000

/**
 * Connects to the ssb server as a client and keeps `state` up to date with
 * how far this identity's feed has progressed, locally and as reported by
 * remote peers.
 *
 * Once the client is connected it:
 *  - asks gossip for its peers and tries to connect to them,
 *  - follows this identity's own feed live and writes each message's
 *    sequence number to `state.currentSequence`,
 *  - (only if the gossip peer lookup succeeded) polls `ebt.peerStatus` every
 *    PEER_CHECK_INTERVAL ms, moving `state.latestSequence` to the highest
 *    sequence peers report for this feed, and sets `state.confirmedRemotely`
 *    to true once at least two peers report that same sequence.
 *
 * Failures to start the client or to list peers are logged with
 * console.error and end the setup; nothing is returned or thrown to the caller.
 *
 * @param {Object} opts
 * @param {Object} opts.state - object exposing `currentSequence`,
 *   `latestSequence` and `confirmedRemotely`, each with a `.set()` method;
 *   `latestSequence` is also read through mutant's `resolve`.
 * @param {*} opts.timeout - overwritten locally with the id of the pending
 *   poll timer.
 *   NOTE(review): reassigning a destructured parameter is not visible to the
 *   caller, so the caller cannot clear the timer through this value — confirm
 *   whether a cancel handle should be exposed instead.
 */
function observeSequence ({ state, timeout }) {
  const config = require('../config').create().config.sync.load()

  Client(config.keys, config, (err, ssbServer) => {
    if (err) return console.error('problem starting client', err)

    console.log('> sbot running!!!!')

    ssbServer.gossip.peers((err, peers) => {
      if (err) return console.error(err)

      connectToPeers(peers)
      checkPeers()
    })

    // start listening to my own seq, and update the state
    pull(
      ssbServer.createUserStream({ live: true, id: ssbServer.id }),
      pull.drain((msg) => {
        const seq = get(msg, 'value.sequence', false)
        if (seq) {
          state.currentSequence.set(seq)
        }
      })
    )

    // Try to connect to every peer that has a complete address.
    function connectToPeers (peers) {
      let candidates = peers

      // With a long list, prefer peers without a recorded error, but only if
      // that still leaves more than MAX_PEERS_BEFORE_FILTER of them.
      if (candidates.length > MAX_PEERS_BEFORE_FILTER) {
        const withoutErrors = candidates.filter(p => !p.error)
        if (withoutErrors.length > MAX_PEERS_BEFORE_FILTER) {
          candidates = withoutErrors
        }
      }

      candidates.forEach(({ host, port, key }) => {
        if (!(host && port && key)) return

        ssbServer.gossip.connect({ host, port, key }, (err) => {
          if (err) console.log('error connecting to ', host, err)
          else console.log('connected to ', host)
        })
      })
    }

    // Arm the next poll; the timer id is kept in `timeout`.
    function scheduleNextCheck () {
      timeout = setTimeout(checkPeers, PEER_CHECK_INTERVAL)
    }

    // Poll what peers report about this feed, then schedule the next poll.
    function checkPeers () {
      ssbServer.ebt.peerStatus(ssbServer.id, (err, data) => {
        if (err) {
          // best-effort: skip this round and simply try again later
          scheduleNextCheck()
          return
        }

        const latest = resolve(state.latestSequence)

        const remoteSeqs = Object.keys(data.peers)
          .map(p => data.peers[p].seq) // get my seq reported by each peer
          .filter(s => s >= latest) // only keep remote seq that confirm or update backup seq
          .sort((a, b) => b - a) // highest first; consistent comparator (returns 0 for equal values)

        const newLatest = remoteSeqs[0]
        if (newLatest) {
          state.latestSequence.set(newLatest)

          // if this value is confirmed remotely twice, assume safe
          const confirmations = remoteSeqs.filter(s => s === newLatest).length
          if (confirmations >= 2) {
            state.confirmedRemotely.set(true)
          }
        }

        scheduleNextCheck()
      })
    }
  })
}
79 | |
80 | module.exports = observeSequence |
81 |
Built with git-ssb-web