git ssb

0+

alanz / patchwork



forked from Matt McKegg / patchwork

Tree: f8a3919796796fb604a4d98decee02f962ee7cee

Files: f8a3919796796fb604a4d98decee02f962ee7cee / lib / persistent-gossip / index.js

7685 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var mdm = require('mdmanifest')
5var valid = require('scuttlebot/lib/validators')
6var apidoc = require('scuttlebot/lib/apidocs').gossip
7var u = require('scuttlebot/lib/util')
8var ref = require('ssb-ref')
9var ping = require('pull-ping')
10var stats = require('statistics')
11var Schedule = require('./schedule')
12var Init = require('./init')
13var AtomicFile = require('atomic-file')
14var path = require('path')
15var deepEqual = require('deep-equal')
16
/**
 * Tell whether a value is callable.
 *
 * @param {*} f - any value
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  return typeof f === 'function'
}
20
/**
 * Render a peer as "host:port:key" for log output.
 *
 * Array#join is used on purpose: a missing (undefined/null) field becomes an
 * empty segment rather than the text "undefined".
 *
 * @param {{host: *, port: *, key: *}} peer - peer record
 * @returns {string} the three fields joined with ':'
 */
function stringify (peer) {
  var parts = [peer.host, peer.port, peer.key]
  return parts.join(':')
}
24
/**
 * Tell whether two ids are linked in both directions in a `friends` map.
 *
 * @param {Object} friends - map of id -> (map of id -> truthy value)
 * @param {string} a - first id
 * @param {string} b - second id
 * @returns {boolean} true only when both `friends[a][b]` and `friends[b][a]`
 *   are truthy
 */
function isFriends (friends, a, b) {
  var fromA = friends[a]
  var fromB = friends[b]
  // Both ids must have an entry before the cross links are inspected.
  if (!fromA || !fromB) return false
  return Boolean(fromA[b] && fromB[a])
}
28
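/*
Peers : [{
  key: id,
  host: ip,
  port: int,
  //to be backwards compatible with patchwork...
  //NOTE: add() in this file stores `announcers` as a plain counter
  //(set to 1 for a new peer, then incremented), not as {length: int}.
  announcers: {length: int},
  source: 'pub'|'manual'|'local'|'stored'
}]
*/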
39
40
41module.exports = {
42 name: 'gossip',
43 version: '1.0.0',
44 manifest: mdm.manifest(apidoc),
45 permissions: {
46 anonymous: {allow: ['ping']}
47 },
48 init: function (server, config) {
49 var notify = Notify()
50 var conf = config.gossip || {}
51 var home = ref.parseAddress(server.getAddress())
52
53 var stateFile = AtomicFile(path.join(config.path, 'gossip.json'))
54
55 //Known Peers
56 var peers = []
57
58 function getPeer(id) {
59 return u.find(peers, function (e) {
60 return e && e.key === id
61 })
62 }
63
64 var timer_ping = 5*6e4
65
66 var gossip = {
67 wakeup: 0,
68 peers: function () {
69 return peers
70 },
71 connectToFriends: function () {
72 server.friends.all((err, friends) => {
73 if (err) return
74 peers.forEach(function(peer) {
75 if (!Schedule.isConnect(peer) && isFriends(friends, server.id, peer.key)) {
76 gossip.connect(peer, function() {})
77 }
78 })
79 })
80 },
81 get: function (addr) {
82 addr = ref.parseAddress(addr)
83 return u.find(peers, function (a) {
84 return (
85 addr.port === a.port
86 && addr.host === a.host
87 && addr.key === a.key
88 )
89 })
90 },
91 connect: valid.async(function (addr, cb) {
92 addr = ref.parseAddress(addr)
93 if (!addr || typeof addr != 'object')
94 return cb(new Error('first param must be an address'))
95
96 if(!addr.key) return cb(new Error('address must have ed25519 key'))
97 // add peer to the table, incase it isn't already.
98 gossip.add(addr, 'manual')
99 var p = gossip.get(addr)
100 if(!p) return cb()
101
102 p.stateChange = Date.now()
103 p.state = 'connecting'
104 server.connect(p, function (err, rpc) {
105 if (err) {
106 p.state = undefined
107 p.failure = (p.failure || 0) + 1
108 p.stateChange = Date.now()
109 notify({ type: 'connect-failure', peer: p })
110 server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
111 p.duration = stats(p.duration, 0)
112 return (cb && cb(err))
113 }
114 else {
115 p.state = 'connected'
116 p.failure = 0
117 }
118 cb && cb(null, rpc)
119 })
120
121 }, 'string|object'),
122
123 disconnect: valid.async(function (addr, cb) {
124 var peer = this.get(addr)
125
126 peer.state = 'disconnecting'
127 peer.stateChange = Date.now()
128 if(!peer || !peer.disconnect) cb && cb()
129 else peer.disconnect(true, function (err) {
130 peer.stateChange = Date.now()
131 })
132
133 }, 'string|object'),
134
135 changes: function () {
136 return notify.listen()
137 },
138 //add an address to the peer table.
139 add: valid.sync(function (addr, source) {
140 addr = ref.parseAddress(addr)
141 if(!ref.isAddress(addr))
142 throw new Error('not a valid address:' + JSON.stringify(addr))
143 // check that this is a valid address, and not pointing at self.
144
145 if(addr.key === home.key) return
146 if(addr.host === home.host && addr.port === home.port) return
147
148 var f = gossip.get(addr)
149
150 if(!f) {
151 // new peer
152 addr.source = source
153 addr.announcers = 1
154 addr.duration = addr.duration || null
155 peers.push(addr)
156 notify({ type: 'discover', peer: addr, source: source || 'manual' })
157 return addr
158 } else if (source === 'local') {
159 // this peer is local to us, override old source
160 addr.source = 'local'
161 }
162 //don't count local over and over
163 else if(f.source != 'local')
164 f.announcers ++
165
166 return f
167 }, 'string|object', 'string?'),
168 ping: function (opts) {
169 var timeout = config.timers && config.timers.ping || 5*60e3
170 //between 10 seconds and 30 minutes, default 5 min
171 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
172 return ping({timeout: timeout})
173 },
174 reconnect: function () {
175 for(var id in server.peers)
176 if(id !== server.id) //don't disconnect local client
177 server.peers[id].forEach(function (peer) {
178 peer.close(true)
179 })
180 return gossip.wakeup = Date.now()
181 }
182 }
183
184 Schedule (gossip, config, server)
185 Init (gossip, config, server)
186 //get current state
187
188 server.on('rpc:connect', function (rpc, isClient) {
189 var peer = getPeer(rpc.id)
190 //don't track clients that connect, but arn't considered peers.
191 //maybe we should though?
192 if(!peer) return
193 console.log('Connected', stringify(peer))
194 //means that we have created this connection, not received it.
195 peer.client = !!isClient
196 peer.state = 'connected'
197 peer.stateChange = Date.now()
198 peer.disconnect = function (err, cb) {
199 if(isFunction(err)) cb = err, err = null
200 rpc.close(err, cb)
201 }
202
203 if(isClient) {
204 //default ping is 5 minutes...
205 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
206 peer.ping = {rtt: pp.rtt, skew: pp.skew}
207 pull(
208 pp,
209 rpc.gossip.ping({timeout: timer_ping}, function (err) {
210 if(err.name === 'TypeError') peer.ping.fail = true
211 }),
212 pp
213 )
214 }
215
216 rpc.on('closed', function () {
217 console.log('Disconnected', stringify(peer))
218 //track whether we have successfully connected.
219 //or how many failures there have been.
220 var since = peer.stateChange
221 peer.stateChange = Date.now()
222// if(peer.state === 'connected') //may be "disconnecting"
223 peer.duration = stats(peer.duration, peer.stateChange - since)
224// console.log(peer.duration)
225 peer.state = undefined
226 notify({ type: 'disconnect', peer: peer })
227 server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
228 })
229
230 notify({ type: 'connect', peer: peer })
231 })
232
233 var last
234 stateFile.get(function (err, ary) {
235 last = ary || []
236 if(Array.isArray(ary))
237 ary.forEach(function (v) {
238 delete v.state
239 // don't add local peers (wait to rediscover)
240 if(v.source !== 'local') {
241 gossip.add(v, 'stored')
242 }
243 })
244 })
245
246 var int = setInterval(function () {
247 var copy = JSON.parse(JSON.stringify(peers))
248 copy.forEach(function (e) {
249 delete e.state
250 })
251 if(deepEqual(copy, last)) return
252 last = copy
253 stateFile.set(copy, function(err) {
254 if (err) console.log(err)
255 })
256 }, 10*1000)
257
258 if(int.unref) int.unref()
259
260 return gossip
261 }
262}
263

Built with git-ssb-web