git ssb

1+

punkmonk.termux / mvd



forked from ev / mvd

Tree: 5285e716749369d47e938077f6d87389eb282ba9

Files: 5285e716749369d47e938077f6d87389eb282ba9 / plugins / gossip / schedule.js

7551 bytes | Raw
1'use strict'
2var ip = require('ip')
3var onWakeup = require('on-wakeup')
4var onNetwork = require('on-change-network')
5var hasNetwork = require('../../lib/has-network-debounced')
6
7var pull = require('pull-stream')
8
/**
 * Invert a predicate.
 *
 * @param {function(*): *} fn - predicate to negate
 * @returns {function(*): boolean} function that yields `true` exactly when
 *   `fn` yields a falsy value for the same argument
 */
function not (fn) {
  return function (e) {
    var outcome = fn(e)
    return !outcome
  }
}
12
/**
 * Combine any number of predicates into one that passes only when all of
 * them pass. Predicates are evaluated left to right and evaluation stops at
 * the first falsy result. With no predicates the result is always `true`.
 *
 * @param {...function(*): *} predicates (read from `arguments`)
 * @returns {function(*): boolean}
 */
function and () {
  var predicates = Array.prototype.slice.call(arguments)
  return function (value) {
    for (var i = 0; i < predicates.length; i++) {
      // each predicate receives only the value, with a null `this`
      if (!predicates[i].call(null, value)) return false
    }
    return true
  }
}
19
// Reconnect back-off helpers.
// min delay (delay since last disconnect of most recent peer in unconnected set)
// unconnected filter delay peer < min delay

/**
 * Exponential back-off: `factor * 2^failures`, capped at `max`.
 *
 * @param {number} failures - number of failures so far
 * @param {number} factor - base delay
 * @param {number} [max] - upper bound; any falsy value (including 0) means
 *   "no cap"
 * @returns {number} delay to wait
 */
function delay (failures, factor, max) {
  return Math.min(Math.pow(2, failures) * factor, max || Infinity)
}

/**
 * Reducer that tracks the largest `stateChange` seen so far.
 *
 * @param {number} M - running maximum
 * @param {{stateChange: (number|undefined)}} e - peer record
 * @returns {number} the larger of `M` and `e.stateChange` (missing => 0)
 */
function maxStateChange (M, e) {
  return Math.max(M, e.stateChange || 0)
}

/**
 * Earliest time at which `peer` may be tried again: its last state change
 * plus the back-off for its failure count.
 *
 * @param {{stateChange: (number|undefined), failure: (number|undefined)}} peer
 * @param {{factor: number, max: (number|undefined)}} opts
 * @returns {number} timestamp, in the same unit as `peer.stateChange`
 */
function peerNext (peer, opts) {
  // `stateChange` is compared against Date.now() by the scheduler, so it is a
  // millisecond timestamp far beyond the int32 range. It must not be coerced
  // with `|0`, which would wrap it to a meaningless 32-bit value and make the
  // back-off ineffective. The failure count is small, so `|0` is safe there.
  return (peer.stateChange || 0) + delay(peer.failure | 0, opts.factor, opts.max)
}
33
34
// Detect if not connected to wifi or any other network
// (i.e. if there is only localhost).
// Loopback / 'localhost' peers never count as offline; for every other
// host the answer is whatever hasNetwork() currently reports, negated.
function isOffline (e) {
  var host = e.host
  var alwaysReachable = ip.isLoopback(host) || host == 'localhost'
  return alwaysReachable ? false : !hasNetwork()
}
42
// A peer is considered online exactly when isOffline does not rule it out.
var isOnline = function (e) {
  return !isOffline(e)
}
44
/**
 * True for peers that were discovered on the local network.
 *
 * @param {{host: string, source: string}} e - peer record
 */
function isLocal (e) {
  // ignore localhost addresses, because sometimes they get broadcast.
  if (ip.isLoopback(e.host)) return false
  // don't rely on the private ip address alone, because cjdns creates fake
  // private ip addresses; the peer must also have come from the 'local' source.
  return ip.isPrivate(e.host) && e.source === 'local'
}
51
/**
 * @param {{source: string}} e - peer record
 * @returns {boolean} whether the peer's `source` is 'seed'
 */
function isSeed (e) {
  var origin = e.source
  return 'seed' === origin
}
55
/**
 * @param {{source: string}} e - peer record
 * @returns {boolean} whether the peer's `source` is 'friends'
 */
function isFriend (e) {
  var origin = e.source
  return 'friends' === origin
}
59
/**
 * A peer is unattempted while it has no (truthy) `stateChange` recorded.
 *
 * @param {{stateChange: (number|undefined)}} e - peer record
 * @returns {boolean}
 */
function isUnattempted (e) {
  return e.stateChange ? false : true
}
63
// Select peers which have been tried, but have never been successfully
// connected to yet: they have a stateChange, and either no duration stats
// at all or a mean duration of zero.
// Note: for an untried peer the falsy `stateChange` value itself is returned
// (not a strict boolean).
function isInactive (e) {
  var changed = e.stateChange
  if (!changed) return changed

  var duration = e.duration
  if (!duration) return true
  return duration.mean == 0
}
69
/**
 * A peer counts as long-term once it has ping statistics with a positive
 * mean round-trip time.
 *
 * @param {{ping: ({rtt: ({mean: number}|undefined)}|undefined)}} e - peer record
 * @returns {*} `true`/`false` when `e.ping.rtt` exists; otherwise the falsy
 *   `e.ping` or `e.ping.rtt` value itself
 */
function isLongterm (e) {
  var ping = e.ping
  if (!ping) return ping

  var rtt = ping.rtt
  if (!rtt) return rtt

  return rtt.mean > 0
}
73
// Select peers which we can connect to (positive mean connection duration),
// but which are not upgraded to long-term.
// Assume any connectable peer is legacy, until we know otherwise...
// Note: when the peer has no duration stats the falsy `peer.duration` value
// itself is returned (not a strict boolean).
function isLegacy (peer) {
  var duration = peer.duration
  if (!duration) return duration
  if (!(duration.mean > 0)) return false
  return !exports.isLongterm(peer)
}
80
/**
 * @param {{state: (string|undefined)}} e - peer record
 * @returns {boolean} whether the peer's state is 'connected' or 'connecting'
 */
function isConnect (e) {
  var state = e.state
  return state === 'connected' || state === 'connecting'
}
84
/**
 * Sort oldest to newest by `stateChange`, then take the first `n`.
 *
 * The input array is left untouched (a copy is sorted). Peers without a
 * `stateChange` are ordered as if it were 0, so the comparator never
 * produces NaN and the resulting order is well defined.
 *
 * @param {Array<{stateChange: (number|undefined)}>} peers
 * @param {number} n - how many to take; values below 0 are treated as 0
 * @returns {Array} new array holding at most `n` peers
 */
function earliest (peers, n) {
  return peers.slice().sort(function (a, b) {
    return (a.stateChange || 0) - (b.stateChange || 0)
  }).slice(0, Math.max(n, 0))
}
91
/**
 * Pick the peers of one group that should be connected to now.
 *
 * @param {Array<Object>} peers - all known peer records
 * @param {number} ts - current time (same unit as `peer.stateChange`)
 * @param {function(Object): *} filter - membership test for the group
 * @param {Object} opts - { disable, quota, groupMin, factor, max }
 * @returns {Array<Object>} unconnected group members that are due for a
 *   retry, oldest `stateChange` first, limited to the free quota slots;
 *   empty when the group is disabled or still inside its `groupMin` window
 */
function select (peers, ts, filter, opts) {
  if (opts.disable) return []

  // split the group into peers already connected/connecting and the rest
  var members = peers.filter(filter)
  var active = []
  var idle = []
  members.forEach(function (peer) {
    if (isConnect(peer)) active.push(peer)
    else idle.push(peer)
  })

  // hold the whole group back until groupMin has passed since the most
  // recent state change among its unconnected peers
  var groupReadyAt = idle.reduce(maxStateChange, 0) + opts.groupMin
  if (ts < groupReadyAt) return []

  // only as many as the quota still allows
  var freeSlots = Math.max(opts.quota - active.length, 0)
  var due = idle.filter(function (peer) {
    return peerNext(peer, opts) < ts
  })
  return earliest(due, freeSlots)
}
105
/**
 * Gossip connection scheduler.
 *
 * Every 2 seconds, and whenever a 'disconnect' event arrives on
 * `gossip.changes()`, a connection pass is queued behind a short random
 * delay (< 100ms). A pass walks the peer groups (friends, seeds, local, and
 * the various "global" groups), disconnects peers that are over a group's
 * quota, connects peers chosen by `select`, and finally drops connections
 * that are not long-term/local (or are still 'connecting') once they are
 * more than 10 seconds old.
 *
 * @param {Object} gossip - must provide `peers()`, `connect(peer)`,
 *   `disconnect(peer)`, `reconnect` and `changes()` (a pull-stream source)
 * @param {Object} config - `config.gossip.{friends,seed,local,global}` switch
 *   the corresponding groups on/off; each defaults to `true`
 * @param {Object} server - must provide `id`, `ready()` and `post(fn)`
 * @returns {function} `onClose` - stops the scheduler and releases its timers
 */
var schedule = exports = module.exports =
function (gossip, config, server) {
  var min = 60e3, hour = 60*60e3, closed = false

  //trigger hard reconnect after suspend or local network changes
  // NOTE(review): these listeners are not removed on close - confirm whether
  // on-wakeup / on-change-network offer a way to unsubscribe.
  onWakeup(gossip.reconnect)
  onNetwork(gossip.reconnect)

  // read config.gossip[name]; null/undefined/'' fall back to `def`
  function conf (name, def) {
    if(config.gossip == null) return def
    var value = config.gossip[name]
    return (value == null || value === '') ? def : value
  }

  // run one group: either shed the oldest connections when over quota,
  // or connect the peers `select` picks (never both in the same pass)
  function connect (peers, ts, name, filter, opts) {
    opts.group = name
    var connected = peers.filter(isConnect).filter(filter)

    //disconnect if over quota
    if(connected.length > opts.quota) {
      return earliest(connected, connected.length - opts.quota)
        .forEach(function (peer) {
          gossip.disconnect(peer)
        })
    }

    //will return [] if the quota is full
    var selected = select(peers, ts, and(filter, isOnline), opts)
    selected
      .forEach(function (peer) {
        gossip.connect(peer)
      })
  }

  // time the last message authored by someone else was posted
  var lastMessageAt
  server.post(function (data) {
    if(data.value.author != server.id) lastMessageAt = Date.now()
  })

  function isCurrentlyDownloading () {
    // don't schedule gossip if currently downloading messages
    // (i.e. a foreign message arrived within the last 500ms)
    return Boolean(lastMessageAt && lastMessageAt > Date.now() - 500)
  }

  var connecting = false
  // handle of the pending jitter timer, so onClose can cancel it
  var timer = null
  function connections () {
    if(connecting || closed) return
    connecting = true
    timer = setTimeout(function () {
      timer = null
      connecting = false

      // the scheduler may have been closed while this pass was queued
      if (closed) return

      // don't attempt to connect while migration is running
      if (!server.ready() || isCurrentlyDownloading()) return

      var ts = Date.now()
      var peers = gossip.peers()

      // connected peers that are neither local nor friends
      var connected = peers.filter(and(isConnect, not(isLocal), not(isFriend))).length

      var connectedFriends = peers.filter(and(isConnect, isFriend)).length

      if(conf('friends', true))
        connect(peers, ts, 'friends', isFriend, {
          quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3,
        })

      if(conf('seed', true))
        connect(peers, ts, 'seeds', isSeed, {
          quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3,
        })

      if(conf('local', true))
        connect(peers, ts, 'local', isLocal, {
          quota: 3, factor: 2e3, max: 10*min, groupMin: 1e3,
        })

      if(conf('global', true)) {
        // NOTE(review): the `min: 0` entries below are not read by anything
        // visible in this file (select/connect/peerNext) - confirm if needed.

        // prioritize friends
        connect(peers, ts, 'friends', and(exports.isFriend, exports.isLongterm), {
          quota: 2, factor: 10e3, max: 10*min, groupMin: 5e3,
        })

        if (connectedFriends < 2)
          connect(peers, ts, 'attemptFriend', and(exports.isFriend, exports.isUnattempted), {
            min: 0, quota: 1, factor: 0, max: 0, groupMin: 0,
          })

        connect(peers, ts, 'retryFriends', and(exports.isFriend, exports.isInactive), {
          min: 0,
          quota: 3, factor: 60e3, max: 3*60*60e3, groupMin: 5*60e3,
        })

        // standard longterm peers
        connect(peers, ts, 'longterm', and(
          exports.isLongterm,
          not(exports.isFriend),
          not(exports.isLocal)
        ), {
          quota: 2, factor: 10e3, max: 10*min, groupMin: 5e3,
        })

        // only probe a brand new peer when nothing non-local/non-friend is connected
        if(!connected)
          connect(peers, ts, 'attempt', exports.isUnattempted, {
            min: 0, quota: 1, factor: 0, max: 0, groupMin: 0,
          })

        //quota, groupMin, min, factor, max
        // NOTE(review): groupMin is 5*50e3 here but 5*60e3 for 'retryFriends';
        // possibly a typo - left unchanged, confirm the intended value.
        connect(peers, ts, 'retry', exports.isInactive, {
          min: 0,
          quota: 3, factor: 5*60e3, max: 3*60*60e3, groupMin: 5*50e3,
        })

        var longterm = peers.filter(isConnect).filter(isLongterm).length

        // legacy peers only get the slots long-term peers leave free
        connect(peers, ts, 'legacy', exports.isLegacy, {
          quota: 3 - longterm,
          factor: 5*min, max: 3*hour, groupMin: 5*min,
        })
      }

      // drop connections that are not long-term/local, or that are stuck in
      // 'connecting', once their last state change is more than 10s old
      peers.filter(isConnect).forEach(function (e) {
        var permanent = exports.isLongterm(e) || exports.isLocal(e)
        if((!permanent || e.state === 'connecting') && e.stateChange + 10e3 < ts) {
          gossip.disconnect(e)
        }
      })

    }, 100*Math.random())
    if(timer.unref) timer.unref()
  }

  pull(
    gossip.changes(),
    pull.drain(function (ev) {
      if(ev.type == 'disconnect')
        connections()
    }, function () {
      console.warn('[gossip/dc] warning: this can happen if the database closes', arguments)
    })
  )

  var int = setInterval(connections, 2e3)
  if(int.unref) int.unref()

  connections()

  return function onClose () {
    closed = true
    // stop polling and cancel a pass that is still queued, so nothing
    // touches gossip/server after close
    clearInterval(int)
    if (timer) {
      clearTimeout(timer)
      timer = null
    }
    connecting = false
  }

}
259
// Expose the peer predicates and `select` as properties of the exported
// scheduler function. The scheduler itself looks several of them up through
// `exports` at call time.
var helpers = {
  isUnattempted: isUnattempted,
  isInactive: isInactive,
  isLongterm: isLongterm,
  isLegacy: isLegacy,
  isLocal: isLocal,
  isFriend: isFriend,
  isConnectedOrConnecting: isConnect,
  select: select
}

Object.keys(helpers).forEach(function (name) {
  exports[name] = helpers[name]
})
268

Built with git-ssb-web