Added timeout for attempt to reconnect to staking peers

This commit is contained in:
StephenButtolph 2020-04-03 19:14:13 -04:00
parent e69449e20b
commit f093cb58dd
1 changed files with 81 additions and 52 deletions

View File

@ -54,6 +54,13 @@ Periodically gossip peerlists.
stakers should be in the set).
*/
/*
Attempt reconnections
- If a non-staker disconnects, delete the connection
- If a staker disconnects, attempt to reconnect to the node for awhile. If the
node isn't connected to after awhile delete the connection.
*/
const (
// CurrentVersion this avalanche instance is executing.
CurrentVersion = "avalanche/0.0.1"
@ -70,6 +77,9 @@ const (
// GetVersionTimeout is the amount of time to wait before sending a
// getVersion message to a partially connected peer
GetVersionTimeout = 2 * time.Second
// ReconnectTimeout is the amount of time to wait to reconnect to a staker
// before giving up
ReconnectTimeout = 1 * time.Minute
)
// Manager is the struct that will be accessed on event calls
@ -100,6 +110,7 @@ type Handshake struct {
connections AddrCert // Connections that I think are connected
versionTimeout timer.TimeoutManager
reconnectTimeout timer.TimeoutManager
peerListGossiper *timer.Repeater
awaitingLock sync.Mutex
@ -143,6 +154,10 @@ func (nm *Handshake) Initialize(
nm.versionTimeout.Initialize(GetVersionTimeout)
go nm.log.RecoverAndPanic(nm.versionTimeout.Dispatch)
nm.reconnectTimeout.Initialize(ReconnectTimeout)
go nm.log.RecoverAndPanic(nm.reconnectTimeout.Dispatch)
nm.peerListGossiper = timer.NewRepeater(nm.gossipPeerList, PeerListGossipSpacing)
go nm.log.RecoverAndPanic(nm.peerListGossiper.Dispatch)
}
@ -290,6 +305,69 @@ func checkPeerCertificate(_ *C.struct_msgnetwork_conn_t, connected C.bool, _ uns
return connected
}
func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, addr salticidae.NetAddr) {
ip := toIPDesc(addr)
// If we're enforcing staking, use a peer's certificate to uniquely identify them
// Otherwise, use a hash of their ip to identify them
cert := ids.ShortID{}
if nm.enableStaking {
cert = getPeerCert(conn)
} else {
cert = toShortID(ip)
}
nm.log.Debug("Connected to %s", ip)
nm.reconnectTimeout.Remove(cert.LongID())
nm.pending.Add(addr, cert)
certID := cert.LongID()
handler := new(func())
*handler = func() {
if nm.pending.ContainsIP(addr) {
nm.SendGetVersion(addr)
nm.versionTimeout.Put(certID, *handler)
}
}
(*handler)()
}
func (nm *Handshake) disconnectedFromPeer(addr salticidae.NetAddr) {
cert := ids.ShortID{}
if pendingCert, exists := nm.pending.GetID(addr); exists {
cert = pendingCert
} else if connectedCert, exists := nm.connections.GetID(addr); exists {
cert = connectedCert
} else {
return
}
nm.log.Info("Disconnected from %s", toIPDesc(addr))
if nm.vdrs.Contains(cert) {
nm.reconnectTimeout.Put(cert.LongID(), func() {
nm.net.DelPeer(addr)
})
} else {
nm.net.DelPeer(addr)
}
if !nm.enableStaking {
nm.vdrs.Remove(cert)
}
nm.pending.RemoveIP(addr)
nm.connections.RemoveIP(addr)
nm.numPeers.Set(float64(nm.connections.Len()))
nm.awaitingLock.Lock()
defer nm.awaitingLock.Unlock()
for _, awaiting := range HandshakeNet.awaiting {
awaiting.Remove(cert)
}
}
// peerHandler notifies a change to the set of connected peers
// connected is true if a new peer is connected
// connected is false if a formerly connected peer has disconnected
@ -298,60 +376,11 @@ func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, _ unsafe.
pConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
addr := pConn.GetPeerAddr(true)
ip := toIPDesc(addr)
if !connected {
if !HandshakeNet.enableStaking {
cert := toShortID(ip)
HandshakeNet.vdrs.Remove(cert)
}
cert := ids.ShortID{}
if pendingCert, exists := HandshakeNet.pending.GetID(addr); exists {
cert = pendingCert
} else if connectedCert, exists := HandshakeNet.connections.GetID(addr); exists {
cert = connectedCert
} else {
return
}
HandshakeNet.pending.RemoveIP(addr)
HandshakeNet.connections.RemoveIP(addr)
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
HandshakeNet.log.Warn("Disconnected from %s", ip)
HandshakeNet.awaitingLock.Lock()
defer HandshakeNet.awaitingLock.Unlock()
for _, awaiting := range HandshakeNet.awaiting {
awaiting.Remove(cert)
}
return
}
HandshakeNet.log.Debug("Connected to %s", ip)
// If we're enforcing staking, use a peer's certificate to uniquely identify them
// Otherwise, use a hash of their ip to identify them
cert := ids.ShortID{}
if HandshakeNet.enableStaking {
cert = getPeerCert(_conn)
if connected {
HandshakeNet.connectedToPeer(_conn, addr)
} else {
cert = toShortID(ip)
HandshakeNet.disconnectedFromPeer(addr)
}
HandshakeNet.pending.Add(addr, cert)
certID := cert.LongID()
handler := new(func())
*handler = func() {
if HandshakeNet.pending.ContainsIP(addr) {
HandshakeNet.SendGetVersion(addr)
HandshakeNet.versionTimeout.Put(certID, *handler)
}
}
(*handler)()
}
// unknownPeerHandler notifies of an unknown peer connection attempt