git ssb

2+

mixmix / ticktack



Tree: 3627ef5bb2d0af90d48c88e2897c15df3cd2dd11

Files: 3627ef5bb2d0af90d48c88e2897c15df3cd2dd11 / ftu / manageProgress.js

4897 bytesRaw
1const pull = require('pull-stream')
2const Client = require('ssb-client')
3const Path = require('path')
4const get = require('lodash/get')
5const map = require('lodash/map')
6const { resolve, watch } = require('mutant')
7const mapLimit = require('map-limit')
8
9function manageProgress ({ state, config }) {
10 const { peersLatestSequence } = require(Path.join(config.path, 'importing.json'))
11
12 Client(config.keys, config, (err, sbot) => {
13 if (err) return console.error('problem starting client', err)
14
15 console.log('> sbot running!!!!')
16
17 watchCurrentSequence({ sbot, state })
18 watchLatestSequence({ sbot, period: 5000, state })
19 watchPeersLatestSequence({ sbot, peersLatestSequence, period: 10000, state })
20
21 // pull(
22 // sbot.replicate.changes(),
23 // pull.log()
24 // )
25
26 // watch progress (db size) ??
27 // sbot.progress(console.log)
28
29 sbot.gossip.peers((err, peers) => {
30 if (err) return console.error(err)
31
32 connectToPeers({ sbot, peers, state })
33 })
34 })
35}
36
37function connectToPeers ({ sbot, peers, state }) {
38 if (peers.length > 10) {
39 const lessPeers = peers.filter(p => !p.error)
40 if (lessPeers.length > 10) peers = lessPeers
41 console.log('CONNECTING TO PEERS:', peers.length)
42 }
43
44 peers.forEach(({ host, port, key }) => {
45 if (host && port && key) {
46 sbot.gossip.connect({ host, port, key }, (err, v) => {
47 if (err) console.log('error connecting to ', host, err)
48 else console.log('connected to ', host)
49 })
50 }
51 })
52}
53
54function watchCurrentSequence ({ sbot, state }) {
55 var sink = pull.drain((msg) => {
56 let seq = get(msg, 'value.sequence', false)
57 if (seq) state.mySequence.current.set(seq)
58 })
59
60 pull(
61 sbot.createUserStream({ live: true, id: sbot.id }),
62 sink
63 )
64
65 watch(state.importComplete, importComplete => {
66 if (importComplete) sink.abort(() => console.log('stopping watchCurrentSequence'))
67 })
68}
69
70var cache = {}
71function watchLatestSequence ({ sbot, period, state }) {
72 const feedId = sbot.id
73 sbot.ebt.peerStatus(feedId, (err, data) => {
74 if (err) return setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
75
76 cache = data = Object.assign({}, cache, data)
77 const currentLatest = resolve(state.mySequence.latest)
78
79 const remoteSeqs = map(data.peers, (val) => val.seq)
80 .filter(s => s >= currentLatest) // only keep remote seq that confirm or update backup seq
81 .sort((a, b) => a > b ? -1 : 1) // order them
82
83 console.log('mySequence.latest', resolve(state.mySequence.latest), remoteSeqs)
84 const newLatest = remoteSeqs[0]
85 if (newLatest && newLatest >= resolve(state.mySequence.latest)) {
86 state.mySequence.latest.set(newLatest)
87
88 // if this value is confirmed remotely twice, assume safe
89 if (remoteSeqs.filter(s => s === newLatest).length >= 2) {
90 state.mySequence.latestConfirmed.set(true)
91 }
92 }
93
94 if (resolve(state.importComplete)) return
95
96 setTimeout(() => watchLatestSequence({ sbot, period, state }), period)
97 })
98}
99
100function watchPeersLatestSequence ({ sbot, peersLatestSequence, period, state }) {
101 mapLimit(Object.keys(peersLatestSequence), 5,
102 (id, cb) => sbot.latestSequence(id, (err, seq) => {
103 if (err && err.message && err.message.indexOf('not found') > -1) {
104 return cb(null, null) // don't include this user
105 }
106 if (err) return cb(err)
107 cb(null, [id, seq])
108 }),
109 (err, data) => {
110 if (err) return setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
111
112 const results = data
113 .filter(Boolean)
114 .reduce((soFar, d) => {
115 soFar.current = soFar.current + d[1]
116 soFar.latest = soFar.latest + peersLatestSequence[d[0]]
117
118 return soFar
119 }, { current: 0, latest: 0 })
120
121 state.peerSequences.set(results) // NOT WORKING yet
122
123 // const results = data
124 // .filter(Boolean)
125 // .reduce((soFar, d) => {
126 // soFar[d[0]] = {
127 // progress: d[1],
128 // total: peersLatestSequence[d[0]]
129 // }
130 // return soFar
131 // }, {})
132 // console.log('progress', results)
133
134 if (resolve(state.importComplete)) return
135
136 setTimeout(() => watchPeersLatestSequence({ sbot, peersLatestSequence, period, state }), period)
137 }
138 )
139
140 // NOTE this would be to do something with remote state of peers
141 // Object.keys(peersLatestSequence).forEach(peerId => {
142 // sbot.ebt.peerStatus(peerId, (err, data) => {
143 // if (err) return
144 // const currentLatest = peersLatestSequence[peerId]
145
146 // const remoteSeq = map(data.peers, (val) => val.seq)
147 // .filter(s => s >= currentLatest)
148 // .sort((a, b) => a > b ? -1 : 1)
149 // .shift()
150
151 // console.log(peerId, currentLatest, remoteSeq)
152 // })
153 // })
154
155}
156
157module.exports = manageProgress
158

Built with git-ssb-web