From f093cb58dd653280e65f0b81e4c371a5f45d0323 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Fri, 3 Apr 2020 19:14:13 -0400 Subject: [PATCH] Added timeout for attempt to reconnect to staking peers --- networking/handshake_handlers.go | 133 +++++++++++++++++++------------ 1 file changed, 81 insertions(+), 52 deletions(-) diff --git a/networking/handshake_handlers.go b/networking/handshake_handlers.go index 83d1348..6836f20 100644 --- a/networking/handshake_handlers.go +++ b/networking/handshake_handlers.go @@ -54,6 +54,13 @@ Periodically gossip peerlists. stakers should be in the set). */ +/* +Attempt reconnections + - If a non-staker disconnects, delete the connection + - If a staker disconnects, attempt to reconnect to the node for awhile. If the + node isn't connected to after awhile delete the connection. +*/ + const ( // CurrentVersion this avalanche instance is executing. CurrentVersion = "avalanche/0.0.1" @@ -70,6 +77,9 @@ const ( // GetVersionTimeout is the amount of time to wait before sending a // getVersion message to a partially connected peer GetVersionTimeout = 2 * time.Second + // ReconnectTimeout is the amount of time to wait to reconnect to a staker + // before giving up + ReconnectTimeout = 1 * time.Minute ) // Manager is the struct that will be accessed on event calls @@ -100,6 +110,7 @@ type Handshake struct { connections AddrCert // Connections that I think are connected versionTimeout timer.TimeoutManager + reconnectTimeout timer.TimeoutManager peerListGossiper *timer.Repeater awaitingLock sync.Mutex @@ -143,6 +154,10 @@ func (nm *Handshake) Initialize( nm.versionTimeout.Initialize(GetVersionTimeout) go nm.log.RecoverAndPanic(nm.versionTimeout.Dispatch) + + nm.reconnectTimeout.Initialize(ReconnectTimeout) + go nm.log.RecoverAndPanic(nm.reconnectTimeout.Dispatch) + nm.peerListGossiper = timer.NewRepeater(nm.gossipPeerList, PeerListGossipSpacing) go nm.log.RecoverAndPanic(nm.peerListGossiper.Dispatch) } @@ -290,6 +305,69 @@ func checkPeerCertificate(_ *C.struct_msgnetwork_conn_t, connected C.bool, _ uns return connected } +func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, addr salticidae.NetAddr) { + ip := toIPDesc(addr) + // If we're enforcing staking, use a peer's certificate to uniquely identify them + // Otherwise, use a hash of their ip to identify them + cert := ids.ShortID{} + if nm.enableStaking { + cert = getPeerCert(conn) + } else { + cert = toShortID(ip) + } + + nm.log.Debug("Connected to %s", ip) + + nm.reconnectTimeout.Remove(cert.LongID()) + + nm.pending.Add(addr, cert) + + certID := cert.LongID() + handler := new(func()) + *handler = func() { + if nm.pending.ContainsIP(addr) { + nm.SendGetVersion(addr) + nm.versionTimeout.Put(certID, *handler) + } + } + (*handler)() +} + +func (nm *Handshake) disconnectedFromPeer(addr salticidae.NetAddr) { + cert := ids.ShortID{} + if pendingCert, exists := nm.pending.GetID(addr); exists { + cert = pendingCert + } else if connectedCert, exists := nm.connections.GetID(addr); exists { + cert = connectedCert + } else { + return + } + + nm.log.Info("Disconnected from %s", toIPDesc(addr)) + + if nm.vdrs.Contains(cert) { + nm.reconnectTimeout.Put(cert.LongID(), func() { + nm.net.DelPeer(addr) + }) + } else { + nm.net.DelPeer(addr) + } + + if !nm.enableStaking { + nm.vdrs.Remove(cert) + } + + nm.pending.RemoveIP(addr) + nm.connections.RemoveIP(addr) + nm.numPeers.Set(float64(nm.connections.Len())) + + nm.awaitingLock.Lock() + defer nm.awaitingLock.Unlock() + for _, awaiting := range HandshakeNet.awaiting { + awaiting.Remove(cert) + } +} + // peerHandler notifies a change to the set of connected peers // connected is true if a new peer is connected // connected is false if a formerly connected peer has disconnected @@ -298,60 +376,11 @@ func peerHandler(_conn *C.struct_peernetwork_conn_t, connected C.bool, _ unsafe. pConn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn)) addr := pConn.GetPeerAddr(true) - ip := toIPDesc(addr) - if !connected { - if !HandshakeNet.enableStaking { - cert := toShortID(ip) - HandshakeNet.vdrs.Remove(cert) - } - - cert := ids.ShortID{} - if pendingCert, exists := HandshakeNet.pending.GetID(addr); exists { - cert = pendingCert - } else if connectedCert, exists := HandshakeNet.connections.GetID(addr); exists { - cert = connectedCert - } else { - return - } - - HandshakeNet.pending.RemoveIP(addr) - HandshakeNet.connections.RemoveIP(addr) - - HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len())) - - HandshakeNet.log.Warn("Disconnected from %s", ip) - - HandshakeNet.awaitingLock.Lock() - defer HandshakeNet.awaitingLock.Unlock() - - for _, awaiting := range HandshakeNet.awaiting { - awaiting.Remove(cert) - } - - return - } - - HandshakeNet.log.Debug("Connected to %s", ip) - - // If we're enforcing staking, use a peer's certificate to uniquely identify them - // Otherwise, use a hash of their ip to identify them - cert := ids.ShortID{} - if HandshakeNet.enableStaking { - cert = getPeerCert(_conn) + if connected { + HandshakeNet.connectedToPeer(_conn, addr) } else { - cert = toShortID(ip) + HandshakeNet.disconnectedFromPeer(addr) } - HandshakeNet.pending.Add(addr, cert) - - certID := cert.LongID() - handler := new(func()) - *handler = func() { - if HandshakeNet.pending.ContainsIP(addr) { - HandshakeNet.SendGetVersion(addr) - HandshakeNet.versionTimeout.Put(certID, *handler) - } - } - (*handler)() } // unknownPeerHandler notifies of an unknown peer connection attempt