Files: e5d0b7d8bace9ec48ab12ec7a1a40872a050acad / plugins / gossip / index.js
6245 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var Notify = require('pull-notify') |
4 | var toAddress = require('../../lib/util').toAddress |
5 | var mdm = require('mdmanifest') |
6 | var valid = require('../../lib/validators') |
7 | var apidoc = require('../../lib/apidocs').gossip |
8 | var u = require('../../lib/util') |
9 | var ref = require('ssb-ref') |
10 | var ping = require('pull-ping') |
11 | var Stats = require('statistics') |
12 | var isArray = Array.isArray |
13 | var Schedule = require('./schedule') |
14 | var Init = require('./init') |
15 | |
16 | function isFunction (f) { |
17 | return 'function' === typeof f |
18 | } |
19 | |
20 | function stringify(peer) { |
21 | return [peer.host, peer.port, peer.key].join(':') |
22 | } |
23 | |
24 | /* |
25 | Peers : [{ |
26 | key: id, |
27 | host: ip, |
28 | port: int, |
29 | //to be backwards compatible with patchwork... |
30 | announcers: {length: int} |
31 | source: 'pub'|'manual'|'local' |
32 | }] |
33 | */ |
34 | |
35 | |
36 | module.exports = { |
37 | name: 'gossip', |
38 | version: '1.0.0', |
39 | manifest: mdm.manifest(apidoc), |
40 | permissions: { |
41 | anonymous: {allow: ['ping']} |
42 | }, |
43 | init: function (server, config) { |
44 | var notify = Notify() |
45 | var conf = config.gossip || {} |
46 | var home = ref.parseAddress(server.getAddress()) |
47 | |
48 | //Known Peers |
49 | var peers = [] |
50 | |
51 | function getPeer(id) { |
52 | return u.find(peers, function (e) { |
53 | return e && e.key === id |
54 | }) |
55 | } |
56 | |
57 | var timer_ping = 5*6e4 |
58 | |
59 | var gossip = { |
60 | wakeup: 0, |
61 | peers: function () { |
62 | return peers |
63 | }, |
64 | get: function (addr) { |
65 | addr = ref.parseAddress(addr) |
66 | return u.find(peers, function (a) { |
67 | return ( |
68 | addr.port === a.port |
69 | && addr.host === a.host |
70 | && addr.key === a.key |
71 | ) |
72 | }) |
73 | }, |
74 | connect: valid.async(function (addr, cb) { |
75 | addr = ref.parseAddress(addr) |
76 | if (!addr || typeof addr != 'object') |
77 | return cb(new Error('first param must be an address')) |
78 | |
79 | if(!addr.key) return cb(new Error('address must have ed25519 key')) |
80 | // add peer to the table, incase it isn't already. |
81 | gossip.add(addr, 'manual') |
82 | var p = gossip.get(addr) |
83 | if(!p) return cb() |
84 | |
85 | p.stateChange = Date.now() |
86 | p.state = 'connecting' |
87 | server.connect(p, function (err, rpc) { |
88 | if (err) { |
89 | p.state = undefined |
90 | p.failure = (p.failure || 0) + 1 |
91 | p.stateChange = Date.now() |
92 | notify({ type: 'connect-failure', peer: p }) |
93 | server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err]) |
94 | p.duration.value(0) |
95 | return (cb && cb(err)) |
96 | } |
97 | else { |
98 | p.state = 'connected' |
99 | p.failure = 0 |
100 | } |
101 | cb && cb(null, rpc) |
102 | }) |
103 | |
104 | }, 'string|object'), |
105 | |
106 | disconnect: valid.async(function (addr, cb) { |
107 | var peer = this.get(addr) |
108 | |
109 | peer.state = 'disconnecting' |
110 | peer.stateChange = Date.now() |
111 | if(!peer || !peer.disconnect) cb && cb() |
112 | else peer.disconnect(true, function (err) { |
113 | peer.stateChange = Date.now() |
114 | }) |
115 | |
116 | }, 'string|object'), |
117 | |
118 | changes: function () { |
119 | return notify.listen() |
120 | }, |
121 | //add an address to the peer table. |
122 | add: valid.sync(function (addr, source) { |
123 | addr = ref.parseAddress(addr) |
124 | if(!ref.isAddress(addr)) |
125 | throw new Error('not a valid address:' + JSON.stringify(addr)) |
126 | // check that this is a valid address, and not pointing at self. |
127 | |
128 | if(addr.key === home.key) return |
129 | if(addr.host === home.host && addr.port === home.port) return |
130 | |
131 | var f = gossip.get(addr) |
132 | |
133 | if(!f) { |
134 | // new peer |
135 | addr.source = source |
136 | addr.announcers = 1 |
137 | addr.duration = Stats() |
138 | peers.push(addr) |
139 | notify({ type: 'discover', peer: addr, source: source || 'manual' }) |
140 | return addr |
141 | } |
142 | //don't count local over and over |
143 | else if(f.source != 'local') |
144 | f.announcers ++ |
145 | |
146 | return f |
147 | }, 'string|object', 'string?'), |
148 | ping: function (opts) { |
149 | var timeout = config.timers && config.timers.ping || 5*60e3 |
150 | //between 10 seconds and 30 minutes, default 5 min |
151 | timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) |
152 | return ping({timeout: timeout}) |
153 | }, |
154 | reconnect: function () { |
155 | for(var id in server.peers) |
156 | if(id !== server.id) //don't disconnect local client |
157 | server.peers[id].forEach(function (peer) { |
158 | peer.close(true) |
159 | }) |
160 | return gossip.wakeup = Date.now() |
161 | } |
162 | } |
163 | |
164 | Schedule (gossip, config, server) |
165 | Init (gossip, config, server) |
166 | //get current state |
167 | |
168 | server.on('rpc:connect', function (rpc, isClient) { |
169 | var peer = getPeer(rpc.id) |
170 | //don't track clients that connect, but arn't considered peers. |
171 | //maybe we should though? |
172 | if(!peer) return |
173 | console.log('Connected', stringify(peer)) |
174 | //means that we have created this connection, not received it. |
175 | peer.client = !!isClient |
176 | peer.state = 'connected' |
177 | peer.stateChange = Date.now() |
178 | peer.disconnect = function (err, cb) { |
179 | if(isFunction(err)) cb = err, err = null |
180 | rpc.close(err, cb) |
181 | } |
182 | |
183 | if(isClient) { |
184 | //default ping is 5 minutes... |
185 | var pp = ping({serve: true, timeout: timer_ping}, function (_) {}) |
186 | peer.ping = {rtt: pp.rtt, skew: pp.skew} |
187 | pull( |
188 | pp, |
189 | rpc.gossip.ping({timeout: timer_ping}, function (err) { |
190 | if(err.name === 'TypeError') peer.ping.fail = true |
191 | }), |
192 | pp |
193 | ) |
194 | } |
195 | |
196 | rpc.on('closed', function () { |
197 | console.log('Disconnected', stringify(peer)) |
198 | //track whether we have successfully connected. |
199 | //or how many failures there have been. |
200 | var since = peer.stateChange |
201 | peer.stateChange = Date.now() |
202 | if(peer.state === 'connected') //may be "disconnecting" |
203 | peer.duration.value(peer.stateChange - since) |
204 | peer.state = undefined |
205 | notify({ type: 'disconnect', peer: peer }) |
206 | server.emit('log:info', ['SBOT', rpc.id, 'disconnect']) |
207 | }) |
208 | |
209 | notify({ type: 'connect', peer: peer }) |
210 | }) |
211 | |
212 | return gossip |
213 | } |
214 | } |
215 | |
216 | |
217 |
Built with git-ssb-web