git ssb

4+

Dominic / scuttlebot



Tree: 9f332f5cbf487b256a2b22ded2d6d97e1bb4e8c4

Files: 9f332f5cbf487b256a2b22ded2d6d97e1bb4e8c4 / plugins / gossip / index.js

7088 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var toAddress = require('../../lib/util').toAddress
5var mdm = require('mdmanifest')
6var valid = require('../../lib/validators')
7var apidoc = require('../../lib/apidocs').gossip
8var u = require('../../lib/util')
9var ref = require('ssb-ref')
10var ping = require('pull-ping')
11var stats = require('statistics')
12var isArray = Array.isArray
13var Schedule = require('./schedule')
14var Init = require('./init')
15var AtomicFile = require('atomic-file')
16var path = require('path')
17var deepEqual = require('deep-equal')
18
19function isFunction (f) {
20 return 'function' === typeof f
21}
22
23function stringify(peer) {
24 return [peer.host, peer.port, peer.key].join(':')
25}
26
27/*
28Peers : [{
29 key: id,
30 host: ip,
31 port: int,
32 //to be backwards compatible with patchwork...
33 announcers: {length: int}
34 source: 'pub'|'manual'|'local'
35}]
36*/
37
38
39module.exports = {
40 name: 'gossip',
41 version: '1.0.0',
42 manifest: mdm.manifest(apidoc),
43 permissions: {
44 anonymous: {allow: ['ping']}
45 },
46 init: function (server, config) {
47 var notify = Notify()
48 var conf = config.gossip || {}
49 var home = ref.parseAddress(server.getAddress())
50
51 var stateFile = AtomicFile(path.join(config.path, 'gossip.json'))
52
53 //Known Peers
54 var peers = []
55
56 function getPeer(id) {
57 return u.find(peers, function (e) {
58 return e && e.key === id
59 })
60 }
61
62 var timer_ping = 5*6e4
63
64 var gossip = {
65 wakeup: 0,
66 peers: function () {
67 return peers
68 },
69 get: function (addr) {
70 addr = ref.parseAddress(addr)
71 return u.find(peers, function (a) {
72 return (
73 addr.port === a.port
74 && addr.host === a.host
75 && addr.key === a.key
76 )
77 })
78 },
79 connect: valid.async(function (addr, cb) {
80 addr = ref.parseAddress(addr)
81 if (!addr || typeof addr != 'object')
82 return cb(new Error('first param must be an address'))
83
84 if(!addr.key) return cb(new Error('address must have ed25519 key'))
85 // add peer to the table, incase it isn't already.
86 gossip.add(addr, 'manual')
87 var p = gossip.get(addr)
88 if(!p) return cb()
89
90 p.stateChange = Date.now()
91 p.state = 'connecting'
92 server.connect(p, function (err, rpc) {
93 if (err) {
94 p.state = undefined
95 p.failure = (p.failure || 0) + 1
96 p.stateChange = Date.now()
97 notify({ type: 'connect-failure', peer: p })
98 server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
99 p.duration = stats(p.duration, 0)
100 return (cb && cb(err))
101 }
102 else {
103 p.state = 'connected'
104 p.failure = 0
105 }
106 cb && cb(null, rpc)
107 })
108
109 }, 'string|object'),
110
111 disconnect: valid.async(function (addr, cb) {
112 var peer = this.get(addr)
113
114 peer.state = 'disconnecting'
115 peer.stateChange = Date.now()
116 if(!peer || !peer.disconnect) cb && cb()
117 else peer.disconnect(true, function (err) {
118 peer.stateChange = Date.now()
119 })
120
121 }, 'string|object'),
122
123 changes: function () {
124 return notify.listen()
125 },
126 //add an address to the peer table.
127 add: valid.sync(function (addr, source) {
128 addr = ref.parseAddress(addr)
129 if(!ref.isAddress(addr))
130 throw new Error('not a valid address:' + JSON.stringify(addr))
131 // check that this is a valid address, and not pointing at self.
132
133 if(addr.key === home.key) return
134 if(addr.host === home.host && addr.port === home.port) return
135
136 var f = gossip.get(addr)
137
138 if(!f) {
139 // new peer
140 addr.source = source
141 addr.announcers = 1
142 addr.duration = addr.duration || null
143 peers.push(addr)
144 notify({ type: 'discover', peer: addr, source: source || 'manual' })
145 return addr
146 }
147 //don't count local over and over
148 else if(f.source != 'local')
149 f.announcers ++
150
151 return f
152 }, 'string|object', 'string?'),
153 ping: function (opts) {
154 var timeout = config.timers && config.timers.ping || 5*60e3
155 //between 10 seconds and 30 minutes, default 5 min
156 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
157 return ping({timeout: timeout})
158 },
159 reconnect: function () {
160 for(var id in server.peers)
161 if(id !== server.id) //don't disconnect local client
162 server.peers[id].forEach(function (peer) {
163 peer.close(true)
164 })
165 return gossip.wakeup = Date.now()
166 }
167 }
168
169 Schedule (gossip, config, server)
170 Init (gossip, config, server)
171 //get current state
172
173 server.on('rpc:connect', function (rpc, isClient) {
174 var peer = getPeer(rpc.id)
175 //don't track clients that connect, but arn't considered peers.
176 //maybe we should though?
177 if(!peer) return
178 console.log('Connected', stringify(peer))
179 //means that we have created this connection, not received it.
180 peer.client = !!isClient
181 peer.state = 'connected'
182 peer.stateChange = Date.now()
183 peer.disconnect = function (err, cb) {
184 if(isFunction(err)) cb = err, err = null
185 rpc.close(err, cb)
186 }
187
188 if(isClient) {
189 //default ping is 5 minutes...
190 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
191 peer.ping = {rtt: pp.rtt, skew: pp.skew}
192 pull(
193 pp,
194 rpc.gossip.ping({timeout: timer_ping}, function (err) {
195 if(err.name === 'TypeError') peer.ping.fail = true
196 }),
197 pp
198 )
199 }
200
201 rpc.on('closed', function () {
202 console.log('Disconnected', stringify(peer))
203 //track whether we have successfully connected.
204 //or how many failures there have been.
205 var since = peer.stateChange
206 peer.stateChange = Date.now()
207// if(peer.state === 'connected') //may be "disconnecting"
208 peer.duration = stats(peer.duration, peer.stateChange - since)
209// console.log(peer.duration)
210 peer.state = undefined
211 notify({ type: 'disconnect', peer: peer })
212 server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
213 })
214
215 notify({ type: 'connect', peer: peer })
216 })
217
218 var last
219 stateFile.get(function (err, ary) {
220 last = ary || []
221 if(Array.isArray(ary))
222 ary.forEach(function (v) {
223 delete v.state
224 var p = gossip.add(v, 'stored')
225 })
226 })
227
228 var int = setInterval(function () {
229 var copy = JSON.parse(JSON.stringify(peers))
230 copy.forEach(function (e) {
231 delete e.state
232 })
233 if(deepEqual(copy, last)) return
234 last = copy
235 console.log("WRITE GOSSIP STATE")
236 stateFile.set(copy, console.log.bind(console))
237 }, 10*1000)
238
239 if(int.unref) int.unref()
240
241 return gossip
242 }
243}
244
245
246
247
248
249
250

Built with git-ssb-web