dnsseed,zcash: fix crawler deadlocks and increase buffer sizes

George Tankersley 2020-05-22 17:33:54 -04:00
parent 814f8cb850
commit 0633d8a54b
3 changed files with 51 additions and 43 deletions


@@ -57,23 +57,25 @@ func setup(c *caddy.Controller) error {
 		return plugin.Error("dnsseed", err)
 	}
 
+	// TODO load from storage if we already know some peers
+
+	// Send the initial request for more addresses; spawns goroutines to process the responses.
+	// Ready() will flip to true once we've received and confirmed at least 10 peers.
+	log.Infof("Getting addresses from bootstrap peer %s:%s", address, port)
+
 	// Connect to the bootstrap peer
 	err = seeder.Connect(address, port)
 	if err != nil {
 		return plugin.Error("dnsseed", err)
 	}
 
-	// Send the initial request for more addresses; spawns goroutines to process the responses.
-	// Ready() will flip to true once we've received and confirmed at least 10 peers.
-	go func() {
-		// TODO load from storage if we already know some peers
-		log.Infof("Getting addresses from bootstrap peer %s:%s", address, port)
-		seeder.RequestAddresses()
-		runCrawl(seeder)
-	}()
+	seeder.RequestAddresses()
+	seeder.DisconnectAllPeers()
 
 	// Start the update timer
 	go func() {
+		log.Infof("Starting update timer. Will crawl every %.0f minutes.", updateInterval.Minutes())
 		for {
 			select {
 			case <-time.After(updateInterval):


@@ -157,15 +157,14 @@ func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
 	return blacklisted
 }
 
-// enqueueAddrs sends all of our known valid peers to a channel for processing
-// and adds the count to a WaitGroup counter to aid processing.
-func (bk *AddressBook) enqueueAddrs(addrQueue chan *wire.NetAddress, count *sync.WaitGroup) {
+// enqueueAddrs puts all of our known valid peers to a channel for processing.
+func (bk *AddressBook) enqueueAddrs(addrQueue *chan *Address) {
 	bk.addrState.RLock()
 	defer bk.addrState.RUnlock()
 
-	count.Add(len(bk.peers))
+	*addrQueue = make(chan *Address, len(bk.peers))
 
 	for _, v := range bk.peers {
-		addrQueue <- v.netaddr
+		*addrQueue <- v
 	}
 }
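
The deadlock this hunk addresses comes from the old producer/consumer split: enqueueAddrs ran in its own goroutine and held the address book's read lock while pushing peers into a fixed 100-slot channel, and the consumers in RefreshAddresses could call Blacklist, which needs the write lock. Once the buffer filled, each side waited on the other. The sketch below reproduces that shape with illustrative types (book, enqueueOld, and blacklist are not the seeder's real names); running it trips Go's runtime deadlock detector.

// Minimal, self-contained sketch of the lock-ordering problem the new
// enqueueAddrs avoids. Not the seeder's actual types.
package main

import (
	"fmt"
	"sync"
)

type book struct {
	mu    sync.RWMutex
	peers []string
}

// enqueueOld mirrors the old shape: it holds the read lock for the whole send loop.
func (b *book) enqueueOld(out chan string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, p := range b.peers {
		out <- p // blocks once the buffer is full, still holding RLock
	}
}

// blacklist needs the write lock, which cannot be taken while a reader holds RLock.
func (b *book) blacklist(p string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	fmt.Println("blacklisted", p)
}

func main() {
	b := &book{peers: make([]string, 200)}
	queue := make(chan string, 100) // the old code used a fixed 100-slot buffer

	go b.enqueueOld(queue)

	// The consumer blacklists the first peer it sees and blocks on Lock();
	// the producer then fills the buffer and blocks on send. All goroutines
	// are asleep, so the Go runtime panics with "all goroutines are asleep".
	for p := range queue {
		b.blacklist(p)
	}
}

Sizing the channel to len(bk.peers) while the read lock is held, as the new enqueueAddrs does, lets the producer finish every send without blocking, so the lock is released before any consumer needs the write lock.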


@@ -45,11 +45,14 @@ var (
 	// The timeout for the underlying dial to a peer
 	connectionDialTimeout = 1 * time.Second
 
-	// The amount of time crawler goroutines will wait for incoming addresses after a RequestAddresses()
+	// The amount of time crawler goroutines will wait after the last new incoming address
 	crawlerThreadTimeout = 30 * time.Second
 
 	// The number of goroutines to spawn for a crawl request
-	crawlerGoroutineCount = runtime.NumCPU() * 16
+	crawlerGoroutineCount = runtime.NumCPU() * 32
+
+	// The amount of space we allocate to keep things moving smoothly.
+	incomingAddressBufferSize = 1024
 )
 
 // Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider.
@@ -88,7 +91,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 512),
+		addrQueue:    make(chan *wire.NetAddress, incomingAddressBufferSize),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -117,7 +120,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 512),
+		addrQueue:    make(chan *wire.NetAddress, incomingAddressBufferSize),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -193,11 +196,11 @@ func (s *Seeder) Connect(addr, port string) error {
 	conn, err := net.DialTimeout("tcp", p.Addr(), connectionDialTimeout)
 	if err != nil {
-		return errors.Wrap(err, "dialing new peer address")
+		return errors.Wrap(err, "dialing peer address")
 	}
 
 	// Begin connection negotiation.
-	s.logger.Printf("Handshake initated with new peer %s", p.Addr())
+	s.logger.Printf("Handshake initated with peer %s", p.Addr())
 	p.AssociateConnection(conn)
 
 	// Wait for
@@ -205,7 +208,7 @@ func (s *Seeder) Connect(addr, port string) error {
 	select {
 	case <-handshakeChan.(chan struct{}):
-		s.logger.Printf("Handshake completed with new peer %s", p.Addr())
+		s.logger.Printf("Handshake completed with peer %s", p.Addr())
 		s.handshakeSignals.Delete(pk)
 		return nil
 	case <-time.After(maximumHandshakeWait):
@@ -307,6 +310,8 @@ func (s *Seeder) RequestAddresses() int {
 	for i := 0; i < crawlerGoroutineCount; i++ {
 		go func() {
+			defer wg.Done()
+
 			var na *wire.NetAddress
 			for {
 				select {
@@ -315,7 +320,6 @@ func (s *Seeder) RequestAddresses() int {
 					na = next
 				case <-time.After(crawlerThreadTimeout):
 					// Or die if there wasn't one
-					wg.Done()
 					return
 				}
@@ -367,6 +371,7 @@ func (s *Seeder) RequestAddresses() int {
 	}
 
 	wg.Wait()
+	s.logger.Printf("RequestAddresses() finished.")
 	return int(peerCount)
 }
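
Moving wg.Done() from the idle-timeout branch to a defer at the top of each crawler goroutine is the usual way to keep the WaitGroup balanced: the counter is decremented on every exit path rather than only on the timeout one, so wg.Wait() cannot stall on a goroutine that left the loop some other way. A minimal sketch of the pattern (the worker body is illustrative, not the seeder's):

// Sketch of the defer wg.Done() pattern used by the crawler goroutines above.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	work := make(chan int, 8)
	for i := 0; i < 8; i++ {
		work <- i
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // runs on every exit path, not just one branch
			for {
				select {
				case n := <-work:
					if n%3 == 0 {
						continue // skip some items; the deferred Done still fires later
					}
					fmt.Println("processed", n)
				case <-time.After(100 * time.Millisecond):
					return // idle timeout: the deferred Done fires here too
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("all workers finished")
}
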
@@ -375,48 +380,50 @@ func (s *Seeder) RequestAddresses() int {
 // The call blocks until all addresses have been processed. If disconnect is
 // true, we immediately disconnect from the peers after verifying them.
 func (s *Seeder) RefreshAddresses(disconnect bool) {
-	refreshQueue := make(chan *wire.NetAddress, 100)
-	var count sync.WaitGroup
+	s.logger.Printf("Refreshing address book")
 
-	go s.addrBook.enqueueAddrs(refreshQueue, &count)
+	var refreshQueue chan *Address
+	var wg sync.WaitGroup
+
+	// XXX lil awkward to allocate a channel whose size we can't determine without a lock here
+	s.addrBook.enqueueAddrs(&refreshQueue)
 
 	for i := 0; i < crawlerGoroutineCount; i++ {
+		wg.Add(1)
 		go func() {
-			var na *wire.NetAddress
-			for {
-				select {
-				case next := <-refreshQueue:
-					// Pull the next address off the queue
-					na = next
-				case <-time.After(crawlerThreadTimeout):
-					// Or die if there wasn't one
-					return
-				}
+			for len(refreshQueue) > 0 {
+				// Pull the next address off the queue
+				next := <-refreshQueue
+				na := next.netaddr
 
-				peer := peerKeyFromNA(na)
+				ipString := na.IP.String()
 				portString := strconv.Itoa(int(na.Port))
 
-				err := s.Connect(na.IP.String(), portString)
+				err := s.Connect(ipString, portString)
 				if err != nil {
 					if err != ErrRepeatConnection {
-						// Blacklist the peer. TODO: We might try to connect again later.
 						s.logger.Printf("Peer %s:%d unusable on refresh. Error: %s", na.IP, na.Port, err)
-						s.addrBook.Blacklist(peer)
+						// Blacklist the peer. We might try to connect again later.
+						// This would deadlock if enqueueAddrs still holds the RLock,
+						// hence the awkward channel allocation above.
+						s.addrBook.Blacklist(next.asPeerKey())
 					}
-					count.Done()
 					continue
 				}
 
 				if disconnect {
-					s.DisconnectPeer(peer)
+					s.DisconnectPeer(next.asPeerKey())
 				}
 
-				count.Done()
+				s.logger.Printf("Validated %s", na.IP)
 			}
+			wg.Done()
 		}()
 	}
 
-	count.Wait()
+	wg.Wait()
+	s.logger.Printf("RefreshAddresses() finished.")
 }
 
 // WaitForAddresses waits for n addresses to be confirmed and available in the address book.
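
The reworked RefreshAddresses follows a pre-filled-queue shape: enqueueAddrs sizes and fills the buffered channel before any worker starts reading, so the producer never blocks and the workers only drain what is already buffered. Below is a small standalone sketch of that shape with made-up types; it uses a non-blocking select/default receive to detect the empty queue instead of the len() check in the diff, which avoids two workers racing for the last element.

// Sketch of the pre-filled-queue pattern. Types and names are illustrative.
package main

import (
	"fmt"
	"sync"
)

type entry struct{ addr string }

// fillQueue sizes the channel to the number of peers before sending,
// so none of the sends below can block.
func fillQueue(peers []entry) chan entry {
	q := make(chan entry, len(peers))
	for _, p := range peers {
		q <- p
	}
	return q
}

func main() {
	peers := []entry{{"1.1.1.1:8233"}, {"2.2.2.2:8233"}, {"3.3.3.3:8233"}}
	queue := fillQueue(peers)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case next := <-queue:
					fmt.Printf("worker %d checking %s\n", id, next.addr)
				default:
					return // queue drained; nothing is added after fillQueue returns
				}
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("refresh finished")
}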