git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Commit 933c5c308e2f9b5a63a0982d7a28e9ced1f3f9e2

workaround: don't connect to all pubs at once, only followed at first

Matt McKegg committed on 10/27/2016, 1:43:53 PM
Parent: 572440feaf959755763efb726087066a6f5b29db

Files changed

lib/ssb-server.jschanged
lib/gossip-with-slow-rollout/index.jsadded
lib/gossip-with-slow-rollout/init.jsadded
lib/gossip-with-slow-rollout/schedule.jsadded
lib/ssb-server.jsView
@@ -1,7 +1,7 @@
11 module.exports = require('scuttlebot')
22 .use(require('scuttlebot/plugins/master'))
3- .use(require('scuttlebot/plugins/gossip'))
3+ .use(require('./gossip-with-slow-rollout'))
44 .use(require('scuttlebot/plugins/friends'))
55 .use(require('scuttlebot/plugins/replicate'))
66 .use(require('ssb-blobs'))
77 .use(require('scuttlebot/plugins/invite'))
lib/gossip-with-slow-rollout/index.jsView
@@ -1,0 +1,212 @@
1+'use strict'
2+var pull = require('pull-stream')
3+var Notify = require('pull-notify')
4+var mdm = require('mdmanifest')
5+var valid = require('scuttlebot/lib/validators')
6+var apidoc = require('scuttlebot/lib/apidocs').gossip
7+var u = require('scuttlebot/lib/util')
8+var ref = require('ssb-ref')
9+var ping = require('pull-ping')
10+var Stats = require('statistics')
11+var Schedule = require('./schedule')
12+var Init = require('./init')
13+
14+function isFunction (f) {
15+ return 'function' === typeof f
16+}
17+
18+function stringify(peer) {
19+ return [peer.host, peer.port, peer.key].join(':')
20+}
21+
22+/*
23+Peers : [{
24+ key: id,
25+ host: ip,
26+ port: int,
27+ //to be backwards compatible with patchwork...
28+ announcers: {length: int}
29+ source: 'pub'|'manual'|'local'
30+}]
31+*/
32+
33+
34+module.exports = {
35+ name: 'gossip',
36+ version: '1.0.0',
37+ manifest: mdm.manifest(apidoc),
38+ permissions: {
39+ anonymous: {allow: ['ping']}
40+ },
41+ init: function (server, config) {
42+ var notify = Notify()
43+ var conf = config.gossip || {}
44+ var home = ref.parseAddress(server.getAddress())
45+
46+ //Known Peers
47+ var peers = []
48+
49+ function getPeer(id) {
50+ return u.find(peers, function (e) {
51+ return e && e.key === id
52+ })
53+ }
54+
55+ var timer_ping = 5*6e4
56+
57+ var gossip = {
58+ wakeup: 0,
59+ peers: function () {
60+ return peers
61+ },
62+ get: function (addr) {
63+ addr = ref.parseAddress(addr)
64+ return u.find(peers, function (a) {
65+ return (
66+ addr.port === a.port
67+ && addr.host === a.host
68+ && addr.key === a.key
69+ )
70+ })
71+ },
72+ connect: valid.async(function (addr, cb) {
73+ addr = ref.parseAddress(addr)
74+ if (!addr || typeof addr != 'object')
75+ return cb(new Error('first param must be an address'))
76+
77+ if(!addr.key) return cb(new Error('address must have ed25519 key'))
78+ // add peer to the table, incase it isn't already.
79+ gossip.add(addr, 'manual')
80+ var p = gossip.get(addr)
81+ if(!p) return cb()
82+
83+ p.stateChange = Date.now()
84+ p.state = 'connecting'
85+ server.connect(p, function (err, rpc) {
86+ if (err) {
87+ p.state = undefined
88+ p.failure = (p.failure || 0) + 1
89+ p.stateChange = Date.now()
90+ notify({ type: 'connect-failure', peer: p })
91+ server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
92+ p.duration.value(0)
93+ return (cb && cb(err))
94+ }
95+ else {
96+ p.state = 'connected'
97+ p.failure = 0
98+ }
99+ cb && cb(null, rpc)
100+ })
101+
102+ }, 'string|object'),
103+
104+ disconnect: valid.async(function (addr, cb) {
105+ var peer = this.get(addr)
106+
107+ peer.state = 'disconnecting'
108+ peer.stateChange = Date.now()
109+ if(!peer || !peer.disconnect) cb && cb()
110+ else peer.disconnect(true, function (err) {
111+ peer.stateChange = Date.now()
112+ })
113+
114+ }, 'string|object'),
115+
116+ changes: function () {
117+ return notify.listen()
118+ },
119+ //add an address to the peer table.
120+ add: valid.sync(function (addr, source) {
121+ addr = ref.parseAddress(addr)
122+ if(!ref.isAddress(addr))
123+ throw new Error('not a valid address:' + JSON.stringify(addr))
124+ // check that this is a valid address, and not pointing at self.
125+
126+ if(addr.key === home.key) return
127+ if(addr.host === home.host && addr.port === home.port) return
128+
129+ var f = gossip.get(addr)
130+
131+ if(!f) {
132+ // new peer
133+ addr.source = source
134+ addr.announcers = 1
135+ addr.duration = Stats()
136+ peers.push(addr)
137+ notify({ type: 'discover', peer: addr, source: source || 'manual' })
138+ return addr
139+ }
140+ //don't count local over and over
141+ else if(f.source != 'local')
142+ f.announcers ++
143+
144+ return f
145+ }, 'string|object', 'string?'),
146+ ping: function (opts) {
147+ var timeout = config.timers && config.timers.ping || 5*60e3
148+ //between 10 seconds and 30 minutes, default 5 min
149+ timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
150+ return ping({timeout: timeout})
151+ },
152+ reconnect: function () {
153+ for(var id in server.peers)
154+ if(id !== server.id) //don't disconnect local client
155+ server.peers[id].forEach(function (peer) {
156+ peer.close(true)
157+ })
158+ return gossip.wakeup = Date.now()
159+ }
160+ }
161+
162+ Schedule (gossip, config, server)
163+ Init (gossip, config, server)
164+ //get current state
165+
166+ server.on('rpc:connect', function (rpc, isClient) {
167+ var peer = getPeer(rpc.id)
168+ //don't track clients that connect, but arn't considered peers.
169+ //maybe we should though?
170+ if(!peer) return
171+ console.log('Connected', stringify(peer))
172+ //means that we have created this connection, not received it.
173+ peer.client = !!isClient
174+ peer.state = 'connected'
175+ peer.stateChange = Date.now()
176+ peer.disconnect = function (err, cb) {
177+ if(isFunction(err)) cb = err, err = null
178+ rpc.close(err, cb)
179+ }
180+
181+ if(isClient) {
182+ //default ping is 5 minutes...
183+ var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
184+ peer.ping = {rtt: pp.rtt, skew: pp.skew}
185+ pull(
186+ pp,
187+ rpc.gossip.ping({timeout: timer_ping}, function (err) {
188+ if(err.name === 'TypeError') peer.ping.fail = true
189+ }),
190+ pp
191+ )
192+ }
193+
194+ rpc.on('closed', function () {
195+ console.log('Disconnected', stringify(peer))
196+ //track whether we have successfully connected.
197+ //or how many failures there have been.
198+ var since = peer.stateChange
199+ peer.stateChange = Date.now()
200+ if(peer.state === 'connected') //may be "disconnecting"
201+ peer.duration.value(peer.stateChange - since)
202+ peer.state = undefined
203+ notify({ type: 'disconnect', peer: peer })
204+ server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
205+ })
206+
207+ notify({ type: 'connect', peer: peer })
208+ })
209+
210+ return gossip
211+ }
212+}
lib/gossip-with-slow-rollout/init.jsView
@@ -1,0 +1,32 @@
1+var isArray = Array.isArray
2+var pull = require('pull-stream')
3+var ref = require('ssb-ref')
4+
5+module.exports = function (gossip, config, server) {
6+
7+ // populate peertable with configured seeds (mainly used in testing)
8+ var seeds = config.seeds
9+
10+ ;(isArray(seeds) ? seeds : [seeds]).filter(Boolean)
11+ .forEach(function (addr) { gossip.add(addr, 'seed') })
12+
13+ // populate peertable with pub announcements on the feed
14+ pull(
15+ server.messagesByType({
16+ type: 'pub', live: true, keys: false
17+ }),
18+ pull.drain(function (msg) {
19+ if (!msg.content.address) return
20+ if (ref.isAddress(msg.content.address)) {
21+ // PATCH: only self will be connected to immediately, others will roll out slowly
22+ gossip.add(msg.content.address, msg.author === server.id ? 'self' : 'pub')
23+ }
24+ })
25+ )
26+
27+ // populate peertable with announcements on the LAN multicast
28+ server.on('local', function (_peer) {
29+ gossip.add(_peer, 'local')
30+ })
31+
32+}
lib/gossip-with-slow-rollout/schedule.jsView
@@ -1,0 +1,212 @@
1+var nonPrivate = require('non-private-ip')
2+var ip = require('ip')
3+var onWakeup = require('on-wakeup')
4+var onNetwork = require('on-change-network')
5+
6+var Stats = require('statistics')
7+var os = require('os')
8+var pull = require('pull-stream')
9+var u = require('scuttlebot/lib/util')
10+
11+function rand(array) {
12+ return array[~~(Math.random()*array.length)]
13+}
14+
15+function not (fn) {
16+ return function (e) { return !fn(e) }
17+}
18+
19+function and () {
20+ var args = [].slice.call(arguments)
21+ return function (value) {
22+ return args.every(function (fn) { return fn.call(null, value) })
23+ }
24+}
25+
26+//min delay (delay since last disconnect of most recent peer in unconnected set)
27+//unconnected filter delay peer < min delay
28+function delay (failures, factor, max) {
29+ return Math.min(Math.pow(2, failures)*factor, max || Infinity)
30+}
31+
32+function maxStateChange (M, e) {
33+ return Math.max(M, e.stateChange || 0)
34+}
35+
36+function peerNext(peer, opts) {
37+ return (peer.stateChange|0) + delay(peer.failure|0, opts.factor, opts.max)
38+}
39+
40+
41+//detect if not connected to wifi or other network
42+//(i.e. if there is only localhost)
43+
44+function isOffline (e) {
45+ if(ip.isLoopback(e.host)) return false
46+ var lo = Object.keys(os.networkInterfaces())
47+ return lo.length === 1 && lo[0] === 'lo'
48+}
49+
50+var isOnline = not(isOffline)
51+
52+function isLocal (e) {
53+ // don't rely on private ip address, because
54+ // cjdns creates fake private ip addresses.
55+ return ip.isPrivate(e.host) && e.type === 'local'
56+}
57+
58+function isFollowing (e) {
59+ return e.source === 'self'
60+}
61+
62+function isUnattempted (e) {
63+ return !e.stateChange
64+}
65+
66+//select peers which have never been successfully connected to yet,
67+//but have been tried.
68+function isInactive (e) {
69+ return e.stateChange && e.duration.mean == 0
70+}
71+
72+function isLongterm (e) {
73+ return e.ping && e.ping.rtt.mean > 0
74+}
75+
76+//peers which we can connect to, but are not upgraded.
77+//select peers which we can connect to, but are not upgraded to LT.
78+//assume any peer is legacy, until we know otherwise...
79+function isLegacy (peer) {
80+ return peer.duration.mean > 0 && !exports.isLongterm(peer)
81+}
82+
83+function isConnect (e) {
84+ return 'connected' === e.state || 'connecting' === e.state
85+}
86+
87+//sort oldest to newest then take first n
88+function earliest(peers, n) {
89+ return peers.sort(function (a, b) {
90+ return a.stateChange - b.stateChange
91+ }).slice(0, Math.max(n, 0))
92+}
93+
94+function select(peers, ts, filter, opts) {
95+ if(opts.disable) return []
96+ //opts: { quota, groupMin, min, factor, max }
97+ var type = peers.filter(filter)
98+ var unconnect = type.filter(not(isConnect))
99+ var count = Math.max(opts.quota - type.filter(isConnect).length, 0)
100+ var min = unconnect.reduce(maxStateChange, 0) + opts.groupMin
101+ if(ts < min) return []
102+
103+ return earliest(unconnect.filter(function (peer) {
104+ return peerNext(peer, opts) < ts
105+ }), count)
106+}
107+
108+var schedule = exports = module.exports =
109+function (gossip, config, server) {
110+
111+ var min = 60e3, hour = 60*60e3
112+
113+ //trigger hard reconnect after suspend or local network changes
114+ onWakeup(gossip.reconnect)
115+ onNetwork(gossip.reconnect)
116+
117+ function conf(name, def) {
118+ if(!config.gossip) return def
119+ var value = config.gossip[name]
120+ return (value === undefined || value === '') ? def : value
121+ }
122+
123+ function connect (peers, ts, name, filter, opts) {
124+ var connected = peers.filter(isConnect).filter(filter)
125+ .filter(function (peer) {
126+ // don't disconnect from followed pubs
127+ return !isFollowing(peer) && peer.stateChange + 10e3 < ts
128+ })
129+
130+ if(connected.length > opts.quota) {
131+ return earliest(connected, connected.length - opts.quota)
132+ .forEach(function (peer) {
133+ gossip.disconnect(peer)
134+ })
135+ }
136+
137+ select(peers, ts, and(filter, isOnline), opts)
138+ .forEach(function (peer) {
139+ gossip.connect(peer)
140+ })
141+ }
142+
143+
144+ var connecting = false
145+ function connections () {
146+ if(connecting) return
147+ connecting = true
148+ setTimeout(function () {
149+ connecting = false
150+ var ts = Date.now()
151+ var peers = gossip.peers()
152+
153+// if(Math.random() > 0.1) return
154+
155+ connect(peers, ts, 'following', exports.isFollowing, {
156+ min: 0, quota: 5, factor: 0, max: 0, groupMin: 0,
157+ disable: !conf('global', true)
158+ })
159+
160+ connect(peers, ts, 'attempt', exports.isUnattempted, {
161+ min: 0, quota: 1, factor: 2e3, max: 0, groupMin: 0,
162+ disable: !conf('global', true)
163+ })
164+
165+ //quota, groupMin, min, factor, max
166+ connect(peers, ts, 'retry', exports.isInactive, {
167+ min: 0,
168+ quota: 3, factor: 5*60e3, max: 3*60*60e3, groupMin: 5*50e3
169+ })
170+
171+ connect(peers, ts, 'legacy', exports.isLegacy, {
172+ quota: 3, factor: 5*min, max: 3*hour, groupMin: 5*min,
173+ disable: !conf('global', true)
174+ })
175+
176+ connect(peers, ts, 'longterm', exports.isLongterm, {
177+ quota: 3, factor: 10e3, max: 10*min, groupMin: 5e3,
178+ disable: !conf('global', true)
179+ })
180+
181+ connect(peers, ts, 'local', exports.isLocal, {
182+ quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3,
183+ disable: !conf('local', true)
184+ })
185+
186+ }, 100*Math.random())
187+
188+ }
189+
190+ pull(
191+ gossip.changes(),
192+ pull.drain(function (ev) {
193+ if(ev.type == 'disconnect')
194+ connections()
195+ })
196+ )
197+
198+ var int = setInterval(connections, 2e3)
199+ if(int.unref) int.unref()
200+
201+ connections()
202+
203+}
204+
205+exports.isUnattempted = isUnattempted
206+exports.isInactive = isInactive
207+exports.isLongterm = isLongterm
208+exports.isFollowing = isFollowing
209+exports.isLegacy = isLegacy
210+exports.isLocal = isLocal
211+exports.isConnectedOrConnecting = isConnect
212+exports.select = select

Built with git-ssb-web