Files: d2b5693b01842e3376dae3c3cf5c579e03e490c4 / plugins / gossip / index.js
12228 bytes · Raw
1 | |
2 | var pull = require('pull-stream') |
3 | var Notify = require('pull-notify') |
4 | var mdm = require('mdmanifest') |
5 | var valid = require('../../lib/validators') |
6 | var apidoc = require('../../lib/apidocs').gossip |
7 | var u = require('../../lib/util') |
8 | var ref = require('ssb-ref') |
9 | var ping = require('pull-ping') |
10 | var stats = require('statistics') |
11 | var Schedule = require('./schedule') |
12 | var Init = require('./init') |
13 | var AtomicFile = require('atomic-file') |
14 | var fs = require('fs') |
15 | var path = require('path') |
16 | var deepEqual = require('deep-equal') |
17 | |
// True when the given value is callable.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
21 | |
// Render a peer as "host:port:key" (used in log lines).
// Missing fields come out as empty segments because Array#join
// treats undefined/null as ''.
function stringify (peer) {
  var parts = ['host', 'port', 'key'].map(function (field) {
    return peer[field]
  })
  return parts.join(':')
}
25 | |
// Truthy when `o` is a non-null object (arrays included).
// For falsy input the input itself is handed back, not `false`.
function isObject (o) {
  if (!o) return o
  return typeof o === 'object'
}
29 | |
// Extract the base64 body of a key.
//
// - string input: returns the characters between the first character and
//   the first '.' (presumably stripping the sigil and the ".ed25519"-style
//   suffix of an id -- confirm against callers).
// - anything else: assumed to be a buffer and encoded with
//   toString('base64').
//
// The buffer branch previously computed the value but never returned it,
// so buffers always produced `undefined`.
function toBase64 (s) {
  if (typeof s === 'string') return s.substring(1, s.indexOf('.'))
  return s.toString('base64') //assume a buffer
}
34 | |
// True for primitive strings only (String objects report 'object').
function isString (s) {
  return typeof s === 'string'
}
38 | |
// Coerce a peer description into an address string.
//
// - non-objects are returned untouched
// - objects whose `address` already passes ref.isAddress return that value
// - otherwise an address of the form "<net|onion>:host:port~shs:<key>" is
//   assembled from the host/port/key fields ("onion" is chosen when the
//   host ends in ".onion")
function coearseAddress (address) {
  if (!isObject(address)) return address

  if (ref.isAddress(address.address)) return address.address

  var isOnion = address.host && address.host.endsWith(".onion")
  var transport = [isOnion ? 'onion' : 'net', address.host, address.port].join(':')
  var transform = ['shs', toBase64(address.key)].join(':')
  return transport + '~' + transform
}
50 | |
51 | /* |
52 | Peers : [{ |
53 | //modern: |
54 | address: <multiserver address>, |
55 | |
56 | |
57 | //legacy |
58 | key: id, |
59 | host: ip, |
60 | port: int, |
61 | |
62 | //to be backwards compatible with patchwork... |
63 | announcers: {length: int} |
64 | //TODO: availability |
65 | //availability: 0-1, //online probability estimate |
66 | |
67 | //where this peer was added from. TODO: remove "pub" peers. |
68 | source: 'pub'|'manual'|'local' |
69 | }] |
70 | */ |
71 | |
72 | |
73 | module.exports = { |
74 | name: 'gossip', |
75 | version: '1.0.0', |
76 | manifest: mdm.manifest(apidoc), |
77 | permissions: { |
78 | anonymous: {allow: ['ping']} |
79 | }, |
80 | init: function (server, config) { |
81 | var notify = Notify() |
82 | var closed = false, closeScheduler |
83 | var conf = config.gossip || {} |
84 | |
85 | var gossipJsonPath = path.join(config.path, 'gossip.json') |
86 | var stateFile = AtomicFile(gossipJsonPath) |
87 | |
88 | var status = {} |
89 | |
90 | //Known Peers |
91 | var peers = [] |
92 | |
// Look up an entry of the peer table by its `key`.
// Returns whatever u.find yields when nothing matches.
function getPeer (id) {
  function hasKey (candidate) {
    // falsy table entries are passed back as-is (i.e. never match)
    if (!candidate) return candidate
    return candidate.key === id
  }
  return u.find(peers, hasKey)
}
98 | |
// Build the reduced view of a peer that is stored in the `status` map:
// address, source, connection state, failure count, client flag and a
// small stats object (duration / rtt / skew).
function simplify (peer) {
  var timing = {
    // falsy durations (0, null) are reported as undefined
    duration: peer.duration || undefined,
    rtt: undefined,
    skew: undefined,
  }
  if (peer.ping) {
    timing.rtt = peer.ping.rtt
    timing.skew = peer.ping.skew
  }

  return {
    // fall back to an address derived from the legacy host/port/key fields
    address: peer.address || coearseAddress(peer),
    source: peer.source,
    state: peer.state,
    stateChange: peer.stateChange,
    failure: peer.failure,
    client: peer.client,
    stats: timing
  }
}
113 | |
// Extend the server's status report with a `gossip` section.
// The section is the shared `status` map; before returning, it is refreshed
// for every peer that is connected or changed state within the last 3s.
server.status.hook(function (fn) {
  var report = fn()
  report.gossip = status

  peers.forEach(function (peer) {
    var changedRecently = peer.stateChange + 3e3 > Date.now()
    if (!changedRecently && peer.state !== 'connected') return
    status[peer.key] = simplify(peer)
  })

  return report
})
124 | |
// On server close: flag the plugin as closed, stop the scheduler, close
// every connection listed in server.peers, then run the original close.
server.close.hook(function (fn, args) {
  function shutdown (peer) {
    peer.close(true)
  }

  closed = true
  closeScheduler()

  for (var id in server.peers) {
    server.peers[id].forEach(shutdown)
  }

  return fn.apply(this, args)
})
134 | |
135 | var timer_ping = 5*6e4 |
136 | |
// Set config.gossip[name] = value, both in the in-memory config and in the
// JSON file at <config.path>/config.
//
// NOTE(review): any failure to read or parse the file is treated as "no
// existing config", so a file that exists but is not strict JSON is
// replaced by one containing only the gossip section. Confirm that this is
// intended.
function setConfig (name, value) {
  // in-memory copy
  if (!config.gossip) config.gossip = {}
  config.gossip[name] = value

  // on-disk copy
  var cfgPath = path.join(config.path, 'config')
  var onDisk
  try {
    onDisk = JSON.parse(fs.readFileSync(cfgPath, 'utf-8'))
  } catch (e) {
    onDisk = {}
  }

  if (!onDisk.gossip) onDisk.gossip = {}
  onDisk.gossip[name] = value

  fs.writeFileSync(cfgPath, JSON.stringify(onDisk, null, 2), 'utf-8')
}
155 | |
156 | var gossip = { |
157 | wakeup: 0, |
158 | peers: function () { |
159 | return peers |
160 | }, |
161 | get: function (addr) { |
162 | //addr = ref.parseAddress(addr) |
163 | if(ref.isFeed(addr)) return getPeer(addr) |
164 | else if(ref.isFeed(addr.key)) return getPeer(addr.key) |
165 | else throw new Error('must provide id:'+JSON.stringify(addr)) |
166 | // return u.find(peers, function (a) { |
167 | // return ( |
168 | // addr.port === a.port |
169 | // && addr.host === a.host |
170 | // && addr.key === a.key |
171 | // ) |
172 | // }) |
173 | }, |
174 | connect: valid.async(function (addr, cb) { |
175 | if(ref.isFeed(addr)) |
176 | addr = gossip.get(addr) |
177 | server.emit('log:info', ['ssb-server', stringify(addr), 'CONNECTING']) |
178 | if(!ref.isAddress(addr.address)) |
179 | addr = ref.parseAddress(addr) |
180 | if (!addr || typeof addr != 'object') |
181 | return cb(new Error('first param must be an address')) |
182 | |
183 | if(!addr.address) |
184 | if(!addr.key) return cb(new Error('address must have ed25519 key')) |
185 | // add peer to the table, incase it isn't already. |
186 | gossip.add(addr, 'manual') |
187 | var p = gossip.get(addr) |
188 | if(!p) return cb() |
189 | |
190 | p.stateChange = Date.now() |
191 | p.state = 'connecting' |
192 | server.connect(p.address, function (err, rpc) { |
193 | if (err) { |
194 | p.error = err.stack |
195 | p.state = undefined |
196 | p.failure = (p.failure || 0) + 1 |
197 | p.stateChange = Date.now() |
198 | notify({ type: 'connect-failure', peer: p }) |
199 | server.emit('log:info', ['ssb-server', stringify(p), 'ERR', (err.message || err)]) |
200 | p.duration = stats(p.duration, 0) |
201 | return (cb && cb(err)) |
202 | } |
203 | else { |
204 | delete p.error |
205 | p.state = 'connected' |
206 | p.failure = 0 |
207 | } |
208 | cb && cb(null, rpc) |
209 | }) |
210 | |
211 | }, 'string|object'), |
212 | |
213 | disconnect: valid.async(function (addr, cb) { |
214 | var peer = gossip.get(addr) |
215 | |
216 | peer.state = 'disconnecting' |
217 | peer.stateChange = Date.now() |
218 | if(!peer || !peer.disconnect) cb && cb() |
219 | else peer.disconnect(true, function (err) { |
220 | peer.stateChange = Date.now() |
221 | cb && cb() |
222 | }) |
223 | |
224 | }, 'string|object'), |
225 | |
226 | changes: function () { |
227 | return notify.listen() |
228 | }, |
229 | //add an address to the peer table. |
230 | add: valid.sync(function (addr, source) { |
231 | |
232 | if(isObject(addr)) { |
233 | addr.address = coearseAddress(addr) |
234 | } |
235 | else { |
236 | var _addr = ref.parseAddress(addr) |
237 | if(!_addr) throw new Error('not a valid address:'+addr) |
238 | _addr.address = addr |
239 | addr = _addr |
240 | } |
241 | if(!ref.isAddress(addr.address) /*&& !ref.isAddress(addr)*/) |
242 | throw new Error('not a valid address:' + JSON.stringify(addr)) |
243 | // check that this is a valid address, and not pointing at self. |
244 | |
245 | if(addr.key === server.id) return |
246 | |
247 | var f = gossip.get(addr) |
248 | |
249 | if(!f) { |
250 | // new peer |
251 | addr.source = source |
252 | addr.announcers = 1 |
253 | addr.duration = addr.duration || null |
254 | peers.push(addr) |
255 | notify({ type: 'discover', peer: addr, source: source || 'manual' }) |
256 | return addr |
257 | } else if (source === 'friends' || source === 'local') { |
258 | // this peer is a friend or local, override old source to prioritize gossip |
259 | f.source = source |
260 | } |
261 | //don't count local over and over |
262 | else if(f.source != 'local') |
263 | f.announcers ++ |
264 | |
265 | return f |
266 | }, 'string|object', 'string?'), |
267 | remove: function (addr) { |
268 | var peer = gossip.get(addr) |
269 | var index = peers.indexOf(peer) |
270 | if (~index) { |
271 | peers.splice(index, 1) |
272 | notify({ type: 'remove', peer: peer }) |
273 | } |
274 | }, |
275 | ping: function (opts) { |
276 | var timeout = config.timers && config.timers.ping || 5*60e3 |
277 | //between 10 seconds and 30 minutes, default 5 min |
278 | timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) |
279 | return ping({timeout: timeout}) |
280 | }, |
281 | reconnect: function () { |
282 | for(var id in server.peers) |
283 | if(id !== server.id) //don't disconnect local client |
284 | server.peers[id].forEach(function (peer) { |
285 | peer.close(true) |
286 | }) |
287 | return gossip.wakeup = Date.now() |
288 | }, |
289 | enable: valid.sync(function (type) { |
290 | type = type || 'global' |
291 | setConfig(type, true) |
292 | if(type === 'local' && server.local && server.local.init) |
293 | server.local.init() |
294 | return 'enabled gossip type ' + type |
295 | }, 'string?'), |
// Persist gossip.<type> = false (type defaults to 'global').
disable: valid.sync(function (type) {
  var kind = type || 'global'
  setConfig(kind, false)
  return 'disabled gossip type ' + kind
}, 'string?')
301 | } |
302 | |
303 | closeScheduler = Schedule (gossip, config, server) |
304 | Init (gossip, config, server) |
305 | //get current state |
306 | |
// Track every new rpc connection.
//
// Connections from ids that are not in the peer table are only logged.
// For known peers the table entry is updated (client flag, state,
// stateChange, disconnect handle), a ping stream is started on outgoing
// connections, and 'connect' / 'disconnect' changes are notified.
server.on('rpc:connect', function (rpc, isClient) {

  // if we're not ready, close this connection immediately
  if (!server.ready() && rpc.id !== server.id) return rpc.close()

  var peer = getPeer(rpc.id)
  //don't track clients that connect, but aren't considered peers.
  //maybe we should though?
  if (!peer) {
    if (rpc.id !== server.id) {
      server.emit('log:info', ['ssb-server', rpc.id, 'Connected'])
      rpc.on('closed', function () {
        server.emit('log:info', ['ssb-server', rpc.id, 'Disconnected'])
      })
    }
    return
  }

  server.emit('log:info', ['ssb-server', stringify(peer), 'PEER JOINED'])
  //means that we have created this connection, not received it.
  peer.client = !!isClient
  peer.state = 'connected'
  peer.stateChange = Date.now()
  peer.disconnect = function (err, cb) {
    if (isFunction(err)) cb = err, err = null
    rpc.close(err, cb)
  }

  // snapshot for the status report, taken after the fields above are
  // updated so it does not show the pre-connection state
  status[rpc.id] = simplify(peer)

  if (isClient) {
    //default ping is 5 minutes...
    var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
    peer.ping = {rtt: pp.rtt, skew: pp.skew}
    pull(
      pp,
      rpc.gossip.ping({timeout: timer_ping}, function (err) {
        // err may be absent when the stream ends without an error
        if (err && err.name === 'TypeError') peer.ping.fail = true
      }),
      pp
    )
  }

  rpc.on('closed', function () {
    delete status[rpc.id]
    server.emit('log:info', ['ssb-server', stringify(peer),
      ['DISCONNECTED. state was', peer.state, 'for',
        (new Date() - peer.stateChange)/1000, 'seconds'].join(' ')])
    //track whether we have successfully connected.
    //or how many failures there have been.
    var since = peer.stateChange
    peer.stateChange = Date.now()
    // recorded regardless of state, since it may be "disconnecting"
    peer.duration = stats(peer.duration, peer.stateChange - since)
    peer.state = undefined
    notify({ type: 'disconnect', peer: peer })
  })

  notify({ type: 'connect', peer: peer })
})
367 | |
// `last` holds the most recently loaded/saved peer list so that unchanged
// state is not rewritten.
var last
// Load the stored peer list and feed it back into the peer table.
// NOTE(review): `err` is ignored; a failed read just yields an empty list.
stateFile.get(function (err, ary) {
  last = ary || []
  if (!Array.isArray(ary)) return

  ary.forEach(function (stored) {
    delete stored.state
    // don't add local peers (wait to rediscover)
    if (stored.source === 'local') return
    // adding peers back this way means old format gossip.json
    // will be updated to having proper address values.
    gossip.add(stored, 'stored')
  })
})
382 | |
// Every 10 seconds, write the non-local peers to the state file, but only
// when the list differs from what was last loaded or saved.
var int = setInterval(function () {
  var copy = peers.filter(function (e) {
    return e.source !== 'local'
  }).map(function (e) {
    // shallow copy without the transient `state` field
    var o = {}
    for (var k in e) {
      if (k !== 'state') o[k] = e[k]
    }

    //try to ensure that the peer always has host and port
    //so that the output file is understood by previous versions
    //of scuttlebutt.
    if (!o.host || !o.port) {
      var _addr = ref.parseAddress(e.address)
      // parseAddress can come back empty; keep the record without
      // host/port rather than throwing inside the timer
      if (_addr) {
        o.host = _addr.host
        o.port = _addr.port
      }
    }

    return o
  })

  if (deepEqual(copy, last)) return
  last = copy
  stateFile.set(copy, function (err) {
    if (err) console.log(err)
  })
}, 10*1000)

// don't let the save timer keep the process alive
if (int.unref) int.unref()
412 | |
413 | return gossip |
414 | } |
415 | } |
416 | |
417 | |
418 |
Built with git-ssb-web