Files: d2b5693b01842e3376dae3c3cf5c579e03e490c4 / plugins / gossip / schedule.js
7551 bytesRaw
1 | |
2 | var ip = require('ip') |
3 | var onWakeup = require('on-wakeup') |
4 | var onNetwork = require('on-change-network') |
5 | var hasNetwork = require('../../lib/has-network-debounced') |
6 | |
7 | var pull = require('pull-stream') |
8 | |
9 | function not (fn) { |
10 | return function (e) { return !fn(e) } |
11 | } |
12 | |
13 | function and () { |
14 | var args = [].slice.call(arguments) |
15 | return function (value) { |
16 | return args.every(function (fn) { return fn.call(null, value) }) |
17 | } |
18 | } |
19 | |
20 | //min delay (delay since last disconnect of most recent peer in unconnected set) |
21 | //unconnected filter delay peer < min delay |
22 | function delay (failures, factor, max) { |
23 | return Math.min(Math.pow(2, failures)*factor, max || Infinity) |
24 | } |
25 | |
26 | function maxStateChange (M, e) { |
27 | return Math.max(M, e.stateChange || 0) |
28 | } |
29 | |
30 | function peerNext(peer, opts) { |
31 | return (peer.stateChange|0) + delay(peer.failure|0, opts.factor, opts.max) |
32 | } |
33 | |
34 | |
35 | //detect if not connected to wifi or other network |
36 | //(i.e. if there is only localhost) |
37 | |
38 | function isOffline (e) { |
39 | if(ip.isLoopback(e.host) || e.host == 'localhost') return false |
40 | return !hasNetwork() |
41 | } |
42 | |
43 | var isOnline = not(isOffline) |
44 | |
45 | function isLocal (e) { |
46 | // don't rely on private ip address, because |
47 | // cjdns creates fake private ip addresses. |
48 | // ignore localhost addresses, because sometimes they get broadcast. |
49 | return !ip.isLoopback(e.host) && ip.isPrivate(e.host) && e.source === 'local' |
50 | } |
51 | |
52 | function isSeed (e) { |
53 | return e.source === 'seed' |
54 | } |
55 | |
56 | function isFriend (e) { |
57 | return e.source === 'friends' |
58 | } |
59 | |
60 | function isUnattempted (e) { |
61 | return !e.stateChange |
62 | } |
63 | |
64 | //select peers which have never been successfully connected to yet, |
65 | //but have been tried. |
66 | function isInactive (e) { |
67 | return e.stateChange && (!e.duration || e.duration.mean == 0) |
68 | } |
69 | |
70 | function isLongterm (e) { |
71 | return e.ping && e.ping.rtt && e.ping.rtt.mean > 0 |
72 | } |
73 | |
74 | //peers which we can connect to, but are not upgraded. |
75 | //select peers which we can connect to, but are not upgraded to LT. |
76 | //assume any peer is legacy, until we know otherwise... |
77 | function isLegacy (peer) { |
78 | return peer.duration && (peer.duration && peer.duration.mean > 0) && !exports.isLongterm(peer) |
79 | } |
80 | |
81 | function isConnect (e) { |
82 | return 'connected' === e.state || 'connecting' === e.state |
83 | } |
84 | |
85 | //sort oldest to newest then take first n |
86 | function earliest(peers, n) { |
87 | return peers.sort(function (a, b) { |
88 | return a.stateChange - b.stateChange |
89 | }).slice(0, Math.max(n, 0)) |
90 | } |
91 | |
92 | function select(peers, ts, filter, opts) { |
93 | if(opts.disable) return [] |
94 | //opts: { quota, groupMin, min, factor, max } |
95 | var type = peers.filter(filter) |
96 | var unconnect = type.filter(not(isConnect)) |
97 | var count = Math.max(opts.quota - type.filter(isConnect).length, 0) |
98 | var min = unconnect.reduce(maxStateChange, 0) + opts.groupMin |
99 | if(ts < min) return [] |
100 | |
101 | return earliest(unconnect.filter(function (peer) { |
102 | return peerNext(peer, opts) < ts |
103 | }), count) |
104 | } |
105 | |
106 | var schedule = exports = module.exports = |
107 | function (gossip, config, server) { |
108 | var min = 60e3, hour = 60*60e3, closed = false |
109 | |
110 | //trigger hard reconnect after suspend or local network changes |
111 | onWakeup(gossip.reconnect) |
112 | onNetwork(gossip.reconnect) |
113 | |
114 | function conf(name, def) { |
115 | if(config.gossip == null) return def |
116 | var value = config.gossip[name] |
117 | return (value == null || value === '') ? def : value |
118 | } |
119 | |
120 | function connect (peers, ts, name, filter, opts) { |
121 | opts.group = name |
122 | var connected = peers.filter(isConnect).filter(filter) |
123 | |
124 | //disconnect if over quota |
125 | if(connected.length > opts.quota) { |
126 | return earliest(connected, connected.length - opts.quota) |
127 | .forEach(function (peer) { |
128 | gossip.disconnect(peer) |
129 | }) |
130 | } |
131 | |
132 | //will return [] if the quota is full |
133 | var selected = select(peers, ts, and(filter, isOnline), opts) |
134 | selected |
135 | .forEach(function (peer) { |
136 | gossip.connect(peer) |
137 | }) |
138 | } |
139 | |
140 | var lastMessageAt |
141 | server.post(function (data) { |
142 | if(data.value.author != server.id) lastMessageAt = Date.now() |
143 | }) |
144 | |
145 | function isCurrentlyDownloading () { |
146 | // don't schedule gossip if currently downloading messages |
147 | if (lastMessageAt && lastMessageAt > Date.now() - 500) { |
148 | return true |
149 | } |
150 | } |
151 | |
152 | var connecting = false |
153 | function connections () { |
154 | if(connecting || closed) return |
155 | connecting = true |
156 | var timer = setTimeout(function () { |
157 | connecting = false |
158 | |
159 | // don't attempt to connect while migration is running |
160 | if (!server.ready() || isCurrentlyDownloading()) return |
161 | |
162 | var ts = Date.now() |
163 | var peers = gossip.peers() |
164 | |
165 | var connected = peers.filter(and(isConnect, not(isLocal), not(isFriend))).length |
166 | |
167 | var connectedFriends = peers.filter(and(isConnect, isFriend)).length |
168 | |
169 | if(conf('friends', true)) |
170 | connect(peers, ts, 'friends', isFriend, { |
171 | quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3, |
172 | }) |
173 | |
174 | if(conf('seed', true)) |
175 | connect(peers, ts, 'seeds', isSeed, { |
176 | quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3, |
177 | }) |
178 | |
179 | if(conf('local', true)) |
180 | connect(peers, ts, 'local', isLocal, { |
181 | quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3, |
182 | }) |
183 | |
184 | if(conf('global', true)) { |
185 | // prioritize friends |
186 | connect(peers, ts, 'friends', and(exports.isFriend, exports.isLongterm), { |
187 | quota: 2, factor: 10e3, max: 10*min, groupMin: 5e3, |
188 | }) |
189 | |
190 | if (connectedFriends < 2) |
191 | connect(peers, ts, 'attemptFriend', and(exports.isFriend, exports.isUnattempted), { |
192 | min: 0, quota: 1, factor: 0, max: 0, groupMin: 0, |
193 | }) |
194 | |
195 | connect(peers, ts, 'retryFriends', and(exports.isFriend, exports.isInactive), { |
196 | min: 0, |
197 | quota: 3, factor: 60e3, max: 3*60*60e3, groupMin: 5*60e3, |
198 | }) |
199 | |
200 | // standard longterm peers |
201 | connect(peers, ts, 'longterm', and( |
202 | exports.isLongterm, |
203 | not(exports.isFriend), |
204 | not(exports.isLocal) |
205 | ), { |
206 | quota: 2, factor: 10e3, max: 10*min, groupMin: 5e3, |
207 | }) |
208 | |
209 | if(!connected) |
210 | connect(peers, ts, 'attempt', exports.isUnattempted, { |
211 | min: 0, quota: 1, factor: 0, max: 0, groupMin: 0, |
212 | }) |
213 | |
214 | //quota, groupMin, min, factor, max |
215 | connect(peers, ts, 'retry', exports.isInactive, { |
216 | min: 0, |
217 | quota: 3, factor: 5*60e3, max: 3*60*60e3, groupMin: 5*50e3, |
218 | }) |
219 | |
220 | var longterm = peers.filter(isConnect).filter(isLongterm).length |
221 | |
222 | connect(peers, ts, 'legacy', exports.isLegacy, { |
223 | quota: 3 - longterm, |
224 | factor: 5*min, max: 3*hour, groupMin: 5*min, |
225 | }) |
226 | } |
227 | |
228 | peers.filter(isConnect).forEach(function (e) { |
229 | var permanent = exports.isLongterm(e) || exports.isLocal(e) |
230 | if((!permanent || e.state === 'connecting') && e.stateChange + 10e3 < ts) { |
231 | gossip.disconnect(e) |
232 | } |
233 | }) |
234 | |
235 | }, 100*Math.random()) |
236 | if(timer.unref) timer.unref() |
237 | } |
238 | |
239 | pull( |
240 | gossip.changes(), |
241 | pull.drain(function (ev) { |
242 | if(ev.type == 'disconnect') |
243 | connections() |
244 | }, function () { |
245 | console.warn('[gossip/dc] warning: this can happen if the database closes', arguments) |
246 | }) |
247 | ) |
248 | |
249 | var int = setInterval(connections, 2e3) |
250 | if(int.unref) int.unref() |
251 | |
252 | connections() |
253 | |
254 | return function onClose () { |
255 | closed = true |
256 | } |
257 | |
258 | } |
259 | |
260 | exports.isUnattempted = isUnattempted |
261 | exports.isInactive = isInactive |
262 | exports.isLongterm = isLongterm |
263 | exports.isLegacy = isLegacy |
264 | exports.isLocal = isLocal |
265 | exports.isFriend = isFriend |
266 | exports.isConnectedOrConnecting = isConnect |
267 | exports.select = select |
268 |
Built with git-ssb-web