zcash: fix retry logic

Previously, when a request timed out, that peer would not be removed
from the pending handshake signal map. Since the peer was still in the
map, any attempt to reconnect would fail since it would look like we
were already waiting on a response.
This commit is contained in:
George Tankersley 2020-06-08 16:48:53 -04:00
parent 1ec079a0fe
commit 16d803bb86
2 changed files with 57 additions and 30 deletions

View File

@ -163,23 +163,32 @@ func (s *Seeder) ConnectOnDefaultPort(addr string) error {
return err
}
// Connect attempts to connect to a peer at the given address and port. It
// returns a handle to the peer connection if the connection is successful
// or nil and an error if it fails.
// Connect attempts to connect to a peer at the given address and port. It will
// not connect to addresses known to be unusable. It returns a handle to the peer
// connection if the connection is successful or nil and an error if it fails.
func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) {
connectionString := net.JoinHostPort(addr, port)
p, err := peer.NewOutboundPeer(s.config, connectionString)
host := net.JoinHostPort(addr, port)
p, err := peer.NewOutboundPeer(s.config, host)
if err != nil {
return nil, errors.Wrap(err, "constructing outbound peer")
}
// PeerKeys are used in our internal maps to keep signals and responses from specific peers straight.
pk := peerKeyFromPeer(p)
if s.addrBook.IsBlacklisted(pk) {
return nil, ErrBlacklistedPeer
}
return s.connect(p)
}
// connect attempts to connect to a peer at the given address and port. It
// returns a handle to the peer connection if the connection is successful
// or nil and an error if it fails.
func (s *Seeder) connect(p *peer.Peer) (*peer.Peer, error) {
// PeerKeys are used in our internal maps to keep signals and responses from specific peers straight.
pk := peerKeyFromPeer(p)
_, alreadyPending := s.pendingPeers.Load(pk)
_, alreadyHandshaking := s.handshakeSignals.Load(pk)
_, alreadyLive := s.livePeers.Load(pk)
@ -189,12 +198,14 @@ func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) {
return nil, ErrRepeatConnection
}
s.pendingPeers.Store(pk, p)
defer s.pendingPeers.Delete(pk)
if alreadyHandshaking {
s.logger.Printf("Peer is already handshaking: %s", p.Addr())
return nil, ErrRepeatConnection
}
s.handshakeSignals.Store(pk, make(chan struct{}, 1))
defer s.handshakeSignals.Delete(pk)
if alreadyLive {
s.logger.Printf("Peer is already live: %s", p.Addr())
@ -210,19 +221,20 @@ func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) {
s.logger.Printf("Handshake initated with peer %s", p.Addr())
p.AssociateConnection(conn)
// Wait for
handshakeChan, _ := s.handshakeSignals.Load(pk)
select {
case <-handshakeChan.(chan struct{}):
s.logger.Printf("Handshake completed with peer %s", p.Addr())
s.handshakeSignals.Delete(pk)
return p, nil
case <-time.After(maximumHandshakeWait):
return nil, errors.New("peer handshake started but timed out")
// Wait for it
if handshakeChan, ok := s.handshakeSignals.Load(pk); ok {
select {
case <-handshakeChan.(chan struct{}):
s.logger.Printf("Handshake completed with peer %s", p.Addr())
return p, nil
case <-time.After(maximumHandshakeWait):
p.Disconnect()
p.WaitForDisconnect()
return nil, errors.New("peer handshake started but timed out")
}
}
panic("This should be unreachable")
return nil, errors.New("peer was not in handshake channel")
}
// GetPeer returns a live peer identified by "host:port" string, or an error if
@ -347,11 +359,6 @@ func (s *Seeder) RequestAddresses() int {
continue
}
if s.addrBook.IsBlacklisted(potentialPeer) {
s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port)
continue
}
portString := strconv.Itoa(int(na.Port))
newPeer, err := s.Connect(na.IP.String(), portString)
@ -371,8 +378,6 @@ func (s *Seeder) RequestAddresses() int {
// Ask the newly discovered peer if they know anyone we haven't met yet.
newPeer.QueueMessage(wire.NewMsgGetAddr(), nil)
//s.DisconnectPeer(potentialPeer)
s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port)
atomic.AddInt32(&peerCount, 1)
s.addrBook.Add(potentialPeer)
@ -391,12 +396,18 @@ func (s *Seeder) RequestAddresses() int {
// true, we immediately disconnect from the peers after verifying them.
func (s *Seeder) RefreshAddresses(disconnect bool) {
s.logger.Printf("Refreshing address book")
defer s.logger.Printf("RefreshAddresses() finished.")
var refreshQueue chan *Address
var wg sync.WaitGroup
// XXX lil awkward to allocate a channel whose size we can't determine without a lock here
s.addrBook.enqueueAddrs(&refreshQueue)
s.logger.Printf("Address book contains %d addresses", len(refreshQueue))
if len(refreshQueue) == 0 {
return
}
for i := 0; i < crawlerGoroutineCount; i++ {
wg.Add(1)
@ -434,19 +445,26 @@ func (s *Seeder) RefreshAddresses(disconnect bool) {
}
wg.Wait()
s.logger.Printf("RefreshAddresses() finished.")
}
// RetryBlacklist checks if the addresses in our blacklist are usable again.
// If the trial connection succeeds, they're removed from the blacklist.
func (s *Seeder) RetryBlacklist() {
s.logger.Printf("Giving the blacklist another chance")
defer s.logger.Printf("RetryBlacklist() finished.")
var blacklistQueue chan *Address
var wg sync.WaitGroup
// XXX lil awkward to allocate a channel whose size we can't determine without a lock here
s.addrBook.enqueueBlacklist(&blacklistQueue)
s.logger.Printf("Blacklist contains %d addresses", len(blacklistQueue))
if len(blacklistQueue) == 0 {
return
}
var peerCount int32
for i := 0; i < crawlerGoroutineCount; i++ {
wg.Add(1)
@ -456,10 +474,16 @@ func (s *Seeder) RetryBlacklist() {
next := <-blacklistQueue
na := next.netaddr
ipString := na.IP.String()
portString := strconv.Itoa(int(na.Port))
ip := na.IP.String()
port := strconv.Itoa(int(na.Port))
_, err := s.Connect(ipString, portString)
// Call internal connect directly to avoid being blocked
host := net.JoinHostPort(ip, port)
p, err := peer.NewOutboundPeer(s.config, host)
if err != nil {
continue
}
_, err = s.connect(p)
if err != nil {
// Connection failed. Peer remains blacklisted.
@ -467,6 +491,7 @@ func (s *Seeder) RetryBlacklist() {
// If we've been retrying for a while, forget about this peer entirely.
// This would deadlock if enqueueBlacklist still held the RLock.
s.addrBook.DropFromBlacklist(next.asPeerKey())
s.logger.Printf("Dropping %s from blacklist", next.asPeerKey())
}
continue
}
@ -475,6 +500,7 @@ func (s *Seeder) RetryBlacklist() {
// Remove the peer from the blacklist and add it back to the address book.
// This would deadlock if enqueueBlacklist still held the RLock.
atomic.AddInt32(&peerCount, 1)
s.addrBook.Redeem(next.asPeerKey())
}
wg.Done()
@ -482,7 +508,7 @@ func (s *Seeder) RetryBlacklist() {
}
wg.Wait()
s.logger.Printf("RetryBlacklist() finished.")
s.logger.Printf("Added %d on retry", peerCount)
}
// WaitForAddresses waits for n addresses to be confirmed and available in the address book.

View File

@ -13,7 +13,8 @@ func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
if !ok {
s.logger.Printf("Got verack from unexpected peer %s", p.Addr())
// TODO: probably want to disconnect from the peer sending us out-of-order veracks
// Disconnect from the peer sending us out-of-order veracks
s.DisconnectPeer(pk)
return
}