From 0633d8a54b86696d7459e36ca10f3a5ab6060a48 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Fri, 22 May 2020 17:33:54 -0400 Subject: [PATCH] dnsseed,zcash: fix crawler deadlocks and increase buffer sizes --- dnsseed/setup.go | 20 +++++++------ zcash/address_book.go | 9 +++--- zcash/client.go | 65 ++++++++++++++++++++++++------------------- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/dnsseed/setup.go b/dnsseed/setup.go index 4365cad..5a23a3c 100644 --- a/dnsseed/setup.go +++ b/dnsseed/setup.go @@ -57,23 +57,25 @@ func setup(c *caddy.Controller) error { return plugin.Error("dnsseed", err) } + // TODO load from storage if we already know some peers + + // Send the initial request for more addresses; spawns goroutines to process the responses. + // Ready() will flip to true once we've received and confirmed at least 10 peers. + + log.Infof("Getting addresses from bootstrap peer %s:%s", address, port) + // Connect to the bootstrap peer err = seeder.Connect(address, port) if err != nil { return plugin.Error("dnsseed", err) } - // Send the initial request for more addresses; spawns goroutines to process the responses. - // Ready() will flip to true once we've received and confirmed at least 10 peers. - go func() { - // TODO load from storage if we already know some peers - log.Infof("Getting addresses from bootstrap peer %s:%s", address, port) - seeder.RequestAddresses() - runCrawl(seeder) - }() - // Start the update timer + seeder.RequestAddresses() + seeder.DisconnectAllPeers() + // Start the update timer go func() { + log.Infof("Starting update timer. Will crawl every %.0f minutes.", updateInterval.Minutes()) for { select { case <-time.After(updateInterval): diff --git a/zcash/address_book.go b/zcash/address_book.go index f619891..861dca2 100644 --- a/zcash/address_book.go +++ b/zcash/address_book.go @@ -157,15 +157,14 @@ func (bk *AddressBook) IsBlacklisted(s PeerKey) bool { return blacklisted } -// enqueueAddrs sends all of our known valid peers to a channel for processing -// and adds the count to a WaitGroup counter to aid processing. -func (bk *AddressBook) enqueueAddrs(addrQueue chan *wire.NetAddress, count *sync.WaitGroup) { +// enqueueAddrs puts all of our known valid peers to a channel for processing. +func (bk *AddressBook) enqueueAddrs(addrQueue *chan *Address) { bk.addrState.RLock() defer bk.addrState.RUnlock() - count.Add(len(bk.peers)) + *addrQueue = make(chan *Address, len(bk.peers)) for _, v := range bk.peers { - addrQueue <- v.netaddr + *addrQueue <- v } } diff --git a/zcash/client.go b/zcash/client.go index 356900a..368d1bc 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -45,11 +45,14 @@ var ( // The timeout for the underlying dial to a peer connectionDialTimeout = 1 * time.Second - // The amount of time crawler goroutines will wait for incoming addresses after a RequestAddresses() + // The amount of time crawler goroutines will wait after the last new incoming address crawlerThreadTimeout = 30 * time.Second // The number of goroutines to spawn for a crawl request - crawlerGoroutineCount = runtime.NumCPU() * 16 + crawlerGoroutineCount = runtime.NumCPU() * 32 + + // The amount of space we allocate to keep things moving smoothly. + incomingAddressBufferSize = 1024 ) // Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider. @@ -88,7 +91,7 @@ func NewSeeder(network network.Network) (*Seeder, error) { pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), addrBook: NewAddressBook(), - addrQueue: make(chan *wire.NetAddress, 512), + addrQueue: make(chan *wire.NetAddress, incomingAddressBufferSize), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -117,7 +120,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) { pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), addrBook: NewAddressBook(), - addrQueue: make(chan *wire.NetAddress, 512), + addrQueue: make(chan *wire.NetAddress, incomingAddressBufferSize), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -193,11 +196,11 @@ func (s *Seeder) Connect(addr, port string) error { conn, err := net.DialTimeout("tcp", p.Addr(), connectionDialTimeout) if err != nil { - return errors.Wrap(err, "dialing new peer address") + return errors.Wrap(err, "dialing peer address") } // Begin connection negotiation. - s.logger.Printf("Handshake initated with new peer %s", p.Addr()) + s.logger.Printf("Handshake initated with peer %s", p.Addr()) p.AssociateConnection(conn) // Wait for @@ -205,7 +208,7 @@ func (s *Seeder) Connect(addr, port string) error { select { case <-handshakeChan.(chan struct{}): - s.logger.Printf("Handshake completed with new peer %s", p.Addr()) + s.logger.Printf("Handshake completed with peer %s", p.Addr()) s.handshakeSignals.Delete(pk) return nil case <-time.After(maximumHandshakeWait): @@ -307,6 +310,8 @@ func (s *Seeder) RequestAddresses() int { for i := 0; i < crawlerGoroutineCount; i++ { go func() { + defer wg.Done() + var na *wire.NetAddress for { select { @@ -315,7 +320,6 @@ func (s *Seeder) RequestAddresses() int { na = next case <-time.After(crawlerThreadTimeout): // Or die if there wasn't one - wg.Done() return } @@ -367,6 +371,7 @@ func (s *Seeder) RequestAddresses() int { } wg.Wait() + s.logger.Printf("RequestAddresses() finished.") return int(peerCount) } @@ -375,48 +380,50 @@ func (s *Seeder) RequestAddresses() int { // The call blocks until all addresses have been processed. If disconnect is // true, we immediately disconnect from the peers after verifying them. func (s *Seeder) RefreshAddresses(disconnect bool) { - refreshQueue := make(chan *wire.NetAddress, 100) - var count sync.WaitGroup + s.logger.Printf("Refreshing address book") - go s.addrBook.enqueueAddrs(refreshQueue, &count) + var refreshQueue chan *Address + var wg sync.WaitGroup + + // XXX lil awkward to allocate a channel whose size we can't determine without a lock here + s.addrBook.enqueueAddrs(&refreshQueue) for i := 0; i < crawlerGoroutineCount; i++ { + wg.Add(1) go func() { - var na *wire.NetAddress - for { - select { - case next := <-refreshQueue: - // Pull the next address off the queue - na = next - case <-time.After(crawlerThreadTimeout): - // Or die if there wasn't one - return - } + for len(refreshQueue) > 0 { + // Pull the next address off the queue + next := <-refreshQueue + na := next.netaddr - peer := peerKeyFromNA(na) + ipString := na.IP.String() portString := strconv.Itoa(int(na.Port)) - err := s.Connect(na.IP.String(), portString) + err := s.Connect(ipString, portString) if err != nil { if err != ErrRepeatConnection { - // Blacklist the peer. TODO: We might try to connect again later. s.logger.Printf("Peer %s:%d unusable on refresh. Error: %s", na.IP, na.Port, err) - s.addrBook.Blacklist(peer) + // Blacklist the peer. We might try to connect again later. + // This would deadlock if enqueueAddrs still holds the RLock, + // hence the awkward channel allocation above. + s.addrBook.Blacklist(next.asPeerKey()) } - count.Done() continue } if disconnect { - s.DisconnectPeer(peer) + s.DisconnectPeer(next.asPeerKey()) } - count.Done() + + s.logger.Printf("Validated %s", na.IP) } + wg.Done() }() } - count.Wait() + wg.Wait() + s.logger.Printf("RefreshAddresses() finished.") } // WaitForAddresses waits for n addresses to be confirmed and available in the address book.