git ssb

4+

Dominic / scuttlebot



Tree: c5a7c1d8b9fae990ed1c54fd2de267eccdbdc14a

Files: c5a7c1d8b9fae990ed1c54fd2de267eccdbdc14a / plugins / gossip / index.js

12460 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var mdm = require('mdmanifest')
5var valid = require('../../lib/validators')
6var apidoc = require('../../lib/apidocs').gossip
7var u = require('../../lib/util')
8var ref = require('ssb-ref')
9var ping = require('pull-ping')
10var stats = require('statistics')
11var Schedule = require('./schedule')
12var Init = require('./init')
13var AtomicFile = require('atomic-file')
14var fs = require('fs')
15var path = require('path')
16var deepEqual = require('deep-equal')
17
18function isFunction (f) {
19 return 'function' === typeof f
20}
21
22function stringify(peer) {
23 return [peer.host, peer.port, peer.key].join(':')
24}
25
26function isObject (o) {
27 return o && 'object' == typeof o
28}
29
30function toBase64 (s) {
31 if(isString(s)) return s
32 else s.toString('base64') //assume a buffer
33}
34
35function isString (s) {
36 return 'string' == typeof s
37}
38
39function coearseAddress (address) {
40 if(isObject(address)) {
41 var protocol = 'net'
42 if (address.host.endsWith(".onion"))
43 protocol = 'onion'
44 return [protocol, address.host, address.port].join(':') +'~'+['shs', toBase64(address.key)].join(':')
45 }
46 return address
47}
48
49/*
50Peers : [{
51 key: id,
52 host: ip,
53 port: int,
54 //to be backwards compatible with patchwork...
55 announcers: {length: int}
56 source: 'pub'|'manual'|'local'
57}]
58*/
59
60
61module.exports = {
62 name: 'gossip',
63 version: '1.0.0',
64 manifest: mdm.manifest(apidoc),
65 permissions: {
66 anonymous: {allow: ['ping']}
67 },
68 init: function (server, config) {
69 var notify = Notify()
70 var closed = false, closeScheduler
71 var conf = config.gossip || {}
72
73 var gossipJsonPath = path.join(config.path, 'gossip.json')
74 var stateFile = AtomicFile(gossipJsonPath)
75 stateFile.get(function (err, ary) {
76 var peers = ary || []
77 server.emit('log:info', ['SBOT', ''+peers.length+' peers loaded from', gossipJsonPath])
78 })
79
80 var status = {}
81
82 //Known Peers
83 var peers = []
84
85 function getPeer(id) {
86 return u.find(peers, function (e) {
87 return e && e.key === id
88 })
89 }
90
91 function simplify (peer) {
92 return {
93 address: coearseAddress(peer),
94 source: peer.source,
95 state: peer.state, stateChange: peer.stateChange,
96 failure: peer.failure,
97 client: peer.client,
98 stats: {
99 duration: peer.duration || undefined,
100 rtt: peer.ping ? peer.ping.rtt : undefined,
101 skew: peer.ping ? peer.ping.skew : undefined,
102 }
103 }
104 }
105
106 server.status.hook(function (fn) {
107 var _status = fn()
108 _status.gossip = status
109 peers.forEach(function (peer) {
110 if(peer.stateChange + 3e3 > Date.now() || peer.state === 'connected')
111 status[peer.key] = simplify(peer)
112 })
113 return _status
114
115 })
116
117 server.close.hook(function (fn, args) {
118 closed = true
119 closeScheduler()
120 for(var id in server.peers)
121 server.peers[id].forEach(function (peer) {
122 peer.close(true)
123 })
124 return fn.apply(this, args)
125 })
126
127 var timer_ping = 5*6e4
128
129 function setConfig(name, value) {
130 config.gossip = config.gossip || {}
131 config.gossip[name] = value
132
133 var cfgPath = path.join(config.path, 'config')
134 var existingConfig = {}
135
136 // load ~/.ssb/config
137 try { existingConfig = JSON.parse(fs.readFileSync(cfgPath, 'utf-8')) }
138 catch (e) {}
139
140 // update the plugins config
141 existingConfig.gossip = existingConfig.gossip || {}
142 existingConfig.gossip[name] = value
143
144 // write to disc
145 fs.writeFileSync(cfgPath, JSON.stringify(existingConfig, null, 2), 'utf-8')
146 }
147
148 var gossip = {
149 wakeup: 0,
150 peers: function () {
151 return peers
152 },
153 get: function (addr) {
154 addr = ref.parseAddress(addr)
155 return u.find(peers, function (a) {
156 return (
157 addr.port === a.port
158 && addr.host === a.host
159 && addr.key === a.key
160 )
161 })
162 },
163 connect: valid.async(function (addr, cb) {
164 server.emit('log:info', ['SBOT', stringify(addr), 'CONNECTING'])
165 addr = ref.parseAddress(addr)
166 if (!addr || typeof addr != 'object')
167 return cb(new Error('first param must be an address'))
168
169 if(!addr.key) return cb(new Error('address must have ed25519 key'))
170 // add peer to the table, incase it isn't already.
171 gossip.add(addr, 'manual')
172 var p = gossip.get(addr)
173 if(!p) return cb()
174
175 p.stateChange = Date.now()
176 p.state = 'connecting'
177 server.connect(coearseAddress(p), function (err, rpc) {
178 if (err) {
179 p.error = err.stack
180 p.state = undefined
181 p.failure = (p.failure || 0) + 1
182 p.stateChange = Date.now()
183 notify({ type: 'connect-failure', peer: p })
184 server.emit('log:info', ['SBOT', stringify(p), 'ERR', (err.message || err)])
185 p.duration = stats(p.duration, 0)
186 return (cb && cb(err))
187 }
188 else {
189 delete p.error
190 p.state = 'connected'
191 p.failure = 0
192 }
193 cb && cb(null, rpc)
194 })
195
196 }, 'string|object'),
197
198 disconnect: valid.async(function (addr, cb) {
199 var peer = this.get(addr)
200
201 peer.state = 'disconnecting'
202 peer.stateChange = Date.now()
203 if(!peer || !peer.disconnect) cb && cb()
204 else peer.disconnect(true, function (err) {
205 peer.stateChange = Date.now()
206 cb && cb()
207 })
208
209 }, 'string|object'),
210
211 changes: function () {
212 return notify.listen()
213 },
214 //add an address to the peer table.
215 add: valid.sync(function (addr, source) {
216
217 addr = ref.parseAddress(addr)
218 if(!ref.isAddress(addr))
219 throw new Error('not a valid address:' + JSON.stringify(addr))
220 // check that this is a valid address, and not pointing at self.
221
222 if(addr.key === server.id) return
223
224 var f = gossip.get(addr)
225
226 if(!f) {
227 // new peer
228 addr.source = source
229 addr.announcers = 1
230 addr.duration = addr.duration || null
231 peers.push(addr)
232 notify({ type: 'discover', peer: addr, source: source || 'manual' })
233 return addr
234 } else if (source === 'friends' || source === 'local') {
235 // this peer is a friend or local, override old source to prioritize gossip
236 f.source = source
237 }
238 //don't count local over and over
239 else if(f.source != 'local')
240 f.announcers ++
241
242 return f
243 }, 'string|object', 'string?'),
244 remove: function (addr) {
245 var peer = gossip.get(addr)
246 var index = peers.indexOf(peer)
247 if (~index) {
248 peers.splice(index, 1)
249 notify({ type: 'remove', peer: peer })
250 }
251 },
252 ping: function (opts) {
253 var timeout = config.timers && config.timers.ping || 5*60e3
254 //between 10 seconds and 30 minutes, default 5 min
255 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
256 return ping({timeout: timeout})
257 },
258 pingPeers: function () {
259 var drains = {}
260 var queue = []
261 var waitingCb
262 for(var id in server.peers) {
263 if(id !== server.id) server.peers[id].forEach(function (peer) {
264 var pings
265 try {
266 pings = peer.gossip.ping()
267 } catch(e) {
268 return
269 }
270 pull(
271 pull.once(true),
272 pings,
273 pull.take(1),
274 drains[peer.id] = pull.collect(function (err, values) {
275 delete drains[peer.id]
276 var cb = waitingCb
277 if (err) {
278 if (cb && Object.keys(drains).length === 0) {
279 waitingCb = null
280 cb(true)
281 }
282 return
283 }
284 var val = {peer: peer.id, value: values[0]}
285 if (cb) {
286 waitingCb = null
287 cb(null, val)
288 } else {
289 queue.push(val)
290 }
291 })
292 )
293 })
294 }
295 return function (abort, cb) {
296 if(abort) {
297 for(var id in drains) {
298 drains[id].abort(abort)
299 }
300 return cb(abort)
301 }
302 if (queue.length > 0) {
303 cb(null, queue.shift())
304 } else if (Object.keys(drains).length === 0) {
305 cb(true)
306 } else {
307 waitingCb = cb
308 }
309 }
310 },
311 reconnect: function () {
312 for(var id in server.peers)
313 if(id !== server.id) //don't disconnect local client
314 server.peers[id].forEach(function (peer) {
315 peer.close(true)
316 })
317 return gossip.wakeup = Date.now()
318 },
319 enable: valid.sync(function (type) {
320 type = type || 'global'
321 setConfig(type, true)
322 if(type === 'local' && server.local && server.local.init)
323 server.local.init()
324 return 'enabled gossip type ' + type
325 }, 'string?'),
326 disable: valid.sync(function (type) {
327 type = type || 'global'
328 setConfig(type, false)
329 return 'disabled gossip type ' + type
330 }, 'string?')
331 }
332
333 closeScheduler = Schedule (gossip, config, server)
334 Init (gossip, config, server)
335 //get current state
336
337 server.on('rpc:connect', function (rpc, isClient) {
338
339 // if we're not ready, close this connection immediately
340 if (!server.ready() && rpc.id !== server.id) return rpc.close()
341
342 var peer = getPeer(rpc.id)
343 //don't track clients that connect, but arn't considered peers.
344 //maybe we should though?
345 if(!peer) {
346 if(rpc.id !== server.id) {
347 server.emit('log:info', ['SBOT', rpc.id, 'Connected'])
348 rpc.on('closed', function () {
349 server.emit('log:info', ['SBOT', rpc.id, 'Disconnected'])
350 })
351 }
352 return
353 }
354
355 status[rpc.id] = simplify(peer)
356
357 server.emit('log:info', ['SBOT', stringify(peer), 'PEER JOINED'])
358 //means that we have created this connection, not received it.
359 peer.client = !!isClient
360 peer.state = 'connected'
361 peer.stateChange = Date.now()
362 peer.disconnect = function (err, cb) {
363 if(isFunction(err)) cb = err, err = null
364 rpc.close(err, cb)
365 }
366
367 if(isClient) {
368 //default ping is 5 minutes...
369 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
370 peer.ping = {rtt: pp.rtt, skew: pp.skew}
371 pull(
372 pp,
373 rpc.gossip.ping({timeout: timer_ping}, function (err) {
374 if(err.name === 'TypeError') peer.ping.fail = true
375 }),
376 pp
377 )
378 }
379
380 rpc.on('closed', function () {
381 delete status[rpc.id]
382 server.emit('log:info', ['SBOT', stringify(peer),
383 ['DISCONNECTED. state was', peer.state, 'for',
384 (new Date() - peer.stateChange)/1000, 'seconds'].join(' ')])
385 //track whether we have successfully connected.
386 //or how many failures there have been.
387 var since = peer.stateChange
388 peer.stateChange = Date.now()
389// if(peer.state === 'connected') //may be "disconnecting"
390 peer.duration = stats(peer.duration, peer.stateChange - since)
391 peer.state = undefined
392 notify({ type: 'disconnect', peer: peer })
393 })
394
395 notify({ type: 'connect', peer: peer })
396 })
397
398 var last
399 stateFile.get(function (err, ary) {
400 last = ary || []
401 if(Array.isArray(ary))
402 ary.forEach(function (v) {
403 delete v.state
404 // don't add local peers (wait to rediscover)
405 if(v.source !== 'local') {
406 gossip.add(v, 'stored')
407 }
408 })
409 })
410
411 var int = setInterval(function () {
412 var copy = JSON.parse(JSON.stringify(peers))
413 copy.filter(function (e) {
414 return e.source !== 'local'
415 }).forEach(function (e) {
416 delete e.state
417 })
418 if(deepEqual(copy, last)) return
419 last = copy
420 stateFile.set(copy, function(err) {
421 if (err) console.log(err)
422 })
423 }, 10*1000)
424
425 if(int.unref) int.unref()
426
427 return gossip
428 }
429}
430
431
432
433
434
435
436

Built with git-ssb-web