git ssb

4+

Dominic / scuttlebot



Tree: 89c3bab207307411d3f1ae88e16c193033db53e6

Files: 89c3bab207307411d3f1ae88e16c193033db53e6 / plugins / gossip / index.js

11832 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var mdm = require('mdmanifest')
5var valid = require('../../lib/validators')
6var apidoc = require('../../lib/apidocs').gossip
7var u = require('../../lib/util')
8var ref = require('ssb-ref')
9var ping = require('pull-ping')
10var stats = require('statistics')
11var Schedule = require('./schedule')
12var Init = require('./init')
13var AtomicFile = require('atomic-file')
14var fs = require('fs')
15var path = require('path')
16var deepEqual = require('deep-equal')
17
/**
 * Tell whether a value is callable.
 * @param {*} f - value to inspect
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
21
/**
 * Render a peer as "host:port:key" for log lines.
 * Missing fields are rendered as empty strings (Array#join semantics).
 * @param {{host: *, port: *, key: *}} peer
 * @returns {string}
 */
function stringify(peer) {
  var fields = [peer.host, peer.port, peer.key]
  return fields.join(':')
}
25
/**
 * Truthiness-style object test.
 * Falsy inputs are returned unchanged (so `null` yields `null`, not `false`);
 * any truthy input yields a boolean telling whether its typeof is 'object'.
 * @param {*} o - value to inspect
 * @returns {*} the falsy input itself, or a boolean
 */
function isObject (o) {
  if (!o) return o
  return typeof o === 'object'
}
29
/**
 * Extract the base64 part of a key.
 *
 * For a string, returns the characters between the first character and the
 * first '.' (presumably stripping the sigil and the ".ed25519" suffix of ids
 * such as "@<base64>.ed25519" - confirm against callers).
 * Any other value is assumed to be a Buffer and is base64-encoded.
 *
 * @param {string|Buffer} s - key as an id string or raw bytes
 * @returns {string} base64 text
 */
function toBase64 (s) {
  if(isString(s)) return s.substring(1, s.indexOf('.'))
  // the original omitted this `return`, so buffers always produced undefined
  return s.toString('base64') //assume a buffer
}
34
/**
 * Tell whether a value is a primitive string.
 * @param {*} s - value to inspect
 * @returns {boolean} true when `typeof s` is 'string'
 */
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
38
/**
 * Coerce a peer description into an address string.
 *
 * - Anything that is not an object is handed back untouched.
 * - An object whose `address` already passes `ref.isAddress` yields that
 *   `address`.
 * - Otherwise an address is assembled from the legacy fields as
 *   "<protocol>:<host>:<port>~shs:<base64 key>", where the protocol is
 *   'onion' for hosts ending in ".onion" and 'net' for everything else.
 *
 * @param {string|Object} address - address string or {host, port, key[, address]}
 * @returns {*} the address string, or the input itself when it is not an object
 */
function coearseAddress (address) {
  if (!isObject(address)) return address

  if (ref.isAddress(address.address)) return address.address

  var isOnion = address.host && address.host.endsWith(".onion")
  var protocol = isOnion ? 'onion' : 'net'

  var transport = [protocol, address.host, address.port].join(':')
  var transform = ['shs', toBase64(address.key)].join(':')
  return transport + '~' + transform
}
50
/*
Shape of each entry in the peer table:

Peers : [{
  //modern:
  address: <multiserver address>,

  //legacy
  key: id,
  host: ip,
  port: int,

  //number of times this peer has been announced; starts at 1 and is
  //incremented when the peer is added again.
  //(previously documented as {length: int}, to be backwards compatible
  //with patchwork...)
  announcers: int,
  //TODO: availability
  //availability: 0-1, //online probability estimate

  //where this peer was added from. TODO: remove "pub" peers.
  source: 'pub'|'manual'|'local'|'friends'|'stored'
}]
*/
71
72
73module.exports = {
74 name: 'gossip',
75 version: '1.0.0',
76 manifest: mdm.manifest(apidoc),
77 permissions: {
78 anonymous: {allow: ['ping']}
79 },
80 init: function (server, config) {
81 var notify = Notify()
82 var closed = false, closeScheduler
83 var conf = config.gossip || {}
84
85 var gossipJsonPath = path.join(config.path, 'gossip.json')
86 var stateFile = AtomicFile(gossipJsonPath)
87
88 var status = {}
89
90 //Known Peers
91 var peers = []
92
93 function getPeer(id) {
94 return u.find(peers, function (e) {
95 return e && e.key === id
96 })
97 }
98
99 function simplify (peer) {
100 return {
101 address: peer.address || coearseAddress(peer),
102 source: peer.source,
103 state: peer.state, stateChange: peer.stateChange,
104 failure: peer.failure,
105 client: peer.client,
106 stats: {
107 duration: peer.duration || undefined,
108 rtt: peer.ping ? peer.ping.rtt : undefined,
109 skew: peer.ping ? peer.ping.skew : undefined,
110 }
111 }
112 }
113
114 server.status.hook(function (fn) {
115 var _status = fn()
116 _status.gossip = status
117 peers.forEach(function (peer) {
118 if(peer.stateChange + 3e3 > Date.now() || peer.state === 'connected')
119 status[peer.key] = simplify(peer)
120 })
121 return _status
122
123 })
124
125 server.close.hook(function (fn, args) {
126 closed = true
127 closeScheduler()
128 for(var id in server.peers)
129 server.peers[id].forEach(function (peer) {
130 peer.close(true)
131 })
132 return fn.apply(this, args)
133 })
134
135 var timer_ping = 5*6e4
136
137 function setConfig(name, value) {
138 config.gossip = config.gossip || {}
139 config.gossip[name] = value
140
141 var cfgPath = path.join(config.path, 'config')
142 var existingConfig = {}
143
144 // load ~/.ssb/config
145 try { existingConfig = JSON.parse(fs.readFileSync(cfgPath, 'utf-8')) }
146 catch (e) {}
147
148 // update the plugins config
149 existingConfig.gossip = existingConfig.gossip || {}
150 existingConfig.gossip[name] = value
151
152 // write to disc
153 fs.writeFileSync(cfgPath, JSON.stringify(existingConfig, null, 2), 'utf-8')
154 }
155
156 var gossip = {
157 wakeup: 0,
158 peers: function () {
159 return peers
160 },
161 get: function (addr) {
162 //addr = ref.parseAddress(addr)
163 if(ref.isFeed(addr)) return getPeer(addr)
164 else if(ref.isFeed(addr.key)) return getPeer(addr.key)
165 else throw new Error('must provide id:'+JSON.stringify(addr))
166// return u.find(peers, function (a) {
167// return (
168// addr.port === a.port
169// && addr.host === a.host
170// && addr.key === a.key
171// )
172// })
173 },
174 connect: valid.async(function (addr, cb) {
175 if(ref.isFeed(addr))
176 addr = gossip.get(addr)
177 server.emit('log:info', ['SBOT', stringify(addr), 'CONNECTING'])
178 if(!ref.isAddress(addr.address))
179 addr = ref.parseAddress(addr)
180 if (!addr || typeof addr != 'object')
181 return cb(new Error('first param must be an address'))
182
183 if(!addr.address)
184 if(!addr.key) return cb(new Error('address must have ed25519 key'))
185 // add peer to the table, incase it isn't already.
186 gossip.add(addr, 'manual')
187 var p = gossip.get(addr)
188 if(!p) return cb()
189
190 p.stateChange = Date.now()
191 p.state = 'connecting'
192 server.connect(p.address, function (err, rpc) {
193 if (err) {
194 p.error = err.stack
195 p.state = undefined
196 p.failure = (p.failure || 0) + 1
197 p.stateChange = Date.now()
198 notify({ type: 'connect-failure', peer: p })
199 server.emit('log:info', ['SBOT', stringify(p), 'ERR', (err.message || err)])
200 p.duration = stats(p.duration, 0)
201 return (cb && cb(err))
202 }
203 else {
204 delete p.error
205 p.state = 'connected'
206 p.failure = 0
207 }
208 cb && cb(null, rpc)
209 })
210
211 }, 'string|object'),
212
213 disconnect: valid.async(function (addr, cb) {
214 var peer = gossip.get(addr)
215
216 peer.state = 'disconnecting'
217 peer.stateChange = Date.now()
218 if(!peer || !peer.disconnect) cb && cb()
219 else peer.disconnect(true, function (err) {
220 peer.stateChange = Date.now()
221 cb && cb()
222 })
223
224 }, 'string|object'),
225
226 changes: function () {
227 return notify.listen()
228 },
229 //add an address to the peer table.
230 add: valid.sync(function (addr, source) {
231
232 if(isObject(addr)) {
233 addr.address = coearseAddress(addr)
234 }
235 else {
236 var _addr = ref.parseAddress(addr)
237 if(!_addr) throw new Error('not a valid address:'+addr)
238 _addr.address = addr
239 addr = _addr
240 }
241 if(!ref.isAddress(addr.address) /*&& !ref.isAddress(addr)*/)
242 throw new Error('not a valid address:' + JSON.stringify(addr))
243 // check that this is a valid address, and not pointing at self.
244
245 if(addr.key === server.id) return
246
247 var f = gossip.get(addr)
248
249 if(!f) {
250 // new peer
251 addr.source = source
252 addr.announcers = 1
253 addr.duration = addr.duration || null
254 peers.push(addr)
255 notify({ type: 'discover', peer: addr, source: source || 'manual' })
256 return addr
257 } else if (source === 'friends' || source === 'local') {
258 // this peer is a friend or local, override old source to prioritize gossip
259 f.source = source
260 }
261 //don't count local over and over
262 else if(f.source != 'local')
263 f.announcers ++
264
265 return f
266 }, 'string|object', 'string?'),
267 remove: function (addr) {
268 var peer = gossip.get(addr)
269 var index = peers.indexOf(peer)
270 if (~index) {
271 peers.splice(index, 1)
272 notify({ type: 'remove', peer: peer })
273 }
274 },
275 ping: function (opts) {
276 var timeout = config.timers && config.timers.ping || 5*60e3
277 //between 10 seconds and 30 minutes, default 5 min
278 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
279 return ping({timeout: timeout})
280 },
281 reconnect: function () {
282 for(var id in server.peers)
283 if(id !== server.id) //don't disconnect local client
284 server.peers[id].forEach(function (peer) {
285 peer.close(true)
286 })
287 return gossip.wakeup = Date.now()
288 },
289 enable: valid.sync(function (type) {
290 type = type || 'global'
291 setConfig(type, true)
292 if(type === 'local' && server.local && server.local.init)
293 server.local.init()
294 return 'enabled gossip type ' + type
295 }, 'string?'),
296 disable: valid.sync(function (type) {
297 type = type || 'global'
298 setConfig(type, false)
299 return 'disabled gossip type ' + type
300 }, 'string?')
301 }
302
303 closeScheduler = Schedule (gossip, config, server)
304 Init (gossip, config, server)
305 //get current state
306
307 server.on('rpc:connect', function (rpc, isClient) {
308
309 // if we're not ready, close this connection immediately
310 if (!server.ready() && rpc.id !== server.id) return rpc.close()
311
312 var peer = getPeer(rpc.id)
313 //don't track clients that connect, but arn't considered peers.
314 //maybe we should though?
315 if(!peer) {
316 if(rpc.id !== server.id) {
317 server.emit('log:info', ['SBOT', rpc.id, 'Connected'])
318 rpc.on('closed', function () {
319 server.emit('log:info', ['SBOT', rpc.id, 'Disconnected'])
320 })
321 }
322 return
323 }
324
325 status[rpc.id] = simplify(peer)
326
327 server.emit('log:info', ['SBOT', stringify(peer), 'PEER JOINED'])
328 //means that we have created this connection, not received it.
329 peer.client = !!isClient
330 peer.state = 'connected'
331 peer.stateChange = Date.now()
332 peer.disconnect = function (err, cb) {
333 if(isFunction(err)) cb = err, err = null
334 rpc.close(err, cb)
335 }
336
337 if(isClient) {
338 //default ping is 5 minutes...
339 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
340 peer.ping = {rtt: pp.rtt, skew: pp.skew}
341 pull(
342 pp,
343 rpc.gossip.ping({timeout: timer_ping}, function (err) {
344 if(err.name === 'TypeError') peer.ping.fail = true
345 }),
346 pp
347 )
348 }
349
350 rpc.on('closed', function () {
351 delete status[rpc.id]
352 server.emit('log:info', ['SBOT', stringify(peer),
353 ['DISCONNECTED. state was', peer.state, 'for',
354 (new Date() - peer.stateChange)/1000, 'seconds'].join(' ')])
355 //track whether we have successfully connected.
356 //or how many failures there have been.
357 var since = peer.stateChange
358 peer.stateChange = Date.now()
359// if(peer.state === 'connected') //may be "disconnecting"
360 peer.duration = stats(peer.duration, peer.stateChange - since)
361 peer.state = undefined
362 notify({ type: 'disconnect', peer: peer })
363 })
364
365 notify({ type: 'connect', peer: peer })
366 })
367
368 var last
369 stateFile.get(function (err, ary) {
370 last = ary || []
371 if(Array.isArray(ary))
372 ary.forEach(function (v) {
373 delete v.state
374 // don't add local peers (wait to rediscover)
375 // adding peers back this way means old format gossip.json
376 // will be updated to having proper address values.
377 if(v.source !== 'local') {
378 gossip.add(v, 'stored')
379 }
380 })
381 })
382
383 var int = setInterval(function () {
384 var copy = JSON.parse(JSON.stringify(peers))
385 copy.filter(function (e) {
386 return e.source !== 'local'
387 }).forEach(function (e) {
388 delete e.state
389 })
390 if(deepEqual(copy, last)) return
391 last = copy
392 stateFile.set(copy, function(err) {
393 if (err) console.log(err)
394 })
395 }, 10*1000)
396
397 if(int.unref) int.unref()
398
399 return gossip
400 }
401}
402
403
404

Built with git-ssb-web