Files: c5a7c1d8b9fae990ed1c54fd2de267eccdbdc14a / plugins / gossip / index.js
12460 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var Notify = require('pull-notify') |
4 | var mdm = require('mdmanifest') |
5 | var valid = require('../../lib/validators') |
6 | var apidoc = require('../../lib/apidocs').gossip |
7 | var u = require('../../lib/util') |
8 | var ref = require('ssb-ref') |
9 | var ping = require('pull-ping') |
10 | var stats = require('statistics') |
11 | var Schedule = require('./schedule') |
12 | var Init = require('./init') |
13 | var AtomicFile = require('atomic-file') |
14 | var fs = require('fs') |
15 | var path = require('path') |
16 | var deepEqual = require('deep-equal') |
17 | |
// True when `f` is callable, i.e. typeof reports 'function'.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
21 | |
// Render a peer as "host:port:key" (used in log lines).
// Missing fields come out as empty segments because Array#join
// prints undefined/null as ''.
function stringify(peer) {
  var parts = [peer.host, peer.port, peer.key]
  return parts.join(':')
}
25 | |
// Truthy only for non-null values whose typeof is 'object'.
// Falsy inputs are handed back unchanged (null stays null, 0 stays 0),
// everything else yields a boolean.
function isObject (o) {
  if (!o) return o
  return typeof o === 'object'
}
29 | |
// Return `s` as a base64 string.
// Strings are assumed to be base64 already and are passed through
// untouched; anything else is assumed to be a Buffer and is encoded.
// (The non-string branch previously lacked `return`, so buffers
// produced undefined.)
function toBase64 (s) {
  if (typeof s === 'string') return s
  return s.toString('base64') //assume a buffer
}
34 | |
// True for primitive strings (typeof 'string'); String objects do not count.
function isString (s) {
  var kind = typeof s
  return kind == 'string'
}
38 | |
// Turn an address object {host, port, key} into the string form
// "<protocol>:<host>:<port>~shs:<key>". The protocol is 'onion' when the
// host ends with ".onion" and 'net' otherwise. Anything that is not an
// object (e.g. an address that is already a string) is returned as is.
function coearseAddress (address) {
  if (!isObject(address)) return address

  var protocol = address.host.endsWith(".onion") ? 'onion' : 'net'
  var transport = [protocol, address.host, address.port].join(':')
  var transform = ['shs', toBase64(address.key)].join(':')
  return transport + '~' + transform
}
48 | |
49 | /* |
50 | Peers : [{ |
51 | key: id, |
52 | host: ip, |
53 | port: int, |
54 | //to be backwards compatible with patchwork... |
55 | announcers: {length: int} |
56 | source: 'pub'|'manual'|'local' |
57 | }] |
58 | */ |
59 | |
60 | |
61 | module.exports = { |
62 | name: 'gossip', |
63 | version: '1.0.0', |
64 | manifest: mdm.manifest(apidoc), |
65 | permissions: { |
66 | anonymous: {allow: ['ping']} |
67 | }, |
68 | init: function (server, config) { |
// Change notifier shared by every gossip event emitted below.
var notify = Notify()
var closed = false, closeScheduler
var conf = config.gossip || {}

// Peer table persistence lives in <config.path>/gossip.json.
var gossipJsonPath = path.join(config.path, 'gossip.json')
var stateFile = AtomicFile(gossipJsonPath)

// Log how many peers the state file holds. This read is only for the log
// line; the peer table itself is filled by a separate read further down.
stateFile.get(function (err, ary) {
  var stored = ary || []
  var summary = '' + stored.length + ' peers loaded from'
  server.emit('log:info', ['SBOT', summary, gossipJsonPath])
})

// Snapshot of peers exposed through server.status(), keyed by peer id.
var status = {}

//Known Peers
var peers = []
84 | |
// Look up a known peer by its key (id); the result is whatever u.find
// yields for the first table entry whose key matches.
function getPeer(id) {
  function hasKey (candidate) {
    return candidate && candidate.key === id
  }
  return u.find(peers, hasKey)
}
90 | |
// Reduce a peer record to the summary shown in the status output:
// string address, source, state info, failure count, client flag and
// timing stats. rtt/skew are undefined until the peer has a ping record;
// a falsy duration is reported as undefined.
function simplify (peer) {
  var address = coearseAddress(peer)
  var pinged = peer.ping
  var timing = {
    duration: peer.duration || undefined,
    rtt: pinged ? pinged.rtt : undefined,
    skew: pinged ? pinged.skew : undefined,
  }
  return {
    address: address,
    source: peer.source,
    state: peer.state,
    stateChange: peer.stateChange,
    failure: peer.failure,
    client: peer.client,
    stats: timing
  }
}
105 | |
// Extend server.status() with a `gossip` section. Each call refreshes the
// entries of peers that are connected or whose state changed within the
// last 3 seconds; other entries are left as they were.
server.status.hook(function (fn) {
  var _status = fn()
  _status.gossip = status

  peers.forEach(function (peer) {
    var changedRecently = peer.stateChange + 3e3 > Date.now()
    if (changedRecently || peer.state === 'connected') {
      status[peer.key] = simplify(peer)
    }
  })

  return _status
})
116 | |
// On server close: flag the plugin as closed, stop the scheduler, close
// every open rpc connection, then run the original close with the
// original receiver and arguments.
server.close.hook(function (fn, args) {
  function shutdown (peer) {
    peer.close(true)
  }

  closed = true
  closeScheduler()
  for (var id in server.peers) {
    server.peers[id].forEach(shutdown)
  }
  return fn.apply(this, args)
})
126 | |
127 | var timer_ping = 5*6e4 |
128 | |
// Set gossip.<name> = value both in the in-memory config and in the
// config file at <config.path>/config. The file is read and written
// synchronously; if it cannot be read or parsed, an empty object is
// used as the starting point (best effort).
function setConfig(name, value) {
  if (!config.gossip) config.gossip = {}
  config.gossip[name] = value

  var cfgPath = path.join(config.path, 'config')

  // load ~/.ssb/config
  var onDisk
  try {
    onDisk = JSON.parse(fs.readFileSync(cfgPath, 'utf-8'))
  } catch (e) {
    onDisk = {}
  }

  // update the plugins config
  if (!onDisk.gossip) onDisk.gossip = {}
  onDisk.gossip[name] = value

  // write to disc
  var serialized = JSON.stringify(onDisk, null, 2)
  fs.writeFileSync(cfgPath, serialized, 'utf-8')
}
147 | |
148 | var gossip = { |
149 | wakeup: 0, |
150 | peers: function () { |
151 | return peers |
152 | }, |
153 | get: function (addr) { |
154 | addr = ref.parseAddress(addr) |
155 | return u.find(peers, function (a) { |
156 | return ( |
157 | addr.port === a.port |
158 | && addr.host === a.host |
159 | && addr.key === a.key |
160 | ) |
161 | }) |
162 | }, |
163 | connect: valid.async(function (addr, cb) { |
164 | server.emit('log:info', ['SBOT', stringify(addr), 'CONNECTING']) |
165 | addr = ref.parseAddress(addr) |
166 | if (!addr || typeof addr != 'object') |
167 | return cb(new Error('first param must be an address')) |
168 | |
169 | if(!addr.key) return cb(new Error('address must have ed25519 key')) |
170 | // add peer to the table, incase it isn't already. |
171 | gossip.add(addr, 'manual') |
172 | var p = gossip.get(addr) |
173 | if(!p) return cb() |
174 | |
175 | p.stateChange = Date.now() |
176 | p.state = 'connecting' |
177 | server.connect(coearseAddress(p), function (err, rpc) { |
178 | if (err) { |
179 | p.error = err.stack |
180 | p.state = undefined |
181 | p.failure = (p.failure || 0) + 1 |
182 | p.stateChange = Date.now() |
183 | notify({ type: 'connect-failure', peer: p }) |
184 | server.emit('log:info', ['SBOT', stringify(p), 'ERR', (err.message || err)]) |
185 | p.duration = stats(p.duration, 0) |
186 | return (cb && cb(err)) |
187 | } |
188 | else { |
189 | delete p.error |
190 | p.state = 'connected' |
191 | p.failure = 0 |
192 | } |
193 | cb && cb(null, rpc) |
194 | }) |
195 | |
196 | }, 'string|object'), |
197 | |
198 | disconnect: valid.async(function (addr, cb) { |
199 | var peer = this.get(addr) |
200 | |
201 | peer.state = 'disconnecting' |
202 | peer.stateChange = Date.now() |
203 | if(!peer || !peer.disconnect) cb && cb() |
204 | else peer.disconnect(true, function (err) { |
205 | peer.stateChange = Date.now() |
206 | cb && cb() |
207 | }) |
208 | |
209 | }, 'string|object'), |
210 | |
211 | changes: function () { |
212 | return notify.listen() |
213 | }, |
214 | //add an address to the peer table. |
215 | add: valid.sync(function (addr, source) { |
216 | |
217 | addr = ref.parseAddress(addr) |
218 | if(!ref.isAddress(addr)) |
219 | throw new Error('not a valid address:' + JSON.stringify(addr)) |
220 | // check that this is a valid address, and not pointing at self. |
221 | |
222 | if(addr.key === server.id) return |
223 | |
224 | var f = gossip.get(addr) |
225 | |
226 | if(!f) { |
227 | // new peer |
228 | addr.source = source |
229 | addr.announcers = 1 |
230 | addr.duration = addr.duration || null |
231 | peers.push(addr) |
232 | notify({ type: 'discover', peer: addr, source: source || 'manual' }) |
233 | return addr |
234 | } else if (source === 'friends' || source === 'local') { |
235 | // this peer is a friend or local, override old source to prioritize gossip |
236 | f.source = source |
237 | } |
238 | //don't count local over and over |
239 | else if(f.source != 'local') |
240 | f.announcers ++ |
241 | |
242 | return f |
243 | }, 'string|object', 'string?'), |
244 | remove: function (addr) { |
245 | var peer = gossip.get(addr) |
246 | var index = peers.indexOf(peer) |
247 | if (~index) { |
248 | peers.splice(index, 1) |
249 | notify({ type: 'remove', peer: peer }) |
250 | } |
251 | }, |
252 | ping: function (opts) { |
253 | var timeout = config.timers && config.timers.ping || 5*60e3 |
254 | //between 10 seconds and 30 minutes, default 5 min |
255 | timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) |
256 | return ping({timeout: timeout}) |
257 | }, |
258 | pingPeers: function () { |
259 | var drains = {} |
260 | var queue = [] |
261 | var waitingCb |
262 | for(var id in server.peers) { |
263 | if(id !== server.id) server.peers[id].forEach(function (peer) { |
264 | var pings |
265 | try { |
266 | pings = peer.gossip.ping() |
267 | } catch(e) { |
268 | return |
269 | } |
270 | pull( |
271 | pull.once(true), |
272 | pings, |
273 | pull.take(1), |
274 | drains[peer.id] = pull.collect(function (err, values) { |
275 | delete drains[peer.id] |
276 | var cb = waitingCb |
277 | if (err) { |
278 | if (cb && Object.keys(drains).length === 0) { |
279 | waitingCb = null |
280 | cb(true) |
281 | } |
282 | return |
283 | } |
284 | var val = {peer: peer.id, value: values[0]} |
285 | if (cb) { |
286 | waitingCb = null |
287 | cb(null, val) |
288 | } else { |
289 | queue.push(val) |
290 | } |
291 | }) |
292 | ) |
293 | }) |
294 | } |
295 | return function (abort, cb) { |
296 | if(abort) { |
297 | for(var id in drains) { |
298 | drains[id].abort(abort) |
299 | } |
300 | return cb(abort) |
301 | } |
302 | if (queue.length > 0) { |
303 | cb(null, queue.shift()) |
304 | } else if (Object.keys(drains).length === 0) { |
305 | cb(true) |
306 | } else { |
307 | waitingCb = cb |
308 | } |
309 | } |
310 | }, |
311 | reconnect: function () { |
312 | for(var id in server.peers) |
313 | if(id !== server.id) //don't disconnect local client |
314 | server.peers[id].forEach(function (peer) { |
315 | peer.close(true) |
316 | }) |
317 | return gossip.wakeup = Date.now() |
318 | }, |
319 | enable: valid.sync(function (type) { |
320 | type = type || 'global' |
321 | setConfig(type, true) |
322 | if(type === 'local' && server.local && server.local.init) |
323 | server.local.init() |
324 | return 'enabled gossip type ' + type |
325 | }, 'string?'), |
// Persist gossip.<type> = false (type defaults to 'global') and return
// a confirmation message.
disable: valid.sync(function (type) {
  var kind = type || 'global'
  setConfig(kind, false)
  return 'disabled gossip type ' + kind
}, 'string?')
331 | } |
332 | |
333 | closeScheduler = Schedule (gossip, config, server) |
334 | Init (gossip, config, server) |
335 | //get current state |
336 | |
// Track the lifecycle of every rpc connection.
// Connections from ids that are not in the peer table are only logged.
// For known peers the record is marked connected, a disconnect handle is
// attached, a ping exchange is started when we initiated the connection,
// and connect/disconnect events are emitted.
server.on('rpc:connect', function (rpc, isClient) {

  // if we're not ready, close this connection immediately
  if (!server.ready() && rpc.id !== server.id) return rpc.close()

  var peer = getPeer(rpc.id)
  //don't track clients that connect, but aren't considered peers.
  //maybe we should though?
  if(!peer) {
    if(rpc.id !== server.id) {
      server.emit('log:info', ['SBOT', rpc.id, 'Connected'])
      rpc.on('closed', function () {
        server.emit('log:info', ['SBOT', rpc.id, 'Disconnected'])
      })
    }
    return
  }

  status[rpc.id] = simplify(peer)

  server.emit('log:info', ['SBOT', stringify(peer), 'PEER JOINED'])
  //means that we have created this connection, not received it.
  peer.client = !!isClient
  peer.state = 'connected'
  peer.stateChange = Date.now()
  // disconnect(cb) and disconnect(err, cb) are both accepted
  peer.disconnect = function (err, cb) {
    if(isFunction(err)) cb = err, err = null
    rpc.close(err, cb)
  }

  if(isClient) {
    //default ping is 5 minutes...
    var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
    peer.ping = {rtt: pp.rtt, skew: pp.skew}
    pull(
      pp,
      rpc.gossip.ping({timeout: timer_ping}, function (err) {
        // guard against the callback being invoked without an error
        // object; reading err.name would otherwise throw
        if(err && err.name === 'TypeError') peer.ping.fail = true
      }),
      pp
    )
  }

  rpc.on('closed', function () {
    delete status[rpc.id]
    server.emit('log:info', ['SBOT', stringify(peer),
      ['DISCONNECTED. state was', peer.state, 'for',
        (new Date() - peer.stateChange)/1000, 'seconds'].join(' ')])
    //track whether we have successfully connected.
    //or how many failures there have been.
    var since = peer.stateChange
    peer.stateChange = Date.now()
    // the time spent in the last state is folded into the duration
    // stats whatever that state was (it may be "disconnecting")
    peer.duration = stats(peer.duration, peer.stateChange - since)
    peer.state = undefined
    notify({ type: 'disconnect', peer: peer })
  })

  notify({ type: 'connect', peer: peer })
})
397 | |
// Last peer list known to be on disk; the periodic save compares
// against it to avoid rewriting an unchanged file.
var last
stateFile.get(function (err, ary) {
  last = ary || []
  if (!Array.isArray(ary)) return

  ary.forEach(function (stored) {
    // connection state is never carried over from a previous run
    delete stored.state
    // don't add local peers (wait to rediscover)
    if (stored.source === 'local') return
    gossip.add(stored, 'stored')
  })
})
410 | |
// Every 10 seconds, write the peer table to the state file if it differs
// from the last list known to be on disk. The table is deep-copied via
// JSON, and `state` is stripped from non-local entries before comparing.
// NOTE(review): only `state` removal is limited to non-local peers; local
// peers themselves are still part of the saved copy — confirm whether
// they were meant to be left out of the file.
var int = setInterval(function () {
  var copy = JSON.parse(JSON.stringify(peers))

  copy.forEach(function (entry) {
    if (entry.source !== 'local') delete entry.state
  })

  if (deepEqual(copy, last)) return

  last = copy
  stateFile.set(copy, function (err) {
    if (err) console.log(err)
  })
}, 10*1000)

// don't let this timer keep the process alive on its own
if (int.unref) int.unref()
426 | |
427 | return gossip |
428 | } |
429 | } |
430 | |
431 | |
432 | |
433 | |
434 | |
435 | |
436 |
Built with git-ssb-web