From f1e7d75e287ef4fc1ea073f4be53b0e7daaaefd8 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Wed, 20 May 2020 19:10:55 -0400 Subject: [PATCH] zcash: move goroutine spawn to caller rather than callback --- zcash/client.go | 77 +++++++++++++++++++++++++++++++++++++-- zcash/client_callbacks.go | 67 +--------------------------------- 2 files changed, 76 insertions(+), 68 deletions(-) diff --git a/zcash/client.go b/zcash/client.go index e989cc5..c957cf7 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -4,9 +4,12 @@ import ( "log" "net" "os" + "runtime" + "strconv" "sync" "time" + "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" @@ -51,6 +54,9 @@ type Seeder struct { // The set of known addresses addrBook *AddressBook + + // The queue of incoming potential addresses + addrQueue chan *wire.NetAddress } func NewSeeder(network network.Network) (*Seeder, error) { @@ -68,6 +74,7 @@ func NewSeeder(network network.Network) (*Seeder, error) { pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), addrBook: NewAddressBook(), + addrQueue: make(chan *wire.NetAddress), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -96,6 +103,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) { pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), addrBook: NewAddressBook(), + addrQueue: make(chan *wire.NetAddress), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -260,13 +268,78 @@ func (s *Seeder) DisconnectAllPeers() { }) } -// RequestAddresses sends a request for more addresses to every peer we're connected to. +// RequestAddresses sends a request for more addresses to every peer we're connected to, +// then checks to make sure the addresses that come back are usable before adding them to +// the address book. func (s *Seeder) RequestAddresses() { s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool { s.logger.Printf("Requesting addresses from peer %s", p.Addr()) p.QueueMessage(wire.NewMsgGetAddr(), nil) return true }) + + // There's a sync concern: if this is called repeatedly you could end up broadcasting + // GetAddr messages to briefly live trial connections without meaning to. It's + // meant to be run on a timer that takes longer to fire than it takes to check addresses. + + for i := 0; i < runtime.NumCPU()*2; i++ { + go func() { + var na *wire.NetAddress + for { + select { + case next := <-s.addrQueue: + // Pull the next address off the queue + na = next + case <-time.After(1 * time.Second): + // Or die if there wasn't one + return + } + + // Note that AllowSelfConns is only exposed in a fork of btcd + // pending https://github.com/btcsuite/btcd/pull/1481, which + // is why the module `replace`s btcd. + if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { + s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, "") + // TODO blacklist peers who give us crap addresses + //s.DisconnectAndBlacklist(peerKeyFromPeer(p)) + continue + } + + potentialPeer := peerKeyFromNA(na) + + if s.addrBook.IsKnown(potentialPeer) { + s.logger.Printf("Already knew about %s:%d", na.IP, na.Port) + continue + } + + if s.addrBook.IsBlacklisted(potentialPeer) { + s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port) + continue + } + + portString := strconv.Itoa(int(na.Port)) + err := s.Connect(na.IP.String(), portString) + + if err != nil { + if err == ErrRepeatConnection { + //s.logger.Printf("Got duplicate peer %s:%d.", na.IP, na.Port) + continue + } + + // Blacklist the potential peer. We might try to connect again later, + // since we assume IsRoutable filtered out the truly wrong ones. + s.logger.Printf("Got unusable peer %s:%d. Error: %s", na.IP, na.Port, err) + s.addrBook.Blacklist(potentialPeer) + continue + } + + s.DisconnectPeer(potentialPeer) + + s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port) + s.addrBook.Add(potentialPeer) + } + }() + } } // WaitForAddresses waits for n addresses to be confirmed and available in the address book. @@ -285,5 +358,3 @@ func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error { func (s *Seeder) Ready() bool { return s.WaitForAddresses(minimumReadyAddresses, 1*time.Millisecond) == nil } - -//func (s *Seeder) Addresses(count int) [] diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go index 1f42ff5..0548a78 100644 --- a/zcash/client_callbacks.go +++ b/zcash/client_callbacks.go @@ -1,11 +1,6 @@ package zcash import ( - "runtime" - "strconv" - "time" - - "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" ) @@ -54,67 +49,9 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr()) - queue := make(chan *wire.NetAddress, len(msg.AddrList)) + //queue := make(chan *wire.NetAddress, len(msg.AddrList)) for _, na := range msg.AddrList { - queue <- na - } - - for i := 0; i < runtime.NumCPU()*2; i++ { - go func() { - var na *wire.NetAddress - for { - select { - case next := <-queue: - // Pull the next address off the queue - na = next - case <-time.After(1 * time.Second): - // Or die if there wasn't one - return - } - - // Note that AllowSelfConns is only exposed in a fork of btcd - // pending https://github.com/btcsuite/btcd/pull/1481, which - // is why the module `replace`s btcd. - if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { - s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr()) - s.DisconnectAndBlacklist(peerKeyFromPeer(p)) - continue - } - - potentialPeer := peerKeyFromNA(na) - - if s.addrBook.IsKnown(potentialPeer) { - s.logger.Printf("Already knew about %s:%d", na.IP, na.Port) - continue - } - - if s.addrBook.IsBlacklisted(potentialPeer) { - s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port) - continue - } - - portString := strconv.Itoa(int(na.Port)) - err := s.Connect(na.IP.String(), portString) - - if err != nil { - if err == ErrRepeatConnection { - s.logger.Printf("Got duplicate peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err) - continue - } - - // Blacklist the potential peer. We might try to connect again later, - // since we assume IsRoutable filtered out the truly wrong ones. - s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err) - s.addrBook.Blacklist(potentialPeer) - continue - } - - s.DisconnectPeer(potentialPeer) - - s.logger.Printf("Successfully learned about %s:%d from %s.", na.IP, na.Port, p.Addr()) - s.addrBook.Add(potentialPeer) - } - }() + s.addrQueue <- na } }