git ssb

2+

mixmix / ticktack



Tree: 5f86bdb837a971c779cc9be81fd7c8e39090a252

Files: 5f86bdb837a971c779cc9be81fd7c8e39090a252 / ftu / manageProgress.js

4838 bytesRaw
1const pull = require('pull-stream')
2const Client = require('ssb-client')
3const Path = require('path')
4const get = require('lodash/get')
5const map = require('lodash/map')
6const { resolve, watch } = require('mutant')
7const mapLimit = require('map-limit')
8
/**
 * Entry point: connects to the local sbot and starts the progress watchers.
 *
 * Reads `peersLatestSequence` from `importing.json` inside `config.path`,
 * opens an ssb-client connection with `config.keys` / `config`, and once
 * connected starts the three sequence watchers and dials the known gossip
 * peers.
 *
 * @param {object} opts
 * @param {object} opts.state - shared state handed on to the watchers
 * @param {object} opts.config - must provide `path` and `keys`
 */
function manageProgress ({ state, config }) {
  const importingFile = Path.join(config.path, 'importing.json')
  const { peersLatestSequence } = require(importingFile)

  function onClientReady (err, sbot) {
    if (err) {
      console.error('problem starting client', err)
      return
    }

    console.log('> sbot running!!!!')

    watchCurrentSequence({ sbot, state })
    watchLatestSequence({ sbot, period: 5000, state })
    watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state })

    // Ideas kept from the original author (not active):
    //   pull(sbot.replicate.changes(), pull.log())
    //   sbot.progress(console.log)   // watch progress (db size) ??

    sbot.gossip.peers((peersErr, peers) => {
      if (peersErr) {
        console.error(peersErr)
        return
      }

      connectToPeers({ sbot, peers, state })
    })
  }

  Client(config.keys, config, onClientReady)
}
36
/**
 * Asks sbot to open a gossip connection to each usable peer.
 *
 * When more than 10 peers are supplied, peers carrying an `error` field are
 * dropped, but only if more than 10 peers remain afterwards; otherwise the
 * full list is used. Peers missing `host`, `port` or `key` are skipped.
 *
 * @param {object} opts
 * @param {object} opts.sbot - client exposing `gossip.connect(address, cb)`
 * @param {Array<object>} opts.peers - peer records ({ host, port, key, error? })
 * @param {object} opts.state - accepted for call-site symmetry; unused here
 */
function connectToPeers ({ sbot, peers, state }) {
  let targets = peers

  if (targets.length > 10) {
    const withoutErrors = targets.filter(peer => !peer.error)
    if (withoutErrors.length > 10) targets = withoutErrors
    console.log('CONNECTING TO PEERS:', targets.length)
  }

  for (const { host, port, key } of targets) {
    // an address is only dialable when all three parts are present
    if (!(host && port && key)) continue

    sbot.gossip.connect({ host, port, key }, (err) => {
      if (err) {
        console.log('error connecting to ', host, err)
        return
      }
      console.log('connected to ', host)
    })
  }
}
53
/**
 * Mirrors the sequence number of messages arriving on our own feed into
 * `state.mySequence.current`, and aborts the stream once
 * `state.importComplete` becomes truthy.
 *
 * @param {object} opts
 * @param {object} opts.sbot - client exposing `id` and `createUserStream`
 * @param {object} opts.state - needs `mySequence.current.set` and an
 *   `importComplete` value that `watch` can observe
 */
function watchCurrentSequence ({ sbot, state }) {
  const recordSequence = pull.drain(function onMessage (msg) {
    const sequence = get(msg, 'value.sequence', false)
    if (!sequence) return

    state.mySequence.current.set(sequence)
  })

  const ownFeed = sbot.createUserStream({ live: true, id: sbot.id })
  pull(ownFeed, recordSequence)

  watch(state.importComplete, function onImportChange (importComplete) {
    if (!importComplete) return

    recordSequence.abort(() => console.log('stopping watchCurrentSequence'))
  })
}
69
/**
 * Polls `sbot.ebt.peerStatus` for our own feed and raises
 * `state.mySequence.latest` to the highest sequence any peer reports.
 *
 * When at least two peers report that same highest sequence,
 * `state.mySequence.latestConfirmed` is set to true. The function
 * re-schedules itself every `period` ms until `state.importComplete`
 * resolves truthy. That stop condition applies after a failed poll as well
 * as after a successful one.
 *
 * @param {object} opts
 * @param {object} opts.sbot - client exposing `id` and `ebt.peerStatus(id, cb)`
 * @param {number} opts.period - delay between polls, in milliseconds
 * @param {object} opts.state - needs `importComplete`, `mySequence.latest`
 *   and `mySequence.latestConfirmed` (presumably mutant observables, since
 *   they are read with `resolve` and written with `.set`)
 */
function watchLatestSequence ({ sbot, period, state }) {
  const feedId = sbot.id

  // single place that decides whether another poll is needed
  const pollAgain = () => {
    if (resolve(state.importComplete)) return

    setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
  }

  sbot.ebt.peerStatus(feedId, (err, data) => {
    if (err) {
      console.error('problem getting peerStatus', err)
      return pollAgain()
    }

    const currentLatest = resolve(state.mySequence.latest)

    // tolerate a missing payload or missing peer entries
    const remoteSeqs = map(data && data.peers, (val) => val && val.seq)
      .filter(s => s >= currentLatest) // only keep remote seq that confirm or update backup seq
      .sort((a, b) => b - a) // highest first

    console.log('mySequence.latest', currentLatest, remoteSeqs)

    // every surviving entry is already >= currentLatest, so no re-check is needed
    const newLatest = remoteSeqs[0]
    if (newLatest) {
      state.mySequence.latest.set(newLatest)

      // if this value is confirmed remotely twice, assume safe
      const confirmations = remoteSeqs.filter(s => s === newLatest).length
      if (confirmations >= 2) state.mySequence.latestConfirmed.set(true)
    }

    pollAgain()
  })
}
97
/**
 * Polls how far the local database has replicated each peer listed in
 * `peersLatestSequence` and publishes the aggregate to `state.peerSequences`.
 *
 * For every peer id, `sbot.latestSequence` is queried (at most 5 lookups in
 * flight). Peers whose lookup fails with a "not found" error are left out.
 * The published value is `{ current, latest }`, where `current` sums the
 * sequences returned by sbot and `latest` sums the corresponding expected
 * values from `peersLatestSequence`. The function re-schedules itself every
 * `period` ms until `state.importComplete` resolves truthy. That stop
 * condition applies after a failed poll as well as after a successful one.
 *
 * @param {object} opts
 * @param {object} opts.sbot - client exposing `latestSequence(id, cb)`
 * @param {Object<string, number>} opts.peersLatestSequence - expected latest
 *   sequence per peer id (presumably recorded in importing.json — verify
 *   against the caller)
 * @param {number} opts.period - delay between polls, in milliseconds
 * @param {object} opts.state - needs `importComplete` and `peerSequences.set`
 */
function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) {
  // single place that decides whether another poll is needed
  const pollAgain = () => {
    if (resolve(state.importComplete)) return

    setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
  }

  // yields [id, seq], or null for feeds the local db does not know yet
  const lookupSequence = (id, cb) => sbot.latestSequence(id, (err, seq) => {
    if (err && err.message && err.message.indexOf('not found') > -1) {
      return cb(null, null) // don't include this user
    }
    if (err) return cb(err)
    cb(null, [id, seq])
  })

  mapLimit(Object.keys(peersLatestSequence), 5, lookupSequence, (err, data) => {
    if (err) {
      console.error('problem getting latestSequence of peers', err)
      return pollAgain()
    }

    const results = data
      .filter(Boolean)
      .reduce((soFar, [id, seq]) => {
        soFar.current = soFar.current + seq
        soFar.latest = soFar.latest + peersLatestSequence[id]

        return soFar
      }, { current: 0, latest: 0 })

    state.peerSequences.set(results) // NOT WORKING yet

    // An earlier, disabled variant kept per-peer detail instead of totals:
    //   { [id]: { progress: seq, total: peersLatestSequence[id] } }

    pollAgain()
  })

  // NOTE a disabled follow-up idea was to inspect the remote state of each
  // peer via sbot.ebt.peerStatus(peerId, cb): take map(data.peers, v => v.seq),
  // keep values >= peersLatestSequence[peerId], and use the highest one.
}
154
155module.exports = manageProgress
156

Built with git-ssb-web