From 21b4c750a3fc48913cd0a3b9e9ef44c2f4ae1112 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Wed, 16 Oct 2019 22:18:30 -0400 Subject: [PATCH] zcash: add address request handling --- zcash/address.go | 40 ++++++++++ zcash/client.go | 184 +++++++++++++++++++++++++++++++++++++++---- zcash/client_test.go | 15 +++- 3 files changed, 219 insertions(+), 20 deletions(-) create mode 100644 zcash/address.go diff --git a/zcash/address.go b/zcash/address.go new file mode 100644 index 0000000..aa77afb --- /dev/null +++ b/zcash/address.go @@ -0,0 +1,40 @@ +package zcash + +import ( + "net" + "strconv" + "time" + + "github.com/btcsuite/btcd/wire" +) + +type Address struct { + netaddr *wire.NetAddress + valid bool + blacklist bool + lastTried time.Time +} + +func (a *Address) IsGood() bool { + return a.valid == true +} + +func (a *Address) IsBad() bool { + return a.blacklist == true +} + +func (a *Address) String() string { + portString := strconv.Itoa(int(a.netaddr.Port)) + return net.JoinHostPort(a.netaddr.IP.String(), portString) +} + +// Addresses should be sortable by least-recently-tried + +type AddrList []*Address + +func (list AddrList) Len() int { return len(list) } +func (list AddrList) Swap(i, j int) { list[i], list[j] = list[j], list[i] } +func (list AddrList) Less(i, j int) bool { return list[i].lastTried.Before(list[j].lastTried) } + +// GetShuffledAddressList returns a slice of n valid addresses in random order. +func GetShuffledAddressList(addrList []*Address, n int) []*Address { return nil } diff --git a/zcash/client.go b/zcash/client.go index 8bb52a9..aa2c929 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -4,9 +4,11 @@ import ( "log" "net" "os" + "strconv" "sync" "time" + "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" @@ -18,6 +20,7 @@ import ( var ( ErrRepeatConnection = errors.New("attempted repeat connection to existing peer") ErrNoSuchPeer = errors.New("no record of requested peer") + ErrAddressTimeout = errors.New("wait for addreses timed out") ) var defaultPeerConfig = &peer.Config{ @@ -34,14 +37,16 @@ type Seeder struct { config *peer.Config logger *log.Logger + // Peer list handling + peerState sync.RWMutex handshakeSignals *sync.Map pendingPeers *sync.Map livePeers *sync.Map - addrRecvChan chan *wire.NetAddress - - // For mutating the above - peerState sync.RWMutex + // Address list handling + addrState sync.RWMutex + addrRecvCond *sync.Cond + addrList []*Address } func NewSeeder(network network.Network) (*Seeder, error) { @@ -58,9 +63,11 @@ func NewSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: new(sync.Map), livePeers: new(sync.Map), - addrRecvChan: make(chan *wire.NetAddress, 100), + addrList: make([]*Address, 0, 1000), } + newSeeder.addrRecvCond = sync.NewCond(&newSeeder.addrState) + newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck newSeeder.config.Listeners.OnAddr = newSeeder.onAddr @@ -85,9 +92,11 @@ func newTestSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: new(sync.Map), livePeers: new(sync.Map), - addrRecvChan: make(chan *wire.NetAddress, 100), + addrList: make([]*Address, 0, 1000), } + newSeeder.addrRecvCond = sync.NewCond(&newSeeder.addrState) + newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck newSeeder.config.Listeners.OnAddr = newSeeder.onAddr @@ -134,10 +143,26 @@ func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) { // Signal successful connection if signal, ok := s.handshakeSignals.Load(p.Addr()); ok { signal.(chan struct{}) <- struct{}{} + } else { + s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr()) + s.DisconnectPeer(p.Addr()) return } - s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr()) + // Add to list of known good addresses + newAddr := &Address{ + netaddr: p.NA(), + valid: true, + lastTried: time.Now(), + } + + s.logger.Printf("Adding %s to address list", p.Addr()) + + s.addrState.Lock() + s.addrList = append(s.addrList, newAddr) + s.addrState.Unlock() + + return } // ConnectToPeer attempts to connect to a peer on the default port at the @@ -155,15 +180,30 @@ func (s *Seeder) Connect(addr, port string) error { return errors.Wrap(err, "constructing outbound peer") } - _, alreadyPending := s.pendingPeers.LoadOrStore(p.Addr(), p) - _, alreadyHandshaking := s.handshakeSignals.LoadOrStore(p.Addr(), make(chan struct{}, 1)) + _, alreadyPending := s.pendingPeers.Load(p.Addr()) + _, alreadyHandshaking := s.handshakeSignals.Load(p.Addr()) _, alreadyLive := s.livePeers.Load(p.Addr()) - if alreadyPending || alreadyHandshaking || alreadyLive { - s.logger.Printf("Attempted repeat connection to peer %s", p.Addr()) + if alreadyPending { + s.logger.Printf("Peer is already pending: %s", p.Addr()) + return ErrRepeatConnection + } else { + s.pendingPeers.Store(p.Addr(), p) + } + + if alreadyHandshaking { + s.logger.Printf("Peer is already handshaking: %s", p.Addr()) + return ErrRepeatConnection + } else { + s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1)) + } + + if alreadyLive { + s.logger.Printf("Peer is already live: %s", p.Addr()) return ErrRepeatConnection } + // TODO time out conn, err := net.Dial("tcp", p.Addr()) if err != nil { return errors.Wrap(err, "dialing new peer address") @@ -212,6 +252,7 @@ func (s *Seeder) DisconnectPeer(addr string) error { // TODO: type safety and error handling v := p.(*peer.Peer) + s.logger.Printf("Disconnecting from peer %s", v.Addr()) v.Disconnect() v.WaitForDisconnect() s.livePeers.Delete(addr) @@ -219,6 +260,39 @@ func (s *Seeder) DisconnectPeer(addr string) error { return nil } +// DisconnectPeerDishonorably disconnects from a live peer identified by +// "host:port" string. It returns an error if we aren't connected to that peer. +// "Dishonorably" furthermore removes this peer from the list of known good +// addresses and adds them to a blacklist. +func (s *Seeder) DisconnectPeerDishonorably(addr string) error { + p, ok := s.livePeers.Load(addr) + + if !ok { + return ErrNoSuchPeer + } + + // TODO: type safety and error handling + + v := p.(*peer.Peer) + s.logger.Printf("Disconnecting from peer %s", v.Addr()) + v.Disconnect() + v.WaitForDisconnect() + s.livePeers.Delete(addr) + + s.addrState.Lock() + for i := 0; i < len(s.addrList); i++ { + address := s.addrList[i] + if address.String() == addr { + s.logger.Printf("Blacklisting peer %s", v.Addr()) + address.valid = false + address.blacklist = true + } + } + s.addrState.Unlock() + + return nil +} + // DisconnectAllPeers terminates the connections to all live and pending peers. func (s *Seeder) DisconnectAllPeers() { s.pendingPeers.Range(func(key, value interface{}) bool { @@ -227,7 +301,6 @@ func (s *Seeder) DisconnectAllPeers() { s.logger.Printf("Invalid peer in pendingPeers") return false } - s.logger.Printf("Disconnecting from pending peer %s", p.Addr()) p.Disconnect() p.WaitForDisconnect() s.pendingPeers.Delete(key) @@ -240,7 +313,6 @@ func (s *Seeder) DisconnectAllPeers() { s.logger.Printf("Invalid peer in livePeers") return false } - s.logger.Printf("Disconnecting from live peer %s", p.Addr()) s.DisconnectPeer(p.Addr()) return true }) @@ -259,8 +331,55 @@ func (s *Seeder) RequestAddresses() { }) } -func (s *Seeder) WaitForMoreAddresses() { - <-s.addrRecvChan +// WaitForAddresses waits for n addresses to be received and their initial +// connection attempts to resolve. There is no escape if that does not happen - +// this is intended for test runners. +func (s *Seeder) WaitForAddresses(n int) error { + s.addrState.Lock() + for { + addrCount := len(s.addrList) + if addrCount < n { + s.addrRecvCond.Wait() + } else { + break + } + } + s.addrState.Unlock() + return nil +} + +func (s *Seeder) alreadyKnowsAddress(na *wire.NetAddress) bool { + s.addrState.RLock() + defer s.addrState.RUnlock() + + ref := &Address{ + netaddr: na, + } + + for i := 0; i < len(s.addrList); i++ { + if s.addrList[i].String() == ref.String() { + return true + } + } + + return false +} + +func (s *Seeder) isBlacklistedAddress(na *wire.NetAddress) bool { + s.addrState.RLock() + defer s.addrState.RUnlock() + + ref := &Address{ + netaddr: na, + } + + for i := 0; i < len(s.addrList); i++ { + if s.addrList[i].String() == ref.String() { + return s.addrList[i].IsBad() + } + } + + return false } func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { @@ -271,7 +390,38 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { } s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr()) - for _, addr := range msg.AddrList { - s.addrRecvChan <- addr + + for _, na := range msg.AddrList { + s.logger.Printf("Trying %s:%d from peer %s", na.IP, na.Port, p.Addr()) + go func(na *wire.NetAddress) { + if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { + s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr()) + s.DisconnectPeerDishonorably(p.Addr()) + return + } + + if s.alreadyKnowsAddress(na) { + s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port) + return + } + + if s.isBlacklistedAddress(na) { + s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port) + return + } + + portString := strconv.Itoa(int(na.Port)) + err := s.Connect(na.IP.String(), portString) + + if err != nil { + s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: ", na.IP, na.Port, p.Addr(), err) + return + } + + peerString := net.JoinHostPort(na.IP.String(), portString) + s.DisconnectPeer(peerString) + + s.addrRecvCond.Broadcast() + }(na) } } diff --git a/zcash/client_test.go b/zcash/client_test.go index 0347d93..d9a4d64 100644 --- a/zcash/client_test.go +++ b/zcash/client_test.go @@ -39,9 +39,15 @@ func startMockLoop() { time.Now(), 0, net.ParseIP("127.0.0.1"), - uint16(8233), + uint16(18233), ) - cache = append(cache, addr) + addr2 := wire.NewNetAddressTimestamp( + time.Now(), + 0, + net.ParseIP("127.0.0.1"), + uint16(18344), + ) + cache = append(cache, addr, addr2) _, err := p.PushAddrMsg(cache) if err != nil { mockPeerLogger.Error(err) @@ -154,5 +160,8 @@ func TestRequestAddresses(t *testing.T) { } regSeeder.RequestAddresses() - regSeeder.WaitForMoreAddresses() + regSeeder.WaitForAddresses(1) + + // TODO It isn't possible to test this wait on a local mock peer without + // carving a path through absolutely all of the bad connection logic. }