From 54a67e5e7206160e7324688b16d261327244ff2b Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Mon, 30 Dec 2019 20:54:38 -0500 Subject: [PATCH] zcash: working but unpleasant refactoring of address handling --- go.sum | 2 + zcash/{address.go => address_book.go} | 24 ++++- zcash/client.go | 149 +++----------------------- zcash/client_callbacks.go | 119 ++++++++++++++++++++ zcash/client_test.go | 15 ++- zcash/peer_map.go | 13 ++- 6 files changed, 181 insertions(+), 141 deletions(-) rename zcash/{address.go => address_book.go} (78%) create mode 100644 zcash/client_callbacks.go diff --git a/go.sum b/go.sum index 9aeeeae..24e5a4c 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -23,6 +24,7 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= +github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= diff --git a/zcash/address.go b/zcash/address_book.go similarity index 78% rename from zcash/address.go rename to zcash/address_book.go index 7eea87c..ef3b802 100644 --- a/zcash/address.go +++ b/zcash/address_book.go @@ -16,6 +16,8 @@ type Address struct { lastTried time.Time } +// NewAddress returns a new address that is marked valid, not blacklisted, and +// last tried at time.Now(). func NewAddress(na *wire.NetAddress) *Address { return &Address{ netaddr: na, @@ -56,6 +58,8 @@ func (bk *AddressBook) Add(newAddr *Address) { bk.addrState.Lock() bk.addrList = append(bk.addrList, newAddr) bk.addrState.Unlock() + + bk.addrRecvCond.Broadcast() } func (bk *AddressBook) Blacklist(addr PeerKey) { @@ -99,7 +103,7 @@ func (bk *AddressBook) IsBlacklistedAddress(na *wire.NetAddress) bool { return false } -func (bk *AddressBook) UpdateAddressState(update *Address) { +func (bk *AddressBook) UpdateAddressStateFromTemplate(update *Address) { bk.addrState.Lock() defer bk.addrState.Unlock() @@ -121,5 +125,23 @@ func NewAddressBook(capacity int) *AddressBook { return addrBook } +// WaitForAddresses waits for n addresses to be received and their initial +// connection attempts to resolve. There is no escape if that does not happen - +// this is intended for test runners or goroutines with a timeout. +func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) { + bk.addrState.Lock() + for { + addrCount := len(bk.addrList) + if addrCount < n { + bk.addrRecvCond.Wait() + } else { + break + } + } + bk.addrState.Unlock() + done <- struct{}{} + return +} + // GetShuffledAddressList returns a slice of n valid addresses in random order. func (ab *AddressBook) GetShuffledAddressList(n int) []*Address { return nil } diff --git a/zcash/client.go b/zcash/client.go index 1d9d640..ced63c9 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -4,11 +4,9 @@ import ( "log" "net" "os" - "strconv" "sync" "time" - "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" @@ -20,7 +18,7 @@ import ( var ( ErrRepeatConnection = errors.New("attempted repeat connection to existing peer") ErrNoSuchPeer = errors.New("no record of requested peer") - ErrAddressTimeout = errors.New("wait for addreses timed out") + ErrAddressTimeout = errors.New("wait for addresses timed out") ErrBlacklistedPeer = errors.New("peer is blacklisted") ) @@ -77,8 +75,9 @@ func newTestSeeder(network network.Network) (*Seeder, error) { return nil, errors.Wrap(err, "could not construct seeder") } - sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666) - logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC) + // sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666) + // logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC) + logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC) // Allows connections to self for easy mocking config.AllowSelfConns = true @@ -120,46 +119,7 @@ func (s *Seeder) GetNetworkDefaultPort() string { return s.config.ChainParams.DefaultPort } -func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) { - // Check if we're expecting to hear from this peer - _, ok := s.pendingPeers.Load(peerKeyFromPeer(p)) - - if !ok { - s.logger.Printf("Got verack from unexpected peer %s", p.Addr()) - return - } - - // Add to set of live peers - s.livePeers.Store(peerKeyFromPeer(p), p) - - // Remove from set of pending peers - s.pendingPeers.Delete(peerKeyFromPeer(p)) - - // Signal successful connection - if signal, ok := s.handshakeSignals.Load(p.Addr()); ok { - signal.(chan struct{}) <- struct{}{} - } else { - s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr()) - s.DisconnectPeer(peerKeyFromPeer(p)) - return - } - - // Add to list of known good addresses if we don't already have it. - // Otherwise, update the last-valid time. - - if s.addrBook.AlreadyKnowsAddress(p.NA()) { - newAddr := NewAddress(p.NA()) - s.updateAddressState(newAddr) - return - } - - s.logger.Printf("Adding %s to address list", p.Addr()) - - s.addrBook.Add(newAddr) - return -} - -// ConnectToPeer attempts to connect to a peer on the default port at the +// ConnectOnDefaultPort attempts to connect to a peer on the default port at the // specified address. It returns an error if it can't complete handshake with // the peer. Otherwise it returns nil and adds the peer to the list of live // connections and known-good addresses. @@ -185,16 +145,14 @@ func (s *Seeder) Connect(addr, port string) error { if alreadyPending { s.logger.Printf("Peer is already pending: %s", p.Addr()) return ErrRepeatConnection - } else { - s.pendingPeers.Store(peerKeyFromPeer(p), p) } + s.pendingPeers.Store(peerKeyFromPeer(p), p) if alreadyHandshaking { s.logger.Printf("Peer is already handshaking: %s", p.Addr()) return ErrRepeatConnection - } else { - s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1)) } + s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1)) if alreadyLive { s.logger.Printf("Peer is already live: %s", p.Addr()) @@ -231,7 +189,7 @@ func (s *Seeder) GetPeer(addr PeerKey) (*peer.Peer, error) { p, ok := s.livePeers.Load(addr) if ok { - return p.(*peer.Peer), nil + return p, nil } return nil, ErrNoSuchPeer @@ -287,7 +245,7 @@ func (s *Seeder) DisconnectAllPeers() { }) s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool { - s.DisconnectPeer(p.Addr()) + s.DisconnectPeer(key) return true }) } @@ -303,86 +261,13 @@ func (s *Seeder) RequestAddresses() { // WaitForAddresses waits for n addresses to be received and their initial // connection attempts to resolve. There is no escape if that does not happen - // this is intended for test runners. -func (s *Seeder) WaitForAddresses(n int) error { - s.addrState.Lock() - for { - addrCount := len(s.addrList) - if addrCount < n { - s.addrRecvCond.Wait() - } else { - break - } - } - s.addrState.Unlock() - return nil -} - -func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { - if len(msg.AddrList) == 0 { - s.logger.Printf("Got empty addr message from peer %s. Disconnecting.", p.Addr()) - s.DisconnectPeer(p.Addr()) - return - } - - s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr()) - - queue := make(chan *wire.NetAddress, len(msg.AddrList)) - - for _, na := range msg.AddrList { - queue <- na - } - - for i := 0; i < 32; i++ { - go func() { - var na *wire.NetAddress - for { - select { - case next := <-queue: - na = next - case <-time.After(1 * time.Second): - return - } - - if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { - s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr()) - s.DisconnectPeerDishonorably(p.Addr()) - continue - } - - if s.addrBook.AlreadyKnowsAddress(na) { - s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port) - continue - } - - if s.addrBook.IsBlacklistedAddress(na) { - s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port) - continue - } - - portString := strconv.Itoa(int(na.Port)) - err := s.Connect(na.IP.String(), portString) - - if err != nil { - s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err) - - // Mark previously-known peers as invalid - newAddr := &Address{ - netaddr: p.NA(), - valid: false, - lastTried: time.Now(), - } - - if s.alreadyKnowsAddress(p.NA()) { - s.updateAddressState(newAddr) - } - continue - } - - peerString := net.JoinHostPort(na.IP.String(), portString) - s.DisconnectPeer(peerString) - - s.addrRecvCond.Broadcast() - } - }() +func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error { + done := make(chan struct{}) + go s.addrBook.waitForAddresses(n, done) + select { + case <-done: + return nil + case <-time.After(timeout): + return ErrAddressTimeout } } diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go new file mode 100644 index 0000000..435dfef --- /dev/null +++ b/zcash/client_callbacks.go @@ -0,0 +1,119 @@ +package zcash + +import ( + "strconv" + "time" + + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/wire" +) + +func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) { + // Check if we're expecting to hear from this peer + _, ok := s.pendingPeers.Load(peerKeyFromPeer(p)) + + if !ok { + s.logger.Printf("Got verack from unexpected peer %s", p.Addr()) + return + } + + // Add to set of live peers + s.livePeers.Store(peerKeyFromPeer(p), p) + + // Remove from set of pending peers + s.pendingPeers.Delete(peerKeyFromPeer(p)) + + // Signal successful connection + if signal, ok := s.handshakeSignals.Load(p.Addr()); ok { + signal.(chan struct{}) <- struct{}{} + } else { + s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr()) + s.DisconnectPeer(peerKeyFromPeer(p)) + return + } + + // Add to list of known good addresses if we don't already have it. + // Otherwise, update the last-valid time. + newAddr := NewAddress(p.NA()) + + if s.addrBook.AlreadyKnowsAddress(p.NA()) { + s.addrBook.UpdateAddressStateFromTemplate(newAddr) + return + } + + s.logger.Printf("Adding %s to address list", p.Addr()) + + s.addrBook.Add(newAddr) + return +} + +func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { + if len(msg.AddrList) == 0 { + s.logger.Printf("Got empty addr message from peer %s. Disconnecting.", p.Addr()) + s.DisconnectPeer(peerKeyFromPeer(p)) + return + } + + s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr()) + + queue := make(chan *wire.NetAddress, len(msg.AddrList)) + + for _, na := range msg.AddrList { + queue <- na + } + + for i := 0; i < 32; i++ { + go func() { + var na *wire.NetAddress + for { + select { + case next := <-queue: + na = next + case <-time.After(1 * time.Second): + return + } + + if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { + s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr()) + s.DisconnectPeerDishonorably(peerKeyFromPeer(p)) + continue + } + + if s.addrBook.AlreadyKnowsAddress(na) { + s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port) + continue + } + + if s.addrBook.IsBlacklistedAddress(na) { + s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port) + continue + } + + portString := strconv.Itoa(int(na.Port)) + err := s.Connect(na.IP.String(), portString) + + if err != nil { + s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err) + + // Mark previously-known peers as invalid + newAddr := &Address{ + netaddr: p.NA(), + valid: false, + lastTried: time.Now(), + } + + // TODO: function for marking bad addresses directly. needs better storage layer + + if s.addrBook.AlreadyKnowsAddress(p.NA()) { + s.addrBook.UpdateAddressStateFromTemplate(newAddr) + } + continue + } + + s.DisconnectPeer(peerKeyFromNA(na)) + s.addrBook.Add(NewAddress(na)) + } + }() + } +} diff --git a/zcash/client_test.go b/zcash/client_test.go index d9a4d64..dffd361 100644 --- a/zcash/client_test.go +++ b/zcash/client_test.go @@ -155,13 +155,18 @@ func TestRequestAddresses(t *testing.T) { err = regSeeder.ConnectOnDefaultPort("127.0.0.1") if err != nil { - t.Error(err) - return + t.Fatal(err) } regSeeder.RequestAddresses() - regSeeder.WaitForAddresses(1) + err = regSeeder.WaitForAddresses(1, 1*time.Second) - // TODO It isn't possible to test this wait on a local mock peer without - // carving a path through absolutely all of the bad connection logic. + if err != nil { + t.Errorf("Error getting one mocked address: %v", err) + } + + err = regSeeder.WaitForAddresses(500, 1*time.Second) + if err != ErrAddressTimeout { + t.Errorf("Should have timed out, instead got: %v", err) + } } diff --git a/zcash/peer_map.go b/zcash/peer_map.go index f6c8437..aa4ceda 100644 --- a/zcash/peer_map.go +++ b/zcash/peer_map.go @@ -9,7 +9,7 @@ import ( "github.com/btcsuite/btcd/wire" ) -// The "host:port" format used throughout our maps and lists. +// PeerKey is a convenient marker type for the "host:port" format used throughout our maps and lists. type PeerKey string func peerKeyFromPeer(p *peer.Peer) PeerKey { @@ -40,7 +40,7 @@ func (pm *PeerMap) Load(key PeerKey) (*peer.Peer, bool) { v, mapOk := pm.m.Load(key) if mapOk { p, typeOk := v.(*peer.Peer) - if typeOK { + if typeOk { return p, true } } @@ -77,5 +77,12 @@ func (pm *PeerMap) Delete(key PeerKey) { // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. func (pm *PeerMap) Range(f func(key PeerKey, value *peer.Peer) bool) { - pm.m.Range(f) + + // TODO: gaaaaaah + fUntyped := func(untypedKey, untypedValue interface{}) bool { + typedKey, _ := untypedKey.(PeerKey) + typedValue, _ := untypedValue.(*peer.Peer) + return f(typedKey, typedValue) + } + pm.m.Range(fUntyped) }