Commit c2421901bdd957ae5e4d3f66accdd85bfd7cd5c8
use persistent-gossip without the followed pub hack (which was causing reconnects and high cpu)
Matt McKegg committed on 11/2/2016, 10:43:48 AMParent: aefef382635ee1b4f4e8b5370d9dfa59f24fe15f
Files changed
lib/gossip-with-slow-rollout/index.js | ||
---|---|---|
@@ -1,212 +1,0 @@ | ||
1 | -'use strict' | |
2 | -var pull = require('pull-stream') | |
3 | -var Notify = require('pull-notify') | |
4 | -var mdm = require('mdmanifest') | |
5 | -var valid = require('scuttlebot/lib/validators') | |
6 | -var apidoc = require('scuttlebot/lib/apidocs').gossip | |
7 | -var u = require('scuttlebot/lib/util') | |
8 | -var ref = require('ssb-ref') | |
9 | -var ping = require('pull-ping') | |
10 | -var Stats = require('statistics') | |
11 | -var Schedule = require('./schedule') | |
12 | -var Init = require('./init') | |
13 | - | |
14 | -function isFunction (f) { | |
15 | - return 'function' === typeof f | |
16 | -} | |
17 | - | |
18 | -function stringify(peer) { | |
19 | - return [peer.host, peer.port, peer.key].join(':') | |
20 | -} | |
21 | - | |
22 | -/* | |
23 | -Peers : [{ | |
24 | - key: id, | |
25 | - host: ip, | |
26 | - port: int, | |
27 | - //to be backwards compatible with patchwork... | |
28 | - announcers: {length: int} | |
29 | - source: 'pub'|'manual'|'local' | |
30 | -}] | |
31 | -*/ | |
32 | - | |
33 | - | |
34 | -module.exports = { | |
35 | - name: 'gossip', | |
36 | - version: '1.0.0', | |
37 | - manifest: mdm.manifest(apidoc), | |
38 | - permissions: { | |
39 | - anonymous: {allow: ['ping']} | |
40 | - }, | |
41 | - init: function (server, config) { | |
42 | - var notify = Notify() | |
43 | - var conf = config.gossip || {} | |
44 | - var home = ref.parseAddress(server.getAddress()) | |
45 | - | |
46 | - //Known Peers | |
47 | - var peers = [] | |
48 | - | |
49 | - function getPeer(id) { | |
50 | - return u.find(peers, function (e) { | |
51 | - return e && e.key === id | |
52 | - }) | |
53 | - } | |
54 | - | |
55 | - var timer_ping = 5*6e4 | |
56 | - | |
57 | - var gossip = { | |
58 | - wakeup: 0, | |
59 | - peers: function () { | |
60 | - return peers | |
61 | - }, | |
62 | - get: function (addr) { | |
63 | - addr = ref.parseAddress(addr) | |
64 | - return u.find(peers, function (a) { | |
65 | - return ( | |
66 | - addr.port === a.port | |
67 | - && addr.host === a.host | |
68 | - && addr.key === a.key | |
69 | - ) | |
70 | - }) | |
71 | - }, | |
72 | - connect: valid.async(function (addr, cb) { | |
73 | - addr = ref.parseAddress(addr) | |
74 | - if (!addr || typeof addr != 'object') | |
75 | - return cb(new Error('first param must be an address')) | |
76 | - | |
77 | - if(!addr.key) return cb(new Error('address must have ed25519 key')) | |
78 | - // add peer to the table, incase it isn't already. | |
79 | - gossip.add(addr, 'manual') | |
80 | - var p = gossip.get(addr) | |
81 | - if(!p) return cb() | |
82 | - | |
83 | - p.stateChange = Date.now() | |
84 | - p.state = 'connecting' | |
85 | - server.connect(p, function (err, rpc) { | |
86 | - if (err) { | |
87 | - p.state = undefined | |
88 | - p.failure = (p.failure || 0) + 1 | |
89 | - p.stateChange = Date.now() | |
90 | - notify({ type: 'connect-failure', peer: p }) | |
91 | - server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err]) | |
92 | - p.duration.value(0) | |
93 | - return (cb && cb(err)) | |
94 | - } | |
95 | - else { | |
96 | - p.state = 'connected' | |
97 | - p.failure = 0 | |
98 | - } | |
99 | - cb && cb(null, rpc) | |
100 | - }) | |
101 | - | |
102 | - }, 'string|object'), | |
103 | - | |
104 | - disconnect: valid.async(function (addr, cb) { | |
105 | - var peer = this.get(addr) | |
106 | - | |
107 | - peer.state = 'disconnecting' | |
108 | - peer.stateChange = Date.now() | |
109 | - if(!peer || !peer.disconnect) cb && cb() | |
110 | - else peer.disconnect(true, function (err) { | |
111 | - peer.stateChange = Date.now() | |
112 | - }) | |
113 | - | |
114 | - }, 'string|object'), | |
115 | - | |
116 | - changes: function () { | |
117 | - return notify.listen() | |
118 | - }, | |
119 | - //add an address to the peer table. | |
120 | - add: valid.sync(function (addr, source) { | |
121 | - addr = ref.parseAddress(addr) | |
122 | - if(!ref.isAddress(addr)) | |
123 | - throw new Error('not a valid address:' + JSON.stringify(addr)) | |
124 | - // check that this is a valid address, and not pointing at self. | |
125 | - | |
126 | - if(addr.key === home.key) return | |
127 | - if(addr.host === home.host && addr.port === home.port) return | |
128 | - | |
129 | - var f = gossip.get(addr) | |
130 | - | |
131 | - if(!f) { | |
132 | - // new peer | |
133 | - addr.source = source | |
134 | - addr.announcers = 1 | |
135 | - addr.duration = Stats() | |
136 | - peers.push(addr) | |
137 | - notify({ type: 'discover', peer: addr, source: source || 'manual' }) | |
138 | - return addr | |
139 | - } | |
140 | - //don't count local over and over | |
141 | - else if(f.source != 'local') | |
142 | - f.announcers ++ | |
143 | - | |
144 | - return f | |
145 | - }, 'string|object', 'string?'), | |
146 | - ping: function (opts) { | |
147 | - var timeout = config.timers && config.timers.ping || 5*60e3 | |
148 | - //between 10 seconds and 30 minutes, default 5 min | |
149 | - timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) | |
150 | - return ping({timeout: timeout}) | |
151 | - }, | |
152 | - reconnect: function () { | |
153 | - for(var id in server.peers) | |
154 | - if(id !== server.id) //don't disconnect local client | |
155 | - server.peers[id].forEach(function (peer) { | |
156 | - peer.close(true) | |
157 | - }) | |
158 | - return gossip.wakeup = Date.now() | |
159 | - } | |
160 | - } | |
161 | - | |
162 | - Schedule (gossip, config, server) | |
163 | - Init (gossip, config, server) | |
164 | - //get current state | |
165 | - | |
166 | - server.on('rpc:connect', function (rpc, isClient) { | |
167 | - var peer = getPeer(rpc.id) | |
168 | - //don't track clients that connect, but arn't considered peers. | |
169 | - //maybe we should though? | |
170 | - if(!peer) return | |
171 | - console.log('Connected', stringify(peer)) | |
172 | - //means that we have created this connection, not received it. | |
173 | - peer.client = !!isClient | |
174 | - peer.state = 'connected' | |
175 | - peer.stateChange = Date.now() | |
176 | - peer.disconnect = function (err, cb) { | |
177 | - if(isFunction(err)) cb = err, err = null | |
178 | - rpc.close(err, cb) | |
179 | - } | |
180 | - | |
181 | - if(isClient) { | |
182 | - //default ping is 5 minutes... | |
183 | - var pp = ping({serve: true, timeout: timer_ping}, function (_) {}) | |
184 | - peer.ping = {rtt: pp.rtt, skew: pp.skew} | |
185 | - pull( | |
186 | - pp, | |
187 | - rpc.gossip.ping({timeout: timer_ping}, function (err) { | |
188 | - if(err.name === 'TypeError') peer.ping.fail = true | |
189 | - }), | |
190 | - pp | |
191 | - ) | |
192 | - } | |
193 | - | |
194 | - rpc.on('closed', function () { | |
195 | - console.log('Disconnected', stringify(peer)) | |
196 | - //track whether we have successfully connected. | |
197 | - //or how many failures there have been. | |
198 | - var since = peer.stateChange | |
199 | - peer.stateChange = Date.now() | |
200 | - if(peer.state === 'connected') //may be "disconnecting" | |
201 | - peer.duration.value(peer.stateChange - since) | |
202 | - peer.state = undefined | |
203 | - notify({ type: 'disconnect', peer: peer }) | |
204 | - server.emit('log:info', ['SBOT', rpc.id, 'disconnect']) | |
205 | - }) | |
206 | - | |
207 | - notify({ type: 'connect', peer: peer }) | |
208 | - }) | |
209 | - | |
210 | - return gossip | |
211 | - } | |
212 | -} |
lib/gossip-with-slow-rollout/init.js | ||
---|---|---|
@@ -1,32 +1,0 @@ | ||
1 | -var isArray = Array.isArray | |
2 | -var pull = require('pull-stream') | |
3 | -var ref = require('ssb-ref') | |
4 | - | |
5 | -module.exports = function (gossip, config, server) { | |
6 | - | |
7 | - // populate peertable with configured seeds (mainly used in testing) | |
8 | - var seeds = config.seeds | |
9 | - | |
10 | - ;(isArray(seeds) ? seeds : [seeds]).filter(Boolean) | |
11 | - .forEach(function (addr) { gossip.add(addr, 'seed') }) | |
12 | - | |
13 | - // populate peertable with pub announcements on the feed | |
14 | - pull( | |
15 | - server.messagesByType({ | |
16 | - type: 'pub', live: true, keys: false | |
17 | - }), | |
18 | - pull.drain(function (msg) { | |
19 | - if (!msg.content.address) return | |
20 | - if (ref.isAddress(msg.content.address)) { | |
21 | - // PATCH: only self will be connected to immediately, others will roll out slowly | |
22 | - gossip.add(msg.content.address, msg.author === server.id ? 'self' : 'pub') | |
23 | - } | |
24 | - }) | |
25 | - ) | |
26 | - | |
27 | - // populate peertable with announcements on the LAN multicast | |
28 | - server.on('local', function (_peer) { | |
29 | - gossip.add(_peer, 'local') | |
30 | - }) | |
31 | - | |
32 | -} |
lib/gossip-with-slow-rollout/schedule.js | ||
---|---|---|
@@ -1,219 +1,0 @@ | ||
1 | -var nonPrivate = require('non-private-ip') | |
2 | -var ip = require('ip') | |
3 | -var onWakeup = require('on-wakeup') | |
4 | -var onNetwork = require('on-change-network') | |
5 | -var hasNetwork = require('has-network') | |
6 | - | |
7 | -var pull = require('pull-stream') | |
8 | -var u = require('scuttlebot/lib/util') | |
9 | - | |
10 | -function rand(array) { | |
11 | - return array[~~(Math.random()*array.length)] | |
12 | -} | |
13 | - | |
14 | -function not (fn) { | |
15 | - return function (e) { return !fn(e) } | |
16 | -} | |
17 | - | |
18 | -function and () { | |
19 | - var args = [].slice.call(arguments) | |
20 | - return function (value) { | |
21 | - return args.every(function (fn) { return fn.call(null, value) }) | |
22 | - } | |
23 | -} | |
24 | - | |
25 | -//min delay (delay since last disconnect of most recent peer in unconnected set) | |
26 | -//unconnected filter delay peer < min delay | |
27 | -function delay (failures, factor, max) { | |
28 | - return Math.min(Math.pow(2, failures)*factor, max || Infinity) | |
29 | -} | |
30 | - | |
31 | -function maxStateChange (M, e) { | |
32 | - return Math.max(M, e.stateChange || 0) | |
33 | -} | |
34 | - | |
35 | -function peerNext(peer, opts) { | |
36 | - return (peer.stateChange|0) + delay(peer.failure|0, opts.factor, opts.max) | |
37 | -} | |
38 | - | |
39 | -function isFollowing (e) { | |
40 | - return e.source === 'self' | |
41 | -} | |
42 | - | |
43 | -//detect if not connected to wifi or other network | |
44 | -//(i.e. if there is only localhost) | |
45 | - | |
46 | -function isOffline (e) { | |
47 | - if(ip.isLoopback(e.host)) return false | |
48 | - return !hasNetwork() | |
49 | -} | |
50 | - | |
51 | -var isOnline = not(isOffline) | |
52 | - | |
53 | -function isLocal (e) { | |
54 | - // don't rely on private ip address, because | |
55 | - // cjdns creates fake private ip addresses. | |
56 | - return ip.isPrivate(e.host) && e.type === 'local' | |
57 | -} | |
58 | - | |
59 | -function isUnattempted (e) { | |
60 | - return !e.stateChange | |
61 | -} | |
62 | - | |
63 | -//select peers which have never been successfully connected to yet, | |
64 | -//but have been tried. | |
65 | -function isInactive (e) { | |
66 | - return e.stateChange && (!e.duration || e.duration.mean == 0) | |
67 | -} | |
68 | - | |
69 | -function isLongterm (e) { | |
70 | - return e.ping && e.ping.rtt && e.ping.rtt.mean > 0 | |
71 | -} | |
72 | - | |
73 | -//peers which we can connect to, but are not upgraded. | |
74 | -//select peers which we can connect to, but are not upgraded to LT. | |
75 | -//assume any peer is legacy, until we know otherwise... | |
76 | -function isLegacy (peer) { | |
77 | - return peer.duration && peer.duration.mean > 0 && !exports.isLongterm(peer) | |
78 | -} | |
79 | - | |
80 | -function isConnect (e) { | |
81 | - return 'connected' === e.state || 'connecting' === e.state | |
82 | -} | |
83 | - | |
84 | -//sort oldest to newest then take first n | |
85 | -function earliest(peers, n) { | |
86 | - return peers.sort(function (a, b) { | |
87 | - return a.stateChange - b.stateChange | |
88 | - }).slice(0, Math.max(n, 0)) | |
89 | -} | |
90 | - | |
91 | -function select(peers, ts, filter, opts) { | |
92 | - if(opts.disable) return [] | |
93 | - //opts: { quota, groupMin, min, factor, max } | |
94 | - var type = peers.filter(filter) | |
95 | - var unconnect = type.filter(not(isConnect)) | |
96 | - var count = Math.max(opts.quota - type.filter(isConnect).length, 0) | |
97 | - var min = unconnect.reduce(maxStateChange, 0) + opts.groupMin | |
98 | - if(ts < min) return [] | |
99 | - | |
100 | - return earliest(unconnect.filter(function (peer) { | |
101 | - return peerNext(peer, opts) < ts | |
102 | - }), count) | |
103 | -} | |
104 | - | |
105 | -var schedule = exports = module.exports = | |
106 | -function (gossip, config, server) { | |
107 | -// return | |
108 | - var min = 60e3, hour = 60*60e3 | |
109 | - | |
110 | - //trigger hard reconnect after suspend or local network changes | |
111 | - onWakeup(gossip.reconnect) | |
112 | - onNetwork(gossip.reconnect) | |
113 | - | |
114 | - function conf(name, def) { | |
115 | - if(!config.gossip) return def | |
116 | - var value = config.gossip[name] | |
117 | - return (value === undefined || value === '') ? def : value | |
118 | - } | |
119 | - | |
120 | - function connect (peers, ts, name, filter, opts) { | |
121 | - var connected = peers.filter(isConnect).filter(filter) | |
122 | - | |
123 | - //disconnect if over quota | |
124 | - if(connected.length > opts.quota) { | |
125 | - return earliest(connected, connected.length - opts.quota) | |
126 | - .forEach(function (peer) { | |
127 | - gossip.disconnect(peer) | |
128 | - }) | |
129 | - } | |
130 | - | |
131 | - //will return [] if the quota is full | |
132 | - var selected = select(peers, ts, and(filter, isOnline), opts) | |
133 | - selected | |
134 | - .forEach(function (peer) { | |
135 | - gossip.connect(peer) | |
136 | - }) | |
137 | - } | |
138 | - | |
139 | - | |
140 | - var connecting = false | |
141 | - function connections () { | |
142 | - if(connecting) return | |
143 | - connecting = true | |
144 | - setTimeout(function () { | |
145 | - connecting = false | |
146 | - var ts = Date.now() | |
147 | - var peers = gossip.peers() | |
148 | - | |
149 | - var connected = peers.filter(isConnect).length | |
150 | - | |
151 | - connect(peers, ts, 'following', exports.isFollowing, { | |
152 | - min: 0, quota: 5, factor: 0, max: 0, groupMin: 0, | |
153 | - disable: !conf('global', true) | |
154 | - }) | |
155 | - | |
156 | - connect(peers, ts, 'longterm', exports.isLongterm, { | |
157 | - quota: 3, factor: 10e3, max: 10*min, groupMin: 5e3, | |
158 | - disable: !conf('global', true) | |
159 | - }) | |
160 | - | |
161 | - connect(peers, ts, 'local', exports.isLocal, { | |
162 | - quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3, | |
163 | - disable: !conf('local', true) | |
164 | - }) | |
165 | - | |
166 | - if(connected === 0) | |
167 | - connect(peers, ts, 'attempt', exports.isUnattempted, { | |
168 | - min: 0, quota: 1, factor: 0, max: 0, groupMin: 0, | |
169 | - disable: !conf('global', true) | |
170 | - }) | |
171 | - | |
172 | - //quota, groupMin, min, factor, max | |
173 | - connect(peers, ts, 'retry', exports.isInactive, { | |
174 | - min: 0, | |
175 | - quota: 3, factor: 5*60e3, max: 3*60*60e3, groupMin: 5*50e3 | |
176 | - }) | |
177 | - | |
178 | - var longterm = peers.filter(isConnect).filter(exports.isLongterm).length | |
179 | - | |
180 | - connect(peers, ts, 'legacy', exports.isLegacy, { | |
181 | - quota: 3 - longterm, | |
182 | - factor: 5*min, max: 3*hour, groupMin: 5*min, | |
183 | - disable: !conf('global', true) | |
184 | - }) | |
185 | - | |
186 | - peers.filter(isConnect).forEach(function (e) { | |
187 | - if((!exports.isLongterm(e) || e.state === 'connecting') && e.stateChange + 10e3 < ts) { | |
188 | - gossip.disconnect(e) | |
189 | - } | |
190 | - | |
191 | - }) | |
192 | - | |
193 | - }, 100*Math.random()) | |
194 | - | |
195 | - } | |
196 | - | |
197 | - pull( | |
198 | - gossip.changes(), | |
199 | - pull.drain(function (ev) { | |
200 | - if(ev.type == 'disconnect') | |
201 | - connections() | |
202 | - }) | |
203 | - ) | |
204 | - | |
205 | - var int = setInterval(connections, 2e3) | |
206 | - if(int.unref) int.unref() | |
207 | - | |
208 | - connections() | |
209 | - | |
210 | -} | |
211 | - | |
212 | -exports.isUnattempted = isUnattempted | |
213 | -exports.isInactive = isInactive | |
214 | -exports.isFollowing = isFollowing | |
215 | -exports.isLongterm = isLongterm | |
216 | -exports.isLegacy = isLegacy | |
217 | -exports.isLocal = isLocal | |
218 | -exports.isConnectedOrConnecting = isConnect | |
219 | -exports.select = select |
lib/persistent-gossip/index.js | ||
---|---|---|
@@ -1,0 +1,212 @@ | ||
1 … | +'use strict' | |
2 … | +var pull = require('pull-stream') | |
3 … | +var Notify = require('pull-notify') | |
4 … | +var mdm = require('mdmanifest') | |
5 … | +var valid = require('scuttlebot/lib/validators') | |
6 … | +var apidoc = require('scuttlebot/lib/apidocs').gossip | |
7 … | +var u = require('scuttlebot/lib/util') | |
8 … | +var ref = require('ssb-ref') | |
9 … | +var ping = require('pull-ping') | |
10 … | +var Stats = require('statistics') | |
11 … | +var Schedule = require('./schedule') | |
12 … | +var Init = require('scuttlebot/plugins/gossip/init') | |
13 … | + | |
14 … | +function isFunction (f) { | |
15 … | + return 'function' === typeof f | |
16 … | +} | |
17 … | + | |
18 … | +function stringify(peer) { | |
19 … | + return [peer.host, peer.port, peer.key].join(':') | |
20 … | +} | |
21 … | + | |
22 … | +/* | |
23 … | +Peers : [{ | |
24 … | + key: id, | |
25 … | + host: ip, | |
26 … | + port: int, | |
27 … | + //to be backwards compatible with patchwork... | |
28 … | + announcers: {length: int} | |
29 … | + source: 'pub'|'manual'|'local' | |
30 … | +}] | |
31 … | +*/ | |
32 … | + | |
33 … | + | |
34 … | +module.exports = { | |
35 … | + name: 'gossip', | |
36 … | + version: '1.0.0', | |
37 … | + manifest: mdm.manifest(apidoc), | |
38 … | + permissions: { | |
39 … | + anonymous: {allow: ['ping']} | |
40 … | + }, | |
41 … | + init: function (server, config) { | |
42 … | + var notify = Notify() | |
43 … | + var conf = config.gossip || {} | |
44 … | + var home = ref.parseAddress(server.getAddress()) | |
45 … | + | |
46 … | + //Known Peers | |
47 … | + var peers = [] | |
48 … | + | |
49 … | + function getPeer(id) { | |
50 … | + return u.find(peers, function (e) { | |
51 … | + return e && e.key === id | |
52 … | + }) | |
53 … | + } | |
54 … | + | |
55 … | + var timer_ping = 5*6e4 | |
56 … | + | |
57 … | + var gossip = { | |
58 … | + wakeup: 0, | |
59 … | + peers: function () { | |
60 … | + return peers | |
61 … | + }, | |
62 … | + get: function (addr) { | |
63 … | + addr = ref.parseAddress(addr) | |
64 … | + return u.find(peers, function (a) { | |
65 … | + return ( | |
66 … | + addr.port === a.port | |
67 … | + && addr.host === a.host | |
68 … | + && addr.key === a.key | |
69 … | + ) | |
70 … | + }) | |
71 … | + }, | |
72 … | + connect: valid.async(function (addr, cb) { | |
73 … | + addr = ref.parseAddress(addr) | |
74 … | + if (!addr || typeof addr != 'object') | |
75 … | + return cb(new Error('first param must be an address')) | |
76 … | + | |
77 … | + if(!addr.key) return cb(new Error('address must have ed25519 key')) | |
78 … | + // add peer to the table, incase it isn't already. | |
79 … | + gossip.add(addr, 'manual') | |
80 … | + var p = gossip.get(addr) | |
81 … | + if(!p) return cb() | |
82 … | + | |
83 … | + p.stateChange = Date.now() | |
84 … | + p.state = 'connecting' | |
85 … | + server.connect(p, function (err, rpc) { | |
86 … | + if (err) { | |
87 … | + p.state = undefined | |
88 … | + p.failure = (p.failure || 0) + 1 | |
89 … | + p.stateChange = Date.now() | |
90 … | + notify({ type: 'connect-failure', peer: p }) | |
91 … | + server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err]) | |
92 … | + p.duration.value(0) | |
93 … | + return (cb && cb(err)) | |
94 … | + } | |
95 … | + else { | |
96 … | + p.state = 'connected' | |
97 … | + p.failure = 0 | |
98 … | + } | |
99 … | + cb && cb(null, rpc) | |
100 … | + }) | |
101 … | + | |
102 … | + }, 'string|object'), | |
103 … | + | |
104 … | + disconnect: valid.async(function (addr, cb) { | |
105 … | + var peer = this.get(addr) | |
106 … | + | |
107 … | + peer.state = 'disconnecting' | |
108 … | + peer.stateChange = Date.now() | |
109 … | + if(!peer || !peer.disconnect) cb && cb() | |
110 … | + else peer.disconnect(true, function (err) { | |
111 … | + peer.stateChange = Date.now() | |
112 … | + }) | |
113 … | + | |
114 … | + }, 'string|object'), | |
115 … | + | |
116 … | + changes: function () { | |
117 … | + return notify.listen() | |
118 … | + }, | |
119 … | + //add an address to the peer table. | |
120 … | + add: valid.sync(function (addr, source) { | |
121 … | + addr = ref.parseAddress(addr) | |
122 … | + if(!ref.isAddress(addr)) | |
123 … | + throw new Error('not a valid address:' + JSON.stringify(addr)) | |
124 … | + // check that this is a valid address, and not pointing at self. | |
125 … | + | |
126 … | + if(addr.key === home.key) return | |
127 … | + if(addr.host === home.host && addr.port === home.port) return | |
128 … | + | |
129 … | + var f = gossip.get(addr) | |
130 … | + | |
131 … | + if(!f) { | |
132 … | + // new peer | |
133 … | + addr.source = source | |
134 … | + addr.announcers = 1 | |
135 … | + addr.duration = Stats() | |
136 … | + peers.push(addr) | |
137 … | + notify({ type: 'discover', peer: addr, source: source || 'manual' }) | |
138 … | + return addr | |
139 … | + } | |
140 … | + //don't count local over and over | |
141 … | + else if(f.source != 'local') | |
142 … | + f.announcers ++ | |
143 … | + | |
144 … | + return f | |
145 … | + }, 'string|object', 'string?'), | |
146 … | + ping: function (opts) { | |
147 … | + var timeout = config.timers && config.timers.ping || 5*60e3 | |
148 … | + //between 10 seconds and 30 minutes, default 5 min | |
149 … | + timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) | |
150 … | + return ping({timeout: timeout}) | |
151 … | + }, | |
152 … | + reconnect: function () { | |
153 … | + for(var id in server.peers) | |
154 … | + if(id !== server.id) //don't disconnect local client | |
155 … | + server.peers[id].forEach(function (peer) { | |
156 … | + peer.close(true) | |
157 … | + }) | |
158 … | + return gossip.wakeup = Date.now() | |
159 … | + } | |
160 … | + } | |
161 … | + | |
162 … | + Schedule (gossip, config, server) | |
163 … | + Init (gossip, config, server) | |
164 … | + //get current state | |
165 … | + | |
166 … | + server.on('rpc:connect', function (rpc, isClient) { | |
167 … | + var peer = getPeer(rpc.id) | |
168 … | + //don't track clients that connect, but arn't considered peers. | |
169 … | + //maybe we should though? | |
170 … | + if(!peer) return | |
171 … | + console.log('Connected', stringify(peer)) | |
172 … | + //means that we have created this connection, not received it. | |
173 … | + peer.client = !!isClient | |
174 … | + peer.state = 'connected' | |
175 … | + peer.stateChange = Date.now() | |
176 … | + peer.disconnect = function (err, cb) { | |
177 … | + if(isFunction(err)) cb = err, err = null | |
178 … | + rpc.close(err, cb) | |
179 … | + } | |
180 … | + | |
181 … | + if(isClient) { | |
182 … | + //default ping is 5 minutes... | |
183 … | + var pp = ping({serve: true, timeout: timer_ping}, function (_) {}) | |
184 … | + peer.ping = {rtt: pp.rtt, skew: pp.skew} | |
185 … | + pull( | |
186 … | + pp, | |
187 … | + rpc.gossip.ping({timeout: timer_ping}, function (err) { | |
188 … | + if(err.name === 'TypeError') peer.ping.fail = true | |
189 … | + }), | |
190 … | + pp | |
191 … | + ) | |
192 … | + } | |
193 … | + | |
194 … | + rpc.on('closed', function () { | |
195 … | + console.log('Disconnected', stringify(peer)) | |
196 … | + //track whether we have successfully connected. | |
197 … | + //or how many failures there have been. | |
198 … | + var since = peer.stateChange | |
199 … | + peer.stateChange = Date.now() | |
200 … | + if(peer.state === 'connected') //may be "disconnecting" | |
201 … | + peer.duration.value(peer.stateChange - since) | |
202 … | + peer.state = undefined | |
203 … | + notify({ type: 'disconnect', peer: peer }) | |
204 … | + server.emit('log:info', ['SBOT', rpc.id, 'disconnect']) | |
205 … | + }) | |
206 … | + | |
207 … | + notify({ type: 'connect', peer: peer }) | |
208 … | + }) | |
209 … | + | |
210 … | + return gossip | |
211 … | + } | |
212 … | +} |
lib/persistent-gossip/schedule.js | ||
---|---|---|
@@ -1,0 +1,214 @@ | ||
1 … | +var nonPrivate = require('non-private-ip') | |
2 … | +var ip = require('ip') | |
3 … | +var onWakeup = require('on-wakeup') | |
4 … | +var onNetwork = require('on-change-network') | |
5 … | +var hasNetwork = require('has-network') | |
6 … | + | |
7 … | +var pull = require('pull-stream') | |
8 … | +var u = require('scuttlebot/lib/util') | |
9 … | + | |
10 … | +function rand(array) { | |
11 … | + return array[~~(Math.random()*array.length)] | |
12 … | +} | |
13 … | + | |
14 … | +function not (fn) { | |
15 … | + return function (e) { return !fn(e) } | |
16 … | +} | |
17 … | + | |
18 … | +function and () { | |
19 … | + var args = [].slice.call(arguments) | |
20 … | + return function (value) { | |
21 … | + return args.every(function (fn) { return fn.call(null, value) }) | |
22 … | + } | |
23 … | +} | |
24 … | + | |
25 … | +//min delay (delay since last disconnect of most recent peer in unconnected set) | |
26 … | +//unconnected filter delay peer < min delay | |
27 … | +function delay (failures, factor, max) { | |
28 … | + return Math.min(Math.pow(2, failures)*factor, max || Infinity) | |
29 … | +} | |
30 … | + | |
31 … | +function maxStateChange (M, e) { | |
32 … | + return Math.max(M, e.stateChange || 0) | |
33 … | +} | |
34 … | + | |
35 … | +function peerNext(peer, opts) { | |
36 … | + return (peer.stateChange|0) + delay(peer.failure|0, opts.factor, opts.max) | |
37 … | +} | |
38 … | + | |
39 … | +function isFollowing (e) { | |
40 … | + return e.source === 'self' | |
41 … | +} | |
42 … | + | |
43 … | +//detect if not connected to wifi or other network | |
44 … | +//(i.e. if there is only localhost) | |
45 … | + | |
46 … | +function isOffline (e) { | |
47 … | + if(ip.isLoopback(e.host)) return false | |
48 … | + return !hasNetwork() | |
49 … | +} | |
50 … | + | |
51 … | +var isOnline = not(isOffline) | |
52 … | + | |
53 … | +function isLocal (e) { | |
54 … | + // don't rely on private ip address, because | |
55 … | + // cjdns creates fake private ip addresses. | |
56 … | + return ip.isPrivate(e.host) && e.type === 'local' | |
57 … | +} | |
58 … | + | |
59 … | +function isUnattempted (e) { | |
60 … | + return !e.stateChange | |
61 … | +} | |
62 … | + | |
63 … | +//select peers which have never been successfully connected to yet, | |
64 … | +//but have been tried. | |
65 … | +function isInactive (e) { | |
66 … | + return e.stateChange && (!e.duration || e.duration.mean == 0) | |
67 … | +} | |
68 … | + | |
69 … | +function isLongterm (e) { | |
70 … | + return e.ping && e.ping.rtt && e.ping.rtt.mean > 0 | |
71 … | +} | |
72 … | + | |
73 … | +//peers which we can connect to, but are not upgraded. | |
74 … | +//select peers which we can connect to, but are not upgraded to LT. | |
75 … | +//assume any peer is legacy, until we know otherwise... | |
76 … | +function isLegacy (peer) { | |
77 … | + return peer.duration && peer.duration.mean > 0 && !exports.isLongterm(peer) | |
78 … | +} | |
79 … | + | |
80 … | +function isConnect (e) { | |
81 … | + return 'connected' === e.state || 'connecting' === e.state | |
82 … | +} | |
83 … | + | |
84 … | +//sort oldest to newest then take first n | |
85 … | +function earliest(peers, n) { | |
86 … | + return peers.sort(function (a, b) { | |
87 … | + return a.stateChange - b.stateChange | |
88 … | + }).slice(0, Math.max(n, 0)) | |
89 … | +} | |
90 … | + | |
91 … | +function select(peers, ts, filter, opts) { | |
92 … | + if(opts.disable) return [] | |
93 … | + //opts: { quota, groupMin, min, factor, max } | |
94 … | + var type = peers.filter(filter) | |
95 … | + var unconnect = type.filter(not(isConnect)) | |
96 … | + var count = Math.max(opts.quota - type.filter(isConnect).length, 0) | |
97 … | + var min = unconnect.reduce(maxStateChange, 0) + opts.groupMin | |
98 … | + if(ts < min) return [] | |
99 … | + | |
100 … | + return earliest(unconnect.filter(function (peer) { | |
101 … | + return peerNext(peer, opts) < ts | |
102 … | + }), count) | |
103 … | +} | |
104 … | + | |
105 … | +var schedule = exports = module.exports = | |
106 … | +function (gossip, config, server) { | |
107 … | +// return | |
108 … | + var min = 60e3, hour = 60*60e3 | |
109 … | + | |
110 … | + //trigger hard reconnect after suspend or local network changes | |
111 … | + onWakeup(gossip.reconnect) | |
112 … | + onNetwork(gossip.reconnect) | |
113 … | + | |
114 … | + function conf(name, def) { | |
115 … | + if(!config.gossip) return def | |
116 … | + var value = config.gossip[name] | |
117 … | + return (value === undefined || value === '') ? def : value | |
118 … | + } | |
119 … | + | |
120 … | + function connect (peers, ts, name, filter, opts) { | |
121 … | + var connected = peers.filter(isConnect).filter(filter) | |
122 … | + | |
123 … | + //disconnect if over quota | |
124 … | + if(connected.length > opts.quota) { | |
125 … | + return earliest(connected, connected.length - opts.quota) | |
126 … | + .forEach(function (peer) { | |
127 … | + gossip.disconnect(peer) | |
128 … | + }) | |
129 … | + } | |
130 … | + | |
131 … | + //will return [] if the quota is full | |
132 … | + var selected = select(peers, ts, and(filter, isOnline), opts) | |
133 … | + selected | |
134 … | + .forEach(function (peer) { | |
135 … | + gossip.connect(peer) | |
136 … | + }) | |
137 … | + } | |
138 … | + | |
139 … | + | |
140 … | + var connecting = false | |
141 … | + function connections () { | |
142 … | + if(connecting) return | |
143 … | + connecting = true | |
144 … | + setTimeout(function () { | |
145 … | + connecting = false | |
146 … | + var ts = Date.now() | |
147 … | + var peers = gossip.peers() | |
148 … | + | |
149 … | + var connected = peers.filter(isConnect).length | |
150 … | + | |
151 … | + connect(peers, ts, 'longterm', exports.isLongterm, { | |
152 … | + quota: 3, factor: 10e3, max: 10*min, groupMin: 5e3, | |
153 … | + disable: !conf('global', true) | |
154 … | + }) | |
155 … | + | |
156 … | + connect(peers, ts, 'local', exports.isLocal, { | |
157 … | + quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3, | |
158 … | + disable: !conf('local', true) | |
159 … | + }) | |
160 … | + | |
161 … | + if(connected === 0) | |
162 … | + connect(peers, ts, 'attempt', exports.isUnattempted, { | |
163 … | + min: 0, quota: 1, factor: 0, max: 0, groupMin: 0, | |
164 … | + disable: !conf('global', true) | |
165 … | + }) | |
166 … | + | |
167 … | + //quota, groupMin, min, factor, max | |
168 … | + connect(peers, ts, 'retry', exports.isInactive, { | |
169 … | + min: 0, | |
170 … | + quota: 3, factor: 5*60e3, max: 3*60*60e3, groupMin: 5*50e3 | |
171 … | + }) | |
172 … | + | |
173 … | + var longterm = peers.filter(isConnect).filter(exports.isLongterm).length | |
174 … | + | |
175 … | + connect(peers, ts, 'legacy', exports.isLegacy, { | |
176 … | + quota: 3 - longterm, | |
177 … | + factor: 5*min, max: 3*hour, groupMin: 5*min, | |
178 … | + disable: !conf('global', true) | |
179 … | + }) | |
180 … | + | |
181 … | + peers.filter(isConnect).forEach(function (e) { | |
182 … | + if((!exports.isLongterm(e) || e.state === 'connecting') && e.stateChange + 10e3 < ts) { | |
183 … | + gossip.disconnect(e) | |
184 … | + } | |
185 … | + | |
186 … | + }) | |
187 … | + | |
188 … | + }, 100*Math.random()) | |
189 … | + | |
190 … | + } | |
191 … | + | |
192 … | + pull( | |
193 … | + gossip.changes(), | |
194 … | + pull.drain(function (ev) { | |
195 … | + if(ev.type == 'disconnect') | |
196 … | + connections() | |
197 … | + }) | |
198 … | + ) | |
199 … | + | |
200 … | + var int = setInterval(connections, 2e3) | |
201 … | + if(int.unref) int.unref() | |
202 … | + | |
203 … | + connections() | |
204 … | + | |
205 … | +} | |
206 … | + | |
207 … | +exports.isUnattempted = isUnattempted | |
208 … | +exports.isInactive = isInactive | |
209 … | +exports.isFollowing = isFollowing | |
210 … | +exports.isLongterm = isLongterm | |
211 … | +exports.isLegacy = isLegacy | |
212 … | +exports.isLocal = isLocal | |
213 … | +exports.isConnectedOrConnecting = isConnect | |
214 … | +exports.select = select |
server-process.js | ||
---|---|---|
@@ -4,9 +4,9 @@ | ||
4 | 4 … | var electron = require('electron') |
5 | 5 … | |
6 | 6 … | var createSbot = require('scuttlebot') |
7 | 7 … | .use(require('scuttlebot/plugins/master')) |
8 | - .use(require('./lib/gossip-with-slow-rollout')) // override | |
8 … | + .use(require('./lib/persistent-gossip')) // override | |
9 | 9 … | .use(require('scuttlebot/plugins/friends')) |
10 | 10 … | .use(require('scuttlebot/plugins/replicate')) |
11 | 11 … | .use(require('ssb-blobs')) |
12 | 12 … | .use(require('scuttlebot/plugins/invite')) |
Built with git-ssb-web