diff --git a/zcash/client.go b/zcash/client.go index 7948ddd..2a3a323 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -163,23 +163,32 @@ func (s *Seeder) ConnectOnDefaultPort(addr string) error { return err } -// Connect attempts to connect to a peer at the given address and port. It -// returns a handle to the peer connection if the connection is successful -// or nil and an error if it fails. +// Connect attempts to connect to a peer at the given address and port. It will +// not connect to addresses known to be unusable. It returns a handle to the peer +// connection if the connection is successful or nil and an error if it fails. func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) { - connectionString := net.JoinHostPort(addr, port) - p, err := peer.NewOutboundPeer(s.config, connectionString) + host := net.JoinHostPort(addr, port) + p, err := peer.NewOutboundPeer(s.config, host) if err != nil { return nil, errors.Wrap(err, "constructing outbound peer") } - // PeerKeys are used in our internal maps to keep signals and responses from specific peers straight. pk := peerKeyFromPeer(p) if s.addrBook.IsBlacklisted(pk) { return nil, ErrBlacklistedPeer } + return s.connect(p) +} + +// connect attempts to connect to a peer at the given address and port. It +// returns a handle to the peer connection if the connection is successful +// or nil and an error if it fails. +func (s *Seeder) connect(p *peer.Peer) (*peer.Peer, error) { + // PeerKeys are used in our internal maps to keep signals and responses from specific peers straight. + pk := peerKeyFromPeer(p) + _, alreadyPending := s.pendingPeers.Load(pk) _, alreadyHandshaking := s.handshakeSignals.Load(pk) _, alreadyLive := s.livePeers.Load(pk) @@ -189,12 +198,14 @@ func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) { return nil, ErrRepeatConnection } s.pendingPeers.Store(pk, p) + defer s.pendingPeers.Delete(pk) if alreadyHandshaking { s.logger.Printf("Peer is already handshaking: %s", p.Addr()) return nil, ErrRepeatConnection } s.handshakeSignals.Store(pk, make(chan struct{}, 1)) + defer s.handshakeSignals.Delete(pk) if alreadyLive { s.logger.Printf("Peer is already live: %s", p.Addr()) @@ -210,19 +221,20 @@ func (s *Seeder) Connect(addr, port string) (*peer.Peer, error) { s.logger.Printf("Handshake initated with peer %s", p.Addr()) p.AssociateConnection(conn) - // Wait for - handshakeChan, _ := s.handshakeSignals.Load(pk) - - select { - case <-handshakeChan.(chan struct{}): - s.logger.Printf("Handshake completed with peer %s", p.Addr()) - s.handshakeSignals.Delete(pk) - return p, nil - case <-time.After(maximumHandshakeWait): - return nil, errors.New("peer handshake started but timed out") + // Wait for it + if handshakeChan, ok := s.handshakeSignals.Load(pk); ok { + select { + case <-handshakeChan.(chan struct{}): + s.logger.Printf("Handshake completed with peer %s", p.Addr()) + return p, nil + case <-time.After(maximumHandshakeWait): + p.Disconnect() + p.WaitForDisconnect() + return nil, errors.New("peer handshake started but timed out") + } } - panic("This should be unreachable") + return nil, errors.New("peer was not in handshake channel") } // GetPeer returns a live peer identified by "host:port" string, or an error if @@ -347,11 +359,6 @@ func (s *Seeder) RequestAddresses() int { continue } - if s.addrBook.IsBlacklisted(potentialPeer) { - s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port) - continue - } - portString := strconv.Itoa(int(na.Port)) newPeer, err := s.Connect(na.IP.String(), portString) @@ -371,8 +378,6 @@ func (s *Seeder) RequestAddresses() int { // Ask the newly discovered peer if they know anyone we haven't met yet. newPeer.QueueMessage(wire.NewMsgGetAddr(), nil) - //s.DisconnectPeer(potentialPeer) - s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port) atomic.AddInt32(&peerCount, 1) s.addrBook.Add(potentialPeer) @@ -391,12 +396,18 @@ func (s *Seeder) RequestAddresses() int { // true, we immediately disconnect from the peers after verifying them. func (s *Seeder) RefreshAddresses(disconnect bool) { s.logger.Printf("Refreshing address book") + defer s.logger.Printf("RefreshAddresses() finished.") var refreshQueue chan *Address var wg sync.WaitGroup // XXX lil awkward to allocate a channel whose size we can't determine without a lock here s.addrBook.enqueueAddrs(&refreshQueue) + s.logger.Printf("Address book contains %d addresses", len(refreshQueue)) + + if len(refreshQueue) == 0 { + return + } for i := 0; i < crawlerGoroutineCount; i++ { wg.Add(1) @@ -434,19 +445,26 @@ func (s *Seeder) RefreshAddresses(disconnect bool) { } wg.Wait() - s.logger.Printf("RefreshAddresses() finished.") } // RetryBlacklist checks if the addresses in our blacklist are usable again. // If the trial connection succeeds, they're removed from the blacklist. func (s *Seeder) RetryBlacklist() { s.logger.Printf("Giving the blacklist another chance") + defer s.logger.Printf("RetryBlacklist() finished.") var blacklistQueue chan *Address var wg sync.WaitGroup // XXX lil awkward to allocate a channel whose size we can't determine without a lock here s.addrBook.enqueueBlacklist(&blacklistQueue) + s.logger.Printf("Blacklist contains %d addresses", len(blacklistQueue)) + + if len(blacklistQueue) == 0 { + return + } + + var peerCount int32 for i := 0; i < crawlerGoroutineCount; i++ { wg.Add(1) @@ -456,10 +474,16 @@ func (s *Seeder) RetryBlacklist() { next := <-blacklistQueue na := next.netaddr - ipString := na.IP.String() - portString := strconv.Itoa(int(na.Port)) + ip := na.IP.String() + port := strconv.Itoa(int(na.Port)) - _, err := s.Connect(ipString, portString) + // Call internal connect directly to avoid being blocked + host := net.JoinHostPort(ip, port) + p, err := peer.NewOutboundPeer(s.config, host) + if err != nil { + continue + } + _, err = s.connect(p) if err != nil { // Connection failed. Peer remains blacklisted. @@ -467,6 +491,7 @@ func (s *Seeder) RetryBlacklist() { // If we've been retrying for a while, forget about this peer entirely. // This would deadlock if enqueueBlacklist still held the RLock. s.addrBook.DropFromBlacklist(next.asPeerKey()) + s.logger.Printf("Dropping %s from blacklist", next.asPeerKey()) } continue } @@ -475,6 +500,7 @@ func (s *Seeder) RetryBlacklist() { // Remove the peer from the blacklist and add it back to the address book. // This would deadlock if enqueueBlacklist still held the RLock. + atomic.AddInt32(&peerCount, 1) s.addrBook.Redeem(next.asPeerKey()) } wg.Done() @@ -482,7 +508,7 @@ func (s *Seeder) RetryBlacklist() { } wg.Wait() - s.logger.Printf("RetryBlacklist() finished.") + s.logger.Printf("Added %d on retry", peerCount) } // WaitForAddresses waits for n addresses to be confirmed and available in the address book. diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go index 60e1403..ba2b44d 100644 --- a/zcash/client_callbacks.go +++ b/zcash/client_callbacks.go @@ -13,7 +13,8 @@ func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) { if !ok { s.logger.Printf("Got verack from unexpected peer %s", p.Addr()) - // TODO: probably want to disconnect from the peer sending us out-of-order veracks + // Disconnect from the peer sending us out-of-order veracks + s.DisconnectPeer(pk) return }