git ssb

10+

Matt McKegg / patchwork



Tree: 3eb1b173c39d9c87c4c84a786a93eb2059417ac2

Files: 3eb1b173c39d9c87c4c84a786a93eb2059417ac2 / lib / gossip-with-slow-rollout / index.js

6178 bytesRaw
1'use strict'
2var pull = require('pull-stream')
3var Notify = require('pull-notify')
4var mdm = require('mdmanifest')
5var valid = require('scuttlebot/lib/validators')
6var apidoc = require('scuttlebot/lib/apidocs').gossip
7var u = require('scuttlebot/lib/util')
8var ref = require('ssb-ref')
9var ping = require('pull-ping')
10var Stats = require('statistics')
11var Schedule = require('./schedule')
12var Init = require('./init')
13
14function isFunction (f) {
15 return 'function' === typeof f
16}
17
18function stringify(peer) {
19 return [peer.host, peer.port, peer.key].join(':')
20}
21
22/*
23Peers : [{
24 key: id,
25 host: ip,
26 port: int,
27 //to be backwards compatible with patchwork...
28 announcers: {length: int}
29 source: 'pub'|'manual'|'local'
30}]
31*/
32
33
34module.exports = {
35 name: 'gossip',
36 version: '1.0.0',
37 manifest: mdm.manifest(apidoc),
38 permissions: {
39 anonymous: {allow: ['ping']}
40 },
41 init: function (server, config) {
42 var notify = Notify()
43 var conf = config.gossip || {}
44 var home = ref.parseAddress(server.getAddress())
45
46 //Known Peers
47 var peers = []
48
49 function getPeer(id) {
50 return u.find(peers, function (e) {
51 return e && e.key === id
52 })
53 }
54
55 var timer_ping = 5*6e4
56
57 var gossip = {
58 wakeup: 0,
59 peers: function () {
60 return peers
61 },
62 get: function (addr) {
63 addr = ref.parseAddress(addr)
64 return u.find(peers, function (a) {
65 return (
66 addr.port === a.port
67 && addr.host === a.host
68 && addr.key === a.key
69 )
70 })
71 },
72 connect: valid.async(function (addr, cb) {
73 addr = ref.parseAddress(addr)
74 if (!addr || typeof addr != 'object')
75 return cb(new Error('first param must be an address'))
76
77 if(!addr.key) return cb(new Error('address must have ed25519 key'))
78 // add peer to the table, incase it isn't already.
79 gossip.add(addr, 'manual')
80 var p = gossip.get(addr)
81 if(!p) return cb()
82
83 p.stateChange = Date.now()
84 p.state = 'connecting'
85 server.connect(p, function (err, rpc) {
86 if (err) {
87 p.state = undefined
88 p.failure = (p.failure || 0) + 1
89 p.stateChange = Date.now()
90 notify({ type: 'connect-failure', peer: p })
91 server.emit('log:info', ['SBOT', p.host+':'+p.port+p.key, 'connection failed', err.message || err])
92 p.duration.value(0)
93 return (cb && cb(err))
94 }
95 else {
96 p.state = 'connected'
97 p.failure = 0
98 }
99 cb && cb(null, rpc)
100 })
101
102 }, 'string|object'),
103
104 disconnect: valid.async(function (addr, cb) {
105 var peer = this.get(addr)
106
107 peer.state = 'disconnecting'
108 peer.stateChange = Date.now()
109 if(!peer || !peer.disconnect) cb && cb()
110 else peer.disconnect(true, function (err) {
111 peer.stateChange = Date.now()
112 })
113
114 }, 'string|object'),
115
116 changes: function () {
117 return notify.listen()
118 },
119 //add an address to the peer table.
120 add: valid.sync(function (addr, source) {
121 addr = ref.parseAddress(addr)
122 if(!ref.isAddress(addr))
123 throw new Error('not a valid address:' + JSON.stringify(addr))
124 // check that this is a valid address, and not pointing at self.
125
126 if(addr.key === home.key) return
127 if(addr.host === home.host && addr.port === home.port) return
128
129 var f = gossip.get(addr)
130
131 if(!f) {
132 // new peer
133 addr.source = source
134 addr.announcers = 1
135 addr.duration = Stats()
136 peers.push(addr)
137 notify({ type: 'discover', peer: addr, source: source || 'manual' })
138 return addr
139 }
140 //don't count local over and over
141 else if(f.source != 'local')
142 f.announcers ++
143
144 return f
145 }, 'string|object', 'string?'),
146 ping: function (opts) {
147 var timeout = config.timers && config.timers.ping || 5*60e3
148 //between 10 seconds and 30 minutes, default 5 min
149 timeout = Math.max(10e3, Math.min(timeout, 30*60e3))
150 return ping({timeout: timeout})
151 },
152 reconnect: function () {
153 for(var id in server.peers)
154 if(id !== server.id) //don't disconnect local client
155 server.peers[id].forEach(function (peer) {
156 peer.close(true)
157 })
158 return gossip.wakeup = Date.now()
159 }
160 }
161
162 Schedule (gossip, config, server)
163 Init (gossip, config, server)
164 //get current state
165
166 server.on('rpc:connect', function (rpc, isClient) {
167 var peer = getPeer(rpc.id)
168 //don't track clients that connect, but arn't considered peers.
169 //maybe we should though?
170 if(!peer) return
171 console.log('Connected', stringify(peer))
172 //means that we have created this connection, not received it.
173 peer.client = !!isClient
174 peer.state = 'connected'
175 peer.stateChange = Date.now()
176 peer.disconnect = function (err, cb) {
177 if(isFunction(err)) cb = err, err = null
178 rpc.close(err, cb)
179 }
180
181 if(isClient) {
182 //default ping is 5 minutes...
183 var pp = ping({serve: true, timeout: timer_ping}, function (_) {})
184 peer.ping = {rtt: pp.rtt, skew: pp.skew}
185 pull(
186 pp,
187 rpc.gossip.ping({timeout: timer_ping}, function (err) {
188 if(err.name === 'TypeError') peer.ping.fail = true
189 }),
190 pp
191 )
192 }
193
194 rpc.on('closed', function () {
195 console.log('Disconnected', stringify(peer))
196 //track whether we have successfully connected.
197 //or how many failures there have been.
198 var since = peer.stateChange
199 peer.stateChange = Date.now()
200 if(peer.state === 'connected') //may be "disconnecting"
201 peer.duration.value(peer.stateChange - since)
202 peer.state = undefined
203 notify({ type: 'disconnect', peer: peer })
204 server.emit('log:info', ['SBOT', rpc.id, 'disconnect'])
205 })
206
207 notify({ type: 'connect', peer: peer })
208 })
209
210 return gossip
211 }
212}
213

Built with git-ssb-web