git ssb

4+

Dominic / scuttlebot



Tree: c01f906259679ef403a39a6d9c6cd31f26971659

Files: c01f906259679ef403a39a6d9c6cd31f26971659 / plugins / gossip / index.js

7732 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var mdm = require('mdmanifest')
5var valid = require('../../lib/validators')
6var apidoc = require('../../lib/apidocs').gossip
7var u = require('../../lib/util')
8var ref = require('ssb-ref')
9var ping = require('pull-ping')
10var stats = require('statistics')
11var Schedule = require('./schedule')
12var Init = require('./init')
13var AtomicFile = require('atomic-file')
14var path = require('path')
15var deepEqual = require('deep-equal')
16
// True when `f` is callable, i.e. `typeof f` reports 'function'.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
20
// Render a peer as "host:port:key" for log output.
// Missing (null/undefined) fields become empty segments, as Array#join does.
function stringify(peer) {
  var parts = [peer.host, peer.port, peer.key]
  return parts.join(':')
}
24
25/*
26Peers : [{
27 key: id,
28 host: ip,
29 port: int,
30 //to be backwards compatible with patchwork...
31 announcers: {length: int}
32 source: 'pub'|'manual'|'local'
33}]
34*/
35
36
37module.exports = {
38 name: 'gossip',
39 version: '1.0.0',
40 manifest: mdm.manifest(apidoc),
41 permissions: {
42 anonymous: {allow: ['ping']}
43 },
44 init: function (server, config) {
45 var notify = Notify()
46 var conf = config.gossip || {}
47 var home = ref.parseAddress(server.getAddress())
48
49 var stateFile = AtomicFile(path.join(config.path, 'gossip.json'))
50
51 //Known Peers
52 var peers = []
53
54 function getPeer(id) {
55 return u.find(peers, function (e) {
56 return e && e.key === id
57 })
58 }
59
60 var timer_ping = 5*6e4
61
62 var gossip = {
63 wakeup: 0,
64 peers: function () {
65 return peers
66 },
67 get: function (addr) {
68 addr = ref.parseAddress(addr)
69 return u.find(peers, function (a) {
70 return (
71 addr.port === a.port
72 && addr.host === a.host
73 && addr.key === a.key
74 )
75 })
76 },
77 connect: valid.async(function (addr, cb) {
78 addr = ref.parseAddress(addr)
79 if (!addr || typeof addr != 'object')
80 return cb(new Error('first param must be an address'))
81
82 if(!addr.key) return cb(new Error('address must have ed25519 key'))
83 // add peer to the table, incase it isn't already.
84 gossip.add(addr, 'manual')
85 var p = gossip.get(addr)
86 if(!p) return cb()
87
88 p.stateChange = Date.now()
89 p.state = 'connecting'
90 server.connect(p, function (err, rpc) {
91 if (err) {
92 p.state = undefined
93 p.failure = (p.failure || 0) + 1
94 p.stateChange = Date.now()
95 notify({ type: 'connect-failure', peer: p })
96 server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
97 p.duration = stats(p.duration, 0)
98 return (cb && cb(err))
99 }
100 else {
101 p.state = 'connected'
102 p.failure = 0
103 }
104 cb && cb(null, rpc)
105 })
106
107 }, 'string|object'),
108
109 disconnect: valid.async(function (addr, cb) {
110 var peer = this.get(addr)
111
112 peer.state = 'disconnecting'
113 peer.stateChange = Date.now()
114 if(!peer || !peer.disconnect) cb && cb()
115 else peer.disconnect(true, function (err) {
116 peer.stateChange = Date.now()
117 })
118
119 }, 'string|object'),
120
121 changes: function () {
122 return notify.listen()
123 },
124 //add an address to the peer table.
125 add: valid.sync(function (addr, source) {
126 addr = ref.parseAddress(addr)
127 if(!ref.isAddress(addr))
128 throw new Error('not a valid address:' + JSON.stringify(addr))
129 // check that this is a valid address, and not pointing at self.
130
131 if(addr.key === home.key) return
132 if(addr.host === home.host && addr.port === home.port) return
133
134 var f = gossip.get(addr)
135
136 if(!f) {
137 // new peer
138 addr.source = source
139 addr.announcers = 1
140 addr.duration = addr.duration || null
141 peers.push(addr)
142 notify({ type: 'discover', peer: addr, source: source || 'manual' })
143 return addr
144 } else if (source === 'friends' || source === 'local') {
145 // this peer is a friend or local, override old source to prioritize gossip
146 f.source = source
147 }
148 //don't count local over and over
149 else if(f.source != 'local')
150 f.announcers ++
151
152 return f
153 }, 'string|object', 'string?'),
154 remove: function (addr) {
155 var peer = gossip.get(addr)
156 var index = peers.indexOf(peer)
157 if (~index) {
158 peers.splice(index, 1)
159 }
160 },
161 ping: function (opts) {
162 var timeout = config.timers && config.timers.ping || 5*60e3
163 //between 10 seconds and 30 minutes, default 5 min
164 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
165 return ping({timeout: timeout})
166 },
167 reconnect: function () {
168 for(var id in server.peers)
169 if(id !== server.id) //don't disconnect local client
170 server.peers[id].forEach(function (peer) {
171 peer.close(true)
172 })
173 return gossip.wakeup = Date.now()
174 }
175 }
176
177 Schedule (gossip, config, server)
178 Init (gossip, config, server)
179 //get current state
180
181 server.on('rpc:connect', function (rpc, isClient) {
182 var peer = getPeer(rpc.id)
183 //don't track clients that connect, but arn't considered peers.
184 //maybe we should though?
185 if(!peer) {
186 if(rpc.id !== server.id) {
187 console.log('Connected', rpc.id)
188 rpc.on('closed', function () {
189 console.log('Disconnected', rpc.id)
190 })
191 }
192 return
193 }
194
195 console.log('Connected', stringify(peer))
196 //means that we have created this connection, not received it.
197 peer.client = !!isClient
198 peer.state = 'connected'
199 peer.stateChange = Date.now()
200 peer.disconnect = function (err, cb) {
201 if(isFunction(err)) cb = err, err = null
202 rpc.close(err, cb)
203 }
204
205 if(isClient) {
206 //default ping is 5 minutes...
207 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
208 peer.ping = {rtt: pp.rtt, skew: pp.skew}
209 pull(
210 pp,
211 rpc.gossip.ping({timeout: timer_ping}, function (err) {
212 if(err.name === 'TypeError') peer.ping.fail = true
213 }),
214 pp
215 )
216 }
217
218 rpc.on('closed', function () {
219 console.log('Disconnected', stringify(peer))
220 //track whether we have successfully connected.
221 //or how many failures there have been.
222 var since = peer.stateChange
223 peer.stateChange = Date.now()
224// if(peer.state === 'connected') //may be "disconnecting"
225 peer.duration = stats(peer.duration, peer.stateChange - since)
226// console.log(peer.duration)
227 peer.state = undefined
228 notify({ type: 'disconnect', peer: peer })
229 server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
230 })
231
232 notify({ type: 'connect', peer: peer })
233 })
234
235 var last
236 stateFile.get(function (err, ary) {
237 last = ary || []
238 if(Array.isArray(ary))
239 ary.forEach(function (v) {
240 delete v.state
241 // don't add local peers (wait to rediscover)
242 if(v.source !== 'local') {
243 gossip.add(v, 'stored')
244 }
245 })
246 })
247
248 var int = setInterval(function () {
249 var copy = JSON.parse(JSON.stringify(peers))
250 copy.filter(function (e) {
251 return e.source !== 'local'
252 }).forEach(function (e) {
253 delete e.state
254 })
255 if(deepEqual(copy, last)) return
256 last = copy
257 stateFile.set(copy, function(err) {
258 if (err) console.log(err)
259 })
260 }, 10*1000)
261
262 if(int.unref) int.unref()
263
264 return gossip
265 }
266}
267
268
269

Built with git-ssb-web