git ssb

4+

Dominic / scuttlebot



Tree: e5d0b7d8bace9ec48ab12ec7a1a40872a050acad

Files: e5d0b7d8bace9ec48ab12ec7a1a40872a050acad / plugins / gossip / index.js

6245 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var toAddress = require('../../lib/util').toAddress
5var mdm = require('mdmanifest')
6var valid = require('../../lib/validators')
7var apidoc = require('../../lib/apidocs').gossip
8var u = require('../../lib/util')
9var ref = require('ssb-ref')
10var ping = require('pull-ping')
11var Stats = require('statistics')
12var isArray = Array.isArray
13var Schedule = require('./schedule')
14var Init = require('./init')
15
/**
 * Report whether a value is callable.
 *
 * @param {*} f - any value
 * @returns {boolean} true only when `typeof f` is 'function'
 */
function isFunction (f) {
  return 'function' === typeof f
}
19
/**
 * Render a peer as "host:port:key" for log output.
 *
 * @param {{host: *, port: *, key: *}} peer - peer record from the peer table
 * @returns {string} the three fields joined with ':' (a missing field
 *   becomes an empty segment, as Array#join does for undefined/null)
 */
function stringify(peer) {
  return [peer.host, peer.port, peer.key].join(':')
}
23
/*
Peers : [{
  key: id,
  host: ip,
  port: int,
  //to be backwards compatible with patchwork...
  announcers: {length: int},
  source: 'pub'|'manual'|'local'
}]
*/
34
35
36module.exports = {
37 name: 'gossip',
38 version: '1.0.0',
39 manifest: mdm.manifest(apidoc),
40 permissions: {
41 anonymous: {allow: ['ping']}
42 },
43 init: function (server, config) {
44 var notify = Notify()
45 var conf = config.gossip || {}
46 var home = ref.parseAddress(server.getAddress())
47
48 //Known Peers
49 var peers = []
50
51 function getPeer(id) {
52 return u.find(peers, function (e) {
53 return e && e.key === id
54 })
55 }
56
57 var timer_ping = 5*6e4
58
59 var gossip = {
60 wakeup: 0,
61 peers: function () {
62 return peers
63 },
64 get: function (addr) {
65 addr = ref.parseAddress(addr)
66 return u.find(peers, function (a) {
67 return (
68 addr.port === a.port
69 && addr.host === a.host
70 && addr.key === a.key
71 )
72 })
73 },
74 connect: valid.async(function (addr, cb) {
75 addr = ref.parseAddress(addr)
76 if (!addr || typeof addr != 'object')
77 return cb(new Error('first param must be an address'))
78
79 if(!addr.key) return cb(new Error('address must have ed25519 key'))
80 // add peer to the table, incase it isn't already.
81 gossip.add(addr, 'manual')
82 var p = gossip.get(addr)
83 if(!p) return cb()
84
85 p.stateChange = Date.now()
86 p.state = 'connecting'
87 server.connect(p, function (err, rpc) {
88 if (err) {
89 p.state = undefined
90 p.failure = (p.failure || 0) + 1
91 p.stateChange = Date.now()
92 notify({ type: 'connect-failure', peer: p })
93 server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
94 p.duration.value(0)
95 return (cb && cb(err))
96 }
97 else {
98 p.state = 'connected'
99 p.failure = 0
100 }
101 cb && cb(null, rpc)
102 })
103
104 }, 'string|object'),
105
106 disconnect: valid.async(function (addr, cb) {
107 var peer = this.get(addr)
108
109 peer.state = 'disconnecting'
110 peer.stateChange = Date.now()
111 if(!peer || !peer.disconnect) cb && cb()
112 else peer.disconnect(true, function (err) {
113 peer.stateChange = Date.now()
114 })
115
116 }, 'string|object'),
117
118 changes: function () {
119 return notify.listen()
120 },
121 //add an address to the peer table.
122 add: valid.sync(function (addr, source) {
123 addr = ref.parseAddress(addr)
124 if(!ref.isAddress(addr))
125 throw new Error('not a valid address:' + JSON.stringify(addr))
126 // check that this is a valid address, and not pointing at self.
127
128 if(addr.key === home.key) return
129 if(addr.host === home.host && addr.port === home.port) return
130
131 var f = gossip.get(addr)
132
133 if(!f) {
134 // new peer
135 addr.source = source
136 addr.announcers = 1
137 addr.duration = Stats()
138 peers.push(addr)
139 notify({ type: 'discover', peer: addr, source: source || 'manual' })
140 return addr
141 }
142 //don't count local over and over
143 else if(f.source != 'local')
144 f.announcers ++
145
146 return f
147 }, 'string|object', 'string?'),
148 ping: function (opts) {
149 var timeout = config.timers && config.timers.ping || 5*60e3
150 //between 10 seconds and 30 minutes, default 5 min
151 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
152 return ping({timeout: timeout})
153 },
154 reconnect: function () {
155 for(var id in server.peers)
156 if(id !== server.id) //don't disconnect local client
157 server.peers[id].forEach(function (peer) {
158 peer.close(true)
159 })
160 return gossip.wakeup = Date.now()
161 }
162 }
163
164 Schedule (gossip, config, server)
165 Init (gossip, config, server)
166 //get current state
167
168 server.on('rpc:connect', function (rpc, isClient) {
169 var peer = getPeer(rpc.id)
170 //don't track clients that connect, but arn't considered peers.
171 //maybe we should though?
172 if(!peer) return
173 console.log('Connected', stringify(peer))
174 //means that we have created this connection, not received it.
175 peer.client = !!isClient
176 peer.state = 'connected'
177 peer.stateChange = Date.now()
178 peer.disconnect = function (err, cb) {
179 if(isFunction(err)) cb = err, err = null
180 rpc.close(err, cb)
181 }
182
183 if(isClient) {
184 //default ping is 5 minutes...
185 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
186 peer.ping = {rtt: pp.rtt, skew: pp.skew}
187 pull(
188 pp,
189 rpc.gossip.ping({timeout: timer_ping}, function (err) {
190 if(err.name === 'TypeError') peer.ping.fail = true
191 }),
192 pp
193 )
194 }
195
196 rpc.on('closed', function () {
197 console.log('Disconnected', stringify(peer))
198 //track whether we have successfully connected.
199 //or how many failures there have been.
200 var since = peer.stateChange
201 peer.stateChange = Date.now()
202 if(peer.state === 'connected') //may be "disconnecting"
203 peer.duration.value(peer.stateChange - since)
204 peer.state = undefined
205 notify({ type: 'disconnect', peer: peer })
206 server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
207 })
208
209 notify({ type: 'connect', peer: peer })
210 })
211
212 return gossip
213 }
214}
215
216
217

Built with git-ssb-web