git ssb

2+

mixmix / ticktack



Tree: f80524e4e4d9ee8b39654228746a0afe58778971

Files: f80524e4e4d9ee8b39654228746a0afe58778971 / ftu / manageProgress.js

5448 bytesRaw
1const pull = require('pull-stream')
2const Client = require('ssb-client')
3const Path = require('path')
4const get = require('lodash/get')
5const map = require('lodash/map')
6const { resolve, watch } = require('mutant')
7const mapLimit = require('map-limit')
8
9function manageProgress ({ state, config }) {
10 const { peersLatestSequence } = require(Path.join(config.path, 'importing.json'))
11
12 Client(config.keys, config, (err, sbot) => {
13 if (err) return console.error('problem starting client', err)
14
15 console.log('> sbot running!!!!')
16
17 watchCurrentSequence({ sbot, state })
18 watchLatestSequence({ sbot, period: 5000, state })
19 watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state })
20
21 // pull(
22 // sbot.replicate.changes(),
23 // pull.log()
24 // )
25
26 // watch progress (db size) ??
27 // sbot.progress(console.log)
28
29 sbot.gossip.peers((err, peers) => {
30 if (err) return console.error(err)
31
32 connectToPeers({ sbot, peers, state })
33 reconnectToPeers({ sbot, peers, state, period: 7000 })
34 })
35 })
36}
37
38function connectToPeers ({ sbot, peers, state }) {
39 if (peers.length > 10) {
40 const lessPeers = peers.filter(p => !p.error)
41 if (lessPeers.length > 10) peers = lessPeers
42 console.log('CONNECTING TO PEERS:', peers.length)
43 }
44
45 peers.forEach(({ host, port, key }) => {
46 if (host && port && key) {
47 sbot.gossip.connect({ host, port, key }, (err, v) => {
48 if (err) console.log('error connecting to ', host, err)
49 else console.log('connected to ', host)
50 })
51 }
52 })
53}
54
55function reconnectToPeers ({ sbot, peers, state, period }) {
56 sbot.status((err, data) => {
57 if (err) return setTimeout(() => reconnectToPeers({ sbot, peers, period, state }), period)
58
59 if (data.gossip.length < 5) {
60 peers
61 .sort((a, b) => Math.random() > 0.5 ? -1 : 1)
62 .slice(0, 5)
63 .forEach(p => sbot.gossip.connect(p, console.log))
64 }
65
66 if (resolve(state.importComplete)) return
67
68 setTimeout(() => reconnectToPeers({ sbot, peers, period, state }), period)
69 })
70}
71
72function watchCurrentSequence ({ sbot, state }) {
73 var sink = pull.drain((msg) => {
74 let seq = get(msg, 'value.sequence', false)
75 if (seq) state.mySequence.current.set(seq)
76 })
77
78 pull(
79 sbot.createUserStream({ live: true, id: sbot.id }),
80 sink
81 )
82
83 watch(state.importComplete, importComplete => {
84 if (importComplete) sink.abort(() => console.log('stopping watchCurrentSequence'))
85 })
86}
87
88var cache = {}
89function watchLatestSequence ({ sbot, period, state }) {
90 const feedId = sbot.id
91 sbot.ebt.peerStatus(feedId, (err, data) => {
92 if (err) return setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
93
94 Object.assign(cache, data.peers)
95 const currentLatest = resolve(state.mySequence.latest)
96
97 const remoteSeqs = map(cache, (val) => val.seq)
98 .filter(s => s >= currentLatest) // only keep remote seq that confirm or update backup seq
99 .sort((a, b) => a > b ? -1 : 1) // order them
100
101 console.log('mySequence.latest', resolve(state.mySequence.latest), remoteSeqs)
102 const newLatest = remoteSeqs[0]
103 if (newLatest && newLatest >= resolve(state.mySequence.latest)) {
104 state.mySequence.latest.set(newLatest)
105
106 // if this value is confirmed remotely twice, assume safe
107 if (remoteSeqs.filter(s => s === newLatest).length >= 2) {
108 state.mySequence.latestConfirmed.set(true)
109 }
110 }
111
112 if (resolve(state.importComplete)) return
113
114 setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
115 })
116}
117
118function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) {
119 mapLimit(Object.keys(peersLatestSequence), 5,
120 (id, cb) => sbot.latestSequence(id, (err, seq) => {
121 if (err && err.message && err.message.indexOf('not found') > -1) {
122 return cb(null, null) // don't include this user
123 }
124 if (err) return cb(err)
125 cb(null, [id, seq])
126 }),
127 (err, data) => {
128 if (err) return setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
129
130 const results = data
131 .filter(Boolean)
132 .reduce((soFar, d) => {
133 soFar.current = soFar.current + d[1]
134 soFar.latest = soFar.latest + peersLatestSequence[d[0]]
135
136 return soFar
137 }, { current: 0, latest: 0 })
138
139 state.peerSequences.set(results) // NOT WORKING yet
140
141 // const results = data
142 // .filter(Boolean)
143 // .reduce((soFar, d) => {
144 // soFar[d[0]] = {
145 // progress: d[1],
146 // total: peersLatestSequence[d[0]]
147 // }
148 // return soFar
149 // }, {})
150 // console.log('progress', results)
151
152 if (resolve(state.importComplete)) return
153
154 setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
155 }
156 )
157
158 // NOTE this would be to do something with remote state of peers
159 // Object.keys(peersLatestSequence).forEach(peerId => {
160 // sbot.ebt.peerStatus(peerId, (err, data) => {
161 // if (err) return
162 // const currentLatest = peersLatestSequence[peerId]
163
164 // const remoteSeq = map(data.peers, (val) => val.seq)
165 // .filter(s => s >= currentLatest)
166 // .sort((a, b) => a > b ? -1 : 1)
167 // .shift()
168
169 // console.log(peerId, currentLatest, remoteSeq)
170 // })
171 // })
172}
173
174module.exports = manageProgress
175

Built with git-ssb-web