From 814f8cb8502d0208f08886d5becdc6618d59d900 Mon Sep 17 00:00:00 2001
From: George Tankersley
Date: Fri, 22 May 2020 01:02:32 -0400
Subject: [PATCH] dnsseed,zcash: implement slow-motion recursive crawling

This does one additional hop of the graph crawl each time the update
interval fires. It blacklists peers that are not consistently
available, so over time the seeder should converge on the set of
stable peers suitable for DNS bootstrapping.
---
 dnsseed/setup.go          | 37 +++++++++++++----
 zcash/address_book.go     | 26 ++++++++++--
 zcash/client.go           | 87 +++++++++++++++++++++++++++++++++++++++++----
 zcash/client_callbacks.go |  9 +++-
 zcash/client_test.go      |  4 +-
 5 files changed, 142 insertions(+), 21 deletions(-)

diff --git a/dnsseed/setup.go b/dnsseed/setup.go
index a871c02..4365cad 100644
--- a/dnsseed/setup.go
+++ b/dnsseed/setup.go
@@ -65,18 +65,19 @@ func setup(c *caddy.Controller) error {
 
 	// Send the initial request for more addresses; spawns goroutines to process the responses.
 	// Ready() will flip to true once we've received and confirmed at least 10 peers.
-	seeder.RequestAddresses()
+	go func() {
+		// TODO load from storage if we already know some peers
+		log.Infof("Getting addresses from bootstrap peer %s:%s", address, port)
+		seeder.RequestAddresses()
+		runCrawl(seeder)
+	}()
 
+	// Start the update timer
 	go func() {
 		for {
 			select {
 			case <-time.After(updateInterval):
-				seeder.RequestAddresses()
-				err := seeder.WaitForAddresses(10, 30*time.Second)
-				if err != nil {
-					log.Errorf("Failed to refresh addresses: %v", err)
-				}
-				// XXX: If we wanted to crawl independently, this would be the place.
+				runCrawl(seeder)
 			}
 		}
 	}()
@@ -93,3 +94,25 @@ func setup(c *caddy.Controller) error {
 	// All OK, return a nil error.
 	return nil
 }
+
+func runCrawl(seeder *zcash.Seeder) {
+	log.Info("Beginning crawl")
+	start := time.Now()
+
+	// Slow motion crawl: we'll get them all eventually!
+
+	// 1. Make sure our addresses are still live and leave the
+	// connections open (true would disconnect immediately).
+	seeder.RefreshAddresses(false)
+
+	// 2. Request addresses from everyone we're connected to,
+	// synchronously. This will block a while in an attempt
+	// to catch all of the addr responses it can.
+	newPeerCount := seeder.RequestAddresses()
+
+	// 3. Disconnect from everyone & leave them alone for a while
+	seeder.DisconnectAllPeers()
+
+	elapsed := time.Since(start).Seconds()
+	log.Infof("Crawl complete, met %d new peers of %d in %.2f seconds", newPeerCount, seeder.GetPeerCount(), elapsed)
+}
diff --git a/zcash/address_book.go b/zcash/address_book.go
index a38b269..f619891 100644
--- a/zcash/address_book.go
+++ b/zcash/address_book.go
@@ -122,7 +122,7 @@ func (bk *AddressBook) Blacklist(s PeerKey) {
 	}
 }
 
-// Touch updates the last-seen timestamp if the peer is in the address book or does nothing if not.
+// Touch updates the last-seen timestamp if the peer is in the valid address book or does nothing if not.
 func (bk *AddressBook) Touch(s PeerKey) {
 	bk.addrState.Lock()
 	defer bk.addrState.Unlock()
@@ -132,13 +132,21 @@ func (bk *AddressBook) Touch(s PeerKey) {
 	}
 }
 
+func (bk *AddressBook) Count() int {
+	bk.addrState.RLock()
+	defer bk.addrState.RUnlock()
+
+	return len(bk.peers)
+}
+
 // IsKnown returns true if the peer is already in our address book, false if not.
 func (bk *AddressBook) IsKnown(s PeerKey) bool {
 	bk.addrState.RLock()
 	defer bk.addrState.RUnlock()
 
-	_, known := bk.peers[s]
-	return known
+	_, knownGood := bk.peers[s]
+	_, knownBad := bk.blacklist[s]
+	return knownGood || knownBad
 }
 
 func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
@@ -149,6 +157,18 @@ func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
 	return blacklisted
 }
 
+// enqueueAddrs sends all of our known valid peers to a channel for processing
+// and adds the count to a WaitGroup so callers can wait for them to be processed.
+func (bk *AddressBook) enqueueAddrs(addrQueue chan *wire.NetAddress, count *sync.WaitGroup) {
+	bk.addrState.RLock()
+	defer bk.addrState.RUnlock()
+
+	count.Add(len(bk.peers))
+	for _, v := range bk.peers {
+		addrQueue <- v.netaddr
+	}
+}
+
 // WaitForAddresses waits for n addresses to be received and their initial
 // connection attempts to resolve. There is no escape if that does not happen -
 // this is intended for test runners or goroutines with a timeout.
diff --git a/zcash/client.go b/zcash/client.go
index 8e5d27e..356900a 100644
--- a/zcash/client.go
+++ b/zcash/client.go
@@ -7,6 +7,7 @@ import (
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/btcsuite/btcd/addrmgr"
@@ -34,7 +35,7 @@ var defaultPeerConfig = &peer.Config{
 	ProtocolVersion: 170009, // Blossom
 }
 
-const (
+var (
 	// The minimum number of addresses we need to know about to begin serving introductions
 	minimumReadyAddresses = 10
 
@@ -46,6 +47,9 @@ const (
 
 	// The amount of time crawler goroutines will wait for incoming addresses after a RequestAddresses()
 	crawlerThreadTimeout = 30 * time.Second
+
+	// The number of goroutines to spawn for a crawl request
+	crawlerGoroutineCount = runtime.NumCPU() * 16
 )
 
 // Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider.
@@ -73,6 +77,8 @@ func NewSeeder(network network.Network) (*Seeder, error) {
 		return nil, errors.Wrap(err, "could not construct seeder")
 	}
 
+	// sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
+	// logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
 	logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
 
 	newSeeder := Seeder{
@@ -82,7 +88,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 100),
+		addrQueue:    make(chan *wire.NetAddress, 512),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -111,7 +117,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 100),
+		addrQueue:    make(chan *wire.NetAddress, 512),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -278,8 +284,12 @@ func (s *Seeder) DisconnectAllPeers() {
 
 // RequestAddresses sends a request for more addresses to every peer we're connected to,
 // then checks to make sure the addresses that come back are usable before adding them to
-// the address book.
-func (s *Seeder) RequestAddresses() {
+// the address book. The call attempts to block until all addresses have been processed,
+// but since we can't know how many that will be, it eventually times out. Therefore,
+// while calling RequestAddresses synchronously is possible, it risks a major delay; most
+// users will be better served by giving this its own goroutine and using WaitForAddresses
+// with a timeout to pause only until a sufficient number of addresses are ready.
+func (s *Seeder) RequestAddresses() int {
 	s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
 		s.logger.Printf("Requesting addresses from peer %s", p.Addr())
 		p.QueueMessage(wire.NewMsgGetAddr(), nil)
@@ -290,7 +300,12 @@ func (s *Seeder) RequestAddresses() {
 	// GetAddr messages to briefly live trial connections without meaning to. It's
 	// meant to be run on a timer that takes longer to fire than it takes to check addresses.
 
-	for i := 0; i < runtime.NumCPU()*4; i++ {
+	var peerCount int32
+
+	var wg sync.WaitGroup
+	wg.Add(crawlerGoroutineCount)
+
+	for i := 0; i < crawlerGoroutineCount; i++ {
 		go func() {
 			var na *wire.NetAddress
 			for {
@@ -300,7 +315,7 @@ func (s *Seeder) RequestAddresses() {
 					na = next
 				case <-time.After(crawlerThreadTimeout):
 					// Or die if there wasn't one
-					s.logger.Printf("Crawler thread timeout")
+					wg.Done()
 					return
 				}
 
@@ -345,10 +360,63 @@ func (s *Seeder) RequestAddresses() {
 			s.DisconnectPeer(potentialPeer)
 
 			s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port)
+			atomic.AddInt32(&peerCount, 1)
 			s.addrBook.Add(potentialPeer)
 		}
 	}()
 	}
+
+	wg.Wait()
+	return int(peerCount)
+}
+
+// RefreshAddresses checks to make sure the addresses we think we know are
+// still usable and removes them from the address book if they aren't.
+// The call blocks until all addresses have been processed. If disconnect is
+// true, we immediately disconnect from the peers after verifying them.
+func (s *Seeder) RefreshAddresses(disconnect bool) {
+	refreshQueue := make(chan *wire.NetAddress, 100)
+	var count sync.WaitGroup
+
+	go s.addrBook.enqueueAddrs(refreshQueue, &count)
+
+	for i := 0; i < crawlerGoroutineCount; i++ {
+		go func() {
+			var na *wire.NetAddress
+			for {
+				select {
+				case next := <-refreshQueue:
+					// Pull the next address off the queue
+					na = next
+				case <-time.After(crawlerThreadTimeout):
+					// Or die if there wasn't one
+					return
+				}
+
+				peer := peerKeyFromNA(na)
+				portString := strconv.Itoa(int(na.Port))
+
+				err := s.Connect(na.IP.String(), portString)
+
+				if err != nil {
+					if err != ErrRepeatConnection {
+						// Blacklist the peer. TODO: We might try to connect again later.
+						s.logger.Printf("Peer %s:%d unusable on refresh. Error: %s", na.IP, na.Port, err)
+						s.addrBook.Blacklist(peer)
+					}
+					count.Done()
+					continue
+				}
+
+				if disconnect {
+					s.DisconnectPeer(peer)
+				}
+				count.Done()
+			}
+		}()
+	}
+
+	count.Wait()
 }
 
 // WaitForAddresses waits for n addresses to be confirmed and available in the address book.
@@ -378,6 +446,11 @@ func (s *Seeder) AddressesV6(n int) []net.IP {
 	return s.addrBook.shuffleAddressList(n, true)
 }
 
+// GetPeerCount returns how many valid peers we know about.
+func (s *Seeder) GetPeerCount() int {
+	return s.addrBook.Count()
+}
+
 // testBlacklist adds a peer to the blacklist directly, for testing.
 func (s *Seeder) testBlacklist(pk PeerKey) {
 	s.addrBook.Blacklist(pk)
diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go
index 0548a78..60e1403 100644
--- a/zcash/client_callbacks.go
+++ b/zcash/client_callbacks.go
@@ -49,9 +49,14 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
 
 	s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
 
-	//queue := make(chan *wire.NetAddress, len(msg.AddrList))
-
 	for _, na := range msg.AddrList {
+		// By checking if we know them before adding to the queue, we create
+		// the end condition for the crawler thread: it will time out after
+		// not processing any new addresses.
+		if s.addrBook.IsKnown(peerKeyFromNA(na)) {
+			s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
+			continue
+		}
 		s.addrQueue <- na
 	}
 }
diff --git a/zcash/client_test.go b/zcash/client_test.go
index f2c4b20..728109c 100644
--- a/zcash/client_test.go
+++ b/zcash/client_test.go
@@ -194,7 +194,7 @@ func TestRequestAddresses(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	regSeeder.RequestAddresses()
+	go regSeeder.RequestAddresses()
 
 	err = regSeeder.WaitForAddresses(1, 1*time.Second)
 	if err != nil {
@@ -219,7 +219,7 @@ func TestBlacklist(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	regSeeder.RequestAddresses()
+	go regSeeder.RequestAddresses()
 
 	err = regSeeder.WaitForAddresses(1, 1*time.Second)
 	if err != nil {
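
Below is a minimal, hedged sketch of how the crawl cycle introduced by this patch can be driven outside of the Caddy plugin, mirroring runCrawl in dnsseed/setup.go. It only uses calls that appear in the patch (NewSeeder, Connect, RequestAddresses, RefreshAddresses, DisconnectAllPeers, WaitForAddresses, GetPeerCount); the import paths, the network.Mainnet constant, the bootstrap peer address, and the 15-minute interval are illustrative assumptions, not part of the patch.

package main

import (
	"log"
	"time"

	// Assumed import paths; adjust to wherever this module actually lives.
	"github.com/zcashfoundation/dnsseeder/zcash"
	"github.com/zcashfoundation/dnsseeder/zcash/network"
)

func main() {
	// network.Mainnet is an assumed constant name for the Zcash mainnet.
	seeder, err := zcash.NewSeeder(network.Mainnet)
	if err != nil {
		log.Fatal(err)
	}

	// Bootstrap from a single known peer (placeholder address and port).
	if err := seeder.Connect("mainnet.z.cash", "8233"); err != nil {
		log.Fatal(err)
	}
	go seeder.RequestAddresses()
	if err := seeder.WaitForAddresses(10, 30*time.Second); err != nil {
		log.Printf("bootstrap is slow: %v", err)
	}

	// One hop of the slow-motion crawl per interval, as runCrawl does.
	for {
		seeder.RefreshAddresses(false)        // re-verify known peers, keep connections open
		newPeers := seeder.RequestAddresses() // crawl one hop further; blocks until the addr queue goes quiet
		seeder.DisconnectAllPeers()           // back off until the next pass
		log.Printf("crawl: met %d new peers, %d known in total", newPeers, seeder.GetPeerCount())
		time.Sleep(15 * time.Minute) // assumed interval
	}
}

Because RequestAddresses now blocks until its worker goroutines time out, the bootstrap call above runs in its own goroutine and WaitForAddresses is used to pause only until enough peers are confirmed, exactly as the updated dnsseed/setup.go and tests do.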