Files: f8a3919796796fb604a4d98decee02f962ee7cee / lib / persistent-gossip / index.js
7685 bytesRaw
1 | |
2 | var pull = require('pull-stream') |
3 | var Notify = require('pull-notify') |
4 | var mdm = require('mdmanifest') |
5 | var valid = require('scuttlebot/lib/validators') |
6 | var apidoc = require('scuttlebot/lib/apidocs').gossip |
7 | var u = require('scuttlebot/lib/util') |
8 | var ref = require('ssb-ref') |
9 | var ping = require('pull-ping') |
10 | var stats = require('statistics') |
11 | var Schedule = require('./schedule') |
12 | var Init = require('./init') |
13 | var AtomicFile = require('atomic-file') |
14 | var path = require('path') |
15 | var deepEqual = require('deep-equal') |
16 | |
// Returns true when `f` is callable (its typeof is 'function'), false otherwise.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
20 | |
// Formats a peer as "host:port:key" for log output.
// Uses Array#join, so a missing (undefined/null) field becomes an empty segment.
function stringify (peer) {
  var segments = [peer.host, peer.port, peer.key]
  return segments.join(':')
}
24 | |
// Checks whether `a` and `b` reference each other in the `friends` map
// (friends[a][b] and friends[b][a] are both truthy).
// Like the `&&` chain it replaces, this returns the first falsy value
// encountered, or the final operand (friends[b][a]) when all are truthy.
function isFriends (friends, a, b) {
  var fromA = friends[a]
  if (!fromA) return fromA

  var fromB = friends[b]
  if (!fromB) return fromB

  var aToB = fromA[b]
  if (!aToB) return aToB

  return fromB[a]
}
28 | |
29 | /* |
30 | Peers : [{ |
31 | key: id, |
32 | host: ip, |
33 | port: int, |
34 | //to be backwards compatible with patchwork... |
35 | announcers: {length: int} |
36 | source: 'pub'|'manual'|'local' |
37 | }] |
38 | */ |
39 | |
40 | |
41 | module.exports = { |
42 | name: 'gossip', |
43 | version: '1.0.0', |
44 | manifest: mdm.manifest(apidoc), |
45 | permissions: { |
46 | anonymous: {allow: ['ping']} |
47 | }, |
48 | init: function (server, config) { |
49 | var notify = Notify() |
50 | var conf = config.gossip || {} |
51 | var home = ref.parseAddress(server.getAddress()) |
52 | |
53 | var stateFile = AtomicFile(path.join(config.path, 'gossip.json')) |
54 | |
55 | //Known Peers |
56 | var peers = [] |
57 | |
58 | function getPeer(id) { |
59 | return u.find(peers, function (e) { |
60 | return e && e.key === id |
61 | }) |
62 | } |
63 | |
64 | var timer_ping = 5*6e4 |
65 | |
66 | var gossip = { |
67 | wakeup: 0, |
68 | peers: function () { |
69 | return peers |
70 | }, |
71 | connectToFriends: function () { |
72 | server.friends.all((err, friends) => { |
73 | if (err) return |
74 | peers.forEach(function(peer) { |
75 | if (!Schedule.isConnect(peer) && isFriends(friends, server.id, peer.key)) { |
76 | gossip.connect(peer, function() {}) |
77 | } |
78 | }) |
79 | }) |
80 | }, |
81 | get: function (addr) { |
82 | addr = ref.parseAddress(addr) |
83 | return u.find(peers, function (a) { |
84 | return ( |
85 | addr.port === a.port |
86 | && addr.host === a.host |
87 | && addr.key === a.key |
88 | ) |
89 | }) |
90 | }, |
91 | connect: valid.async(function (addr, cb) { |
92 | addr = ref.parseAddress(addr) |
93 | if (!addr || typeof addr != 'object') |
94 | return cb(new Error('first param must be an address')) |
95 | |
96 | if(!addr.key) return cb(new Error('address must have ed25519 key')) |
97 | // add peer to the table, incase it isn't already. |
98 | gossip.add(addr, 'manual') |
99 | var p = gossip.get(addr) |
100 | if(!p) return cb() |
101 | |
102 | p.stateChange = Date.now() |
103 | p.state = 'connecting' |
104 | server.connect(p, function (err, rpc) { |
105 | if (err) { |
106 | p.state = undefined |
107 | p.failure = (p.failure || 0) + 1 |
108 | p.stateChange = Date.now() |
109 | notify({ type: 'connect-failure', peer: p }) |
110 | server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err]) |
111 | p.duration = stats(p.duration, 0) |
112 | return (cb && cb(err)) |
113 | } |
114 | else { |
115 | p.state = 'connected' |
116 | p.failure = 0 |
117 | } |
118 | cb && cb(null, rpc) |
119 | }) |
120 | |
121 | }, 'string|object'), |
122 | |
123 | disconnect: valid.async(function (addr, cb) { |
124 | var peer = this.get(addr) |
125 | |
126 | peer.state = 'disconnecting' |
127 | peer.stateChange = Date.now() |
128 | if(!peer || !peer.disconnect) cb && cb() |
129 | else peer.disconnect(true, function (err) { |
130 | peer.stateChange = Date.now() |
131 | }) |
132 | |
133 | }, 'string|object'), |
134 | |
135 | changes: function () { |
136 | return notify.listen() |
137 | }, |
138 | //add an address to the peer table. |
139 | add: valid.sync(function (addr, source) { |
140 | addr = ref.parseAddress(addr) |
141 | if(!ref.isAddress(addr)) |
142 | throw new Error('not a valid address:' + JSON.stringify(addr)) |
143 | // check that this is a valid address, and not pointing at self. |
144 | |
145 | if(addr.key === home.key) return |
146 | if(addr.host === home.host && addr.port === home.port) return |
147 | |
148 | var f = gossip.get(addr) |
149 | |
150 | if(!f) { |
151 | // new peer |
152 | addr.source = source |
153 | addr.announcers = 1 |
154 | addr.duration = addr.duration || null |
155 | peers.push(addr) |
156 | notify({ type: 'discover', peer: addr, source: source || 'manual' }) |
157 | return addr |
158 | } else if (source === 'local') { |
159 | // this peer is local to us, override old source |
160 | addr.source = 'local' |
161 | } |
162 | //don't count local over and over |
163 | else if(f.source != 'local') |
164 | f.announcers ++ |
165 | |
166 | return f |
167 | }, 'string|object', 'string?'), |
168 | ping: function (opts) { |
169 | var timeout = config.timers && config.timers.ping || 5*60e3 |
170 | //between 10 seconds and 30 minutes, default 5 min |
171 | timeout = Math.max(10e3, Math.min(timeout, 30*60e3)) |
172 | return ping({timeout: timeout}) |
173 | }, |
174 | reconnect: function () { |
175 | for(var id in server.peers) |
176 | if(id !== server.id) //don't disconnect local client |
177 | server.peers[id].forEach(function (peer) { |
178 | peer.close(true) |
179 | }) |
180 | return gossip.wakeup = Date.now() |
181 | } |
182 | } |
183 | |
184 | Schedule (gossip, config, server) |
185 | Init (gossip, config, server) |
186 | //get current state |
187 | |
188 | server.on('rpc:connect', function (rpc, isClient) { |
189 | var peer = getPeer(rpc.id) |
190 | //don't track clients that connect, but arn't considered peers. |
191 | //maybe we should though? |
192 | if(!peer) return |
193 | console.log('Connected', stringify(peer)) |
194 | //means that we have created this connection, not received it. |
195 | peer.client = !!isClient |
196 | peer.state = 'connected' |
197 | peer.stateChange = Date.now() |
198 | peer.disconnect = function (err, cb) { |
199 | if(isFunction(err)) cb = err, err = null |
200 | rpc.close(err, cb) |
201 | } |
202 | |
203 | if(isClient) { |
204 | //default ping is 5 minutes... |
205 | var pp = ping({serve: true, timeout: timer_ping}, function (_) {}) |
206 | peer.ping = {rtt: pp.rtt, skew: pp.skew} |
207 | pull( |
208 | pp, |
209 | rpc.gossip.ping({timeout: timer_ping}, function (err) { |
210 | if(err.name === 'TypeError') peer.ping.fail = true |
211 | }), |
212 | pp |
213 | ) |
214 | } |
215 | |
216 | rpc.on('closed', function () { |
217 | console.log('Disconnected', stringify(peer)) |
218 | //track whether we have successfully connected. |
219 | //or how many failures there have been. |
220 | var since = peer.stateChange |
221 | peer.stateChange = Date.now() |
222 | // if(peer.state === 'connected') //may be "disconnecting" |
223 | peer.duration = stats(peer.duration, peer.stateChange - since) |
224 | // console.log(peer.duration) |
225 | peer.state = undefined |
226 | notify({ type: 'disconnect', peer: peer }) |
227 | server.emit('log:info', ['SBOT', rpc.id, 'disconnect']) |
228 | }) |
229 | |
230 | notify({ type: 'connect', peer: peer }) |
231 | }) |
232 | |
233 | var last |
234 | stateFile.get(function (err, ary) { |
235 | last = ary || [] |
236 | if(Array.isArray(ary)) |
237 | ary.forEach(function (v) { |
238 | delete v.state |
239 | // don't add local peers (wait to rediscover) |
240 | if(v.source !== 'local') { |
241 | gossip.add(v, 'stored') |
242 | } |
243 | }) |
244 | }) |
245 | |
246 | var int = setInterval(function () { |
247 | var copy = JSON.parse(JSON.stringify(peers)) |
248 | copy.forEach(function (e) { |
249 | delete e.state |
250 | }) |
251 | if(deepEqual(copy, last)) return |
252 | last = copy |
253 | stateFile.set(copy, function(err) { |
254 | if (err) console.log(err) |
255 | }) |
256 | }, 10*1000) |
257 | |
258 | if(int.unref) int.unref() |
259 | |
260 | return gossip |
261 | } |
262 | } |
263 |
Built with git-ssb-web