diff --git a/dnsseed/setup.go b/dnsseed/setup.go
index a871c02..4365cad 100644
--- a/dnsseed/setup.go
+++ b/dnsseed/setup.go
@@ -65,18 +65,19 @@ func setup(c *caddy.Controller) error {
 
     // Send the initial request for more addresses; spawns goroutines to process the responses.
     // Ready() will flip to true once we've received and confirmed at least 10 peers.
-    seeder.RequestAddresses()
+    go func() {
+        // TODO load from storage if we already know some peers
+        log.Infof("Getting addresses from bootstrap peer %s:%s", address, port)
+        seeder.RequestAddresses()
+        runCrawl(seeder)
+    }()
 
+    // Start the update timer
     go func() {
         for {
             select {
             case <-time.After(updateInterval):
-                seeder.RequestAddresses()
-                err := seeder.WaitForAddresses(10, 30*time.Second)
-                if err != nil {
-                    log.Errorf("Failed to refresh addresses: %v", err)
-                }
-                // XXX: If we wanted to crawl independently, this would be the place.
+                runCrawl(seeder)
             }
         }
     }()
@@ -93,3 +94,25 @@ func setup(c *caddy.Controller) error {
     // All OK, return a nil error.
     return nil
 }
+
+func runCrawl(seeder *zcash.Seeder) {
+    log.Info("Beginning crawl")
+    start := time.Now()
+
+    // Slow motion crawl: we'll get them all eventually!
+
+    // 1. Make sure our addresses are still live and leave the
+    // connections open (true would disconnect immediately).
+    seeder.RefreshAddresses(false)
+
+    // 2. Request addresses from everyone we're connected to,
+    // synchronously. This will block a while in an attempt
+    // to catch all of the addr responses it can.
+    newPeerCount := seeder.RequestAddresses()
+
+    // 3. Disconnect from everyone & leave them alone for a while
+    seeder.DisconnectAllPeers()
+
+    elapsed := time.Now().Sub(start).Truncate(time.Second).Seconds()
+    log.Infof("Crawl complete, met %d new peers of %d in %.2f seconds", newPeerCount, seeder.GetPeerCount(), elapsed)
+}
diff --git a/zcash/address_book.go b/zcash/address_book.go
index a38b269..f619891 100644
--- a/zcash/address_book.go
+++ b/zcash/address_book.go
@@ -122,7 +122,7 @@ func (bk *AddressBook) Blacklist(s PeerKey) {
     }
 }
 
-// Touch updates the last-seen timestamp if the peer is in the address book or does nothing if not.
+// Touch updates the last-seen timestamp if the peer is in the valid address book or does nothing if not.
 func (bk *AddressBook) Touch(s PeerKey) {
     bk.addrState.Lock()
     defer bk.addrState.Unlock()
@@ -132,13 +132,21 @@ func (bk *AddressBook) Touch(s PeerKey) {
     }
 }
 
+func (bk *AddressBook) Count() int {
+    bk.addrState.RLock()
+    defer bk.addrState.RUnlock()
+
+    return len(bk.peers)
+}
+
 // IsKnown returns true if the peer is already in our address book, false if not.
 func (bk *AddressBook) IsKnown(s PeerKey) bool {
     bk.addrState.RLock()
     defer bk.addrState.RUnlock()
 
-    _, known := bk.peers[s]
-    return known
+    _, knownGood := bk.peers[s]
+    _, knownBad := bk.blacklist[s]
+    return knownGood || knownBad
 }
 
 func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
@@ -149,6 +157,18 @@ func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
     return blacklisted
 }
 
+// enqueueAddrs sends all of our known valid peers to a channel for processing
+// and adds the count to a WaitGroup counter to aid processing.
+func (bk *AddressBook) enqueueAddrs(addrQueue chan *wire.NetAddress, count *sync.WaitGroup) {
+    bk.addrState.RLock()
+    defer bk.addrState.RUnlock()
+
+    count.Add(len(bk.peers))
+    for _, v := range bk.peers {
+        addrQueue <- v.netaddr
+    }
+}
+
 // WaitForAddresses waits for n addresses to be received and their initial
 // connection attempts to resolve. There is no escape if that does not happen -
 // this is intended for test runners or goroutines with a timeout.
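The new enqueueAddrs helper pairs each queued address with a WaitGroup increment so that whichever goroutines drain the channel can acknowledge completion per item, and the caller can block until everything queued has been handled (RefreshAddresses, later in this diff, is that consumer). A minimal, self-contained sketch of the counted-handoff pattern it assumes; all names below are illustrative, not part of the dnsseeder API:

    package handoff

    import "sync"

    // drain mirrors the counted handoff used by enqueueAddrs and its consumer:
    // the producer reserves a WaitGroup slot for every item it queues, each
    // worker marks one slot done per item handled, and the caller blocks
    // until all of the queued work has been acknowledged.
    func drain(items []string, workers int) {
        queue := make(chan string, len(items))
        var pending sync.WaitGroup

        // Producer side (what enqueueAddrs does): count first, then send.
        pending.Add(len(items))
        for _, it := range items {
            queue <- it
        }
        close(queue) // the seeder relies on a timeout instead of closing

        // Consumer side: acknowledge each item once it has been handled.
        for i := 0; i < workers; i++ {
            go func() {
                for it := range queue {
                    _ = it // verify the peer, etc.
                    pending.Done()
                }
            }()
        }

        // Returns only after every queued item has been marked done.
        pending.Wait()
    }
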
diff --git a/zcash/client.go b/zcash/client.go
index 8e5d27e..356900a 100644
--- a/zcash/client.go
+++ b/zcash/client.go
@@ -7,6 +7,7 @@ import (
     "runtime"
     "strconv"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/btcsuite/btcd/addrmgr"
@@ -34,7 +35,7 @@ var defaultPeerConfig = &peer.Config{
     ProtocolVersion: 170009, // Blossom
 }
 
-const (
+var (
     // The minimum number of addresses we need to know about to begin serving introductions
     minimumReadyAddresses = 10
 
@@ -46,6 +47,9 @@ const (
 
     // The amount of time crawler goroutines will wait for incoming addresses after a RequestAddresses()
     crawlerThreadTimeout = 30 * time.Second
+
+    // The number of goroutines to spawn for a crawl request
+    crawlerGoroutineCount = runtime.NumCPU() * 16
 )
 
 // Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider.
@@ -73,6 +77,8 @@ func NewSeeder(network network.Network) (*Seeder, error) {
         return nil, errors.Wrap(err, "could not construct seeder")
     }
 
+    // sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
+    // logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
     logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
 
     newSeeder := Seeder{
@@ -82,7 +88,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
         pendingPeers: NewPeerMap(),
         livePeers:    NewPeerMap(),
         addrBook:     NewAddressBook(),
-        addrQueue:    make(chan *wire.NetAddress, 100),
+        addrQueue:    make(chan *wire.NetAddress, 512),
     }
 
     newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -111,7 +117,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
         pendingPeers: NewPeerMap(),
         livePeers:    NewPeerMap(),
         addrBook:     NewAddressBook(),
-        addrQueue:    make(chan *wire.NetAddress, 100),
+        addrQueue:    make(chan *wire.NetAddress, 512),
     }
 
     newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -278,8 +284,12 @@ func (s *Seeder) DisconnectAllPeers() {
 
 // RequestAddresses sends a request for more addresses to every peer we're connected to,
 // then checks to make sure the addresses that come back are usable before adding them to
-// the address book.
-func (s *Seeder) RequestAddresses() {
+// the address book. The call attempts to block until all addresses have been processed,
+// but since we can't know how many that will be it eventually times out. Therefore,
+// while calling RequestAddresses synchronously is possible, it risks a major delay; most
+// users will be better served by giving this its own goroutine and using WaitForAddresses
+// with a timeout to pause only until a sufficient number of addresses are ready.
+func (s *Seeder) RequestAddresses() int {
     s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
         s.logger.Printf("Requesting addresses from peer %s", p.Addr())
         p.QueueMessage(wire.NewMsgGetAddr(), nil)
@@ -290,7 +300,12 @@ func (s *Seeder) RequestAddresses() {
 
     // GetAddr messages to briefly live trial connections without meaning to. It's
     // meant to be run on a timer that takes longer to fire than it takes to check addresses.
-    for i := 0; i < runtime.NumCPU()*4; i++ {
+    var peerCount int32
+
+    var wg sync.WaitGroup
+    wg.Add(crawlerGoroutineCount)
+
+    for i := 0; i < crawlerGoroutineCount; i++ {
         go func() {
             var na *wire.NetAddress
             for {
@@ -300,7 +315,7 @@ func (s *Seeder) RequestAddresses() {
                     na = next
                 case <-time.After(crawlerThreadTimeout):
                     // Or die if there wasn't one
-                    s.logger.Printf("Crawler thread timeout")
+                    wg.Done()
                     return
                 }
 
@@ -345,10 +360,63 @@ func (s *Seeder) RequestAddresses() {
                 s.DisconnectPeer(potentialPeer)
 
                 s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port)
+                atomic.AddInt32(&peerCount, 1)
                 s.addrBook.Add(potentialPeer)
             }
         }()
     }
+
+    wg.Wait()
+    return int(peerCount)
+}
+
+// RefreshAddresses checks to make sure the addresses we think we know are
+// still usable and removes them from the address book if they aren't.
+// The call blocks until all addresses have been processed. If disconnect is
+// true, we immediately disconnect from the peers after verifying them.
+func (s *Seeder) RefreshAddresses(disconnect bool) {
+    refreshQueue := make(chan *wire.NetAddress, 100)
+    var count sync.WaitGroup
+
+    go s.addrBook.enqueueAddrs(refreshQueue, &count)
+
+    for i := 0; i < crawlerGoroutineCount; i++ {
+        go func() {
+            var na *wire.NetAddress
+            for {
+                select {
+                case next := <-refreshQueue:
+                    // Pull the next address off the queue
+                    na = next
+                case <-time.After(crawlerThreadTimeout):
+                    // Or die if there wasn't one
+                    return
+                }
+
+                peer := peerKeyFromNA(na)
+                portString := strconv.Itoa(int(na.Port))
+
+                err := s.Connect(na.IP.String(), portString)
+
+                if err != nil {
+                    if err != ErrRepeatConnection {
+                        // Blacklist the peer. TODO: We might try to connect again later.
+                        s.logger.Printf("Peer %s:%d unusable on refresh. Error: %s", na.IP, na.Port, err)
+                        s.addrBook.Blacklist(peer)
+                    }
+                    count.Done()
+                    continue
+                }
+
+                if disconnect {
+                    s.DisconnectPeer(peer)
+                }
+                count.Done()
+            }
+        }()
+    }
+
+    count.Wait()
 }
 
 // WaitForAddresses waits for n addresses to be confirmed and available in the address book.
@@ -378,6 +446,11 @@ func (s *Seeder) AddressesV6(n int) []net.IP {
     return s.addrBook.shuffleAddressList(n, true)
 }
 
+// GetPeerCount returns how many valid peers we know about.
+func (s *Seeder) GetPeerCount() int {
+    return s.addrBook.Count()
+}
+
 // testBlacklist adds a peer to the blacklist directly, for testing.
 func (s *Seeder) testBlacklist(pk PeerKey) {
     s.addrBook.Blacklist(pk)
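RequestAddresses now returns the number of new peers it learned about and, per its updated doc comment, is best launched in its own goroutine while the caller blocks on WaitForAddresses. A hedged caller-side sketch of that pattern, assuming the import path implied by this repository's layout and a Seeder that has already been constructed and pointed at a bootstrap peer; the 10-address / 30-second values mirror the ones used elsewhere in this diff rather than anything required by the API:

    package example

    import (
        "log"
        "time"

        // Import path assumed from the repository layout shown in this diff.
        "github.com/zcash-hackworks/dnsseeder/zcash"
    )

    // refreshPeers runs one crawl without blocking the caller for the full
    // crawl duration: RequestAddresses gets its own goroutine, and we only
    // wait until enough usable addresses have been confirmed.
    func refreshPeers(seeder *zcash.Seeder) {
        go seeder.RequestAddresses()

        if err := seeder.WaitForAddresses(10, 30*time.Second); err != nil {
            log.Printf("timed out waiting for usable addresses: %v", err)
            return
        }
        log.Printf("seeder now knows %d valid peers", seeder.GetPeerCount())
    }
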
diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go
index 0548a78..60e1403 100644
--- a/zcash/client_callbacks.go
+++ b/zcash/client_callbacks.go
@@ -49,9 +49,14 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
 
     s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
 
-    //queue := make(chan *wire.NetAddress, len(msg.AddrList))
-
     for _, na := range msg.AddrList {
+        // By checking if we know them before adding to the queue, we create
+        // the end condition for the crawler thread: it will time out after
+        // not processing any new addresses.
+        if s.addrBook.IsKnown(peerKeyFromNA(na)) {
+            s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
+            continue
+        }
         s.addrQueue <- na
     }
 }
diff --git a/zcash/client_test.go b/zcash/client_test.go
index f2c4b20..728109c 100644
--- a/zcash/client_test.go
+++ b/zcash/client_test.go
@@ -194,7 +194,7 @@ func TestRequestAddresses(t *testing.T) {
         t.Fatal(err)
     }
 
-    regSeeder.RequestAddresses()
+    go regSeeder.RequestAddresses()
 
     err = regSeeder.WaitForAddresses(1, 1*time.Second)
     if err != nil {
@@ -219,7 +219,7 @@ func TestBlacklist(t *testing.T) {
         t.Fatal(err)
     }
 
-    regSeeder.RequestAddresses()
+    go regSeeder.RequestAddresses()
 
     err = regSeeder.WaitForAddresses(1, 1*time.Second)
     if err != nil {
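With onAddr now skipping already-known peers, the crawl's end condition is simply that the addrQueue goes quiet: each crawler goroutine gives up after crawlerThreadTimeout, signals the WaitGroup, and RequestAddresses returns its tally. A self-contained sketch of that idle-timeout worker pattern, using illustrative names rather than the seeder's internals:

    package quiesce

    import (
        "sync"
        "sync/atomic"
        "time"
    )

    // processUntilQuiet mirrors how RequestAddresses now decides it is finished:
    // workers pull items until nothing new arrives for idleTimeout, each signals
    // the WaitGroup as it gives up, and the caller gets back a count of how much
    // work was actually done.
    func processUntilQuiet(feed <-chan string, workers int, idleTimeout time.Duration, handle func(string)) int {
        var processed int32

        var wg sync.WaitGroup
        wg.Add(workers)

        for i := 0; i < workers; i++ {
            go func() {
                defer wg.Done()
                for {
                    select {
                    case item := <-feed:
                        handle(item)
                        atomic.AddInt32(&processed, 1)
                    case <-time.After(idleTimeout):
                        // The feed went quiet (every address we hear about is
                        // already known), so this worker exits.
                        return
                    }
                }
            }()
        }

        wg.Wait()
        return int(processed)
    }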