From 7716305c89599df58163d206a85e67e7789190c5 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Sat, 12 Oct 2019 22:00:20 -0400 Subject: [PATCH] zcash: implement address requests and improve test reliability --- zcash/client.go | 62 +++++++++++++++++++++++--- zcash/client_test.go | 104 ++++++++++++++++++++++++++----------------- 2 files changed, 120 insertions(+), 46 deletions(-) diff --git a/zcash/client.go b/zcash/client.go index aa547e8..1c297af 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -17,6 +17,7 @@ import ( var ( ErrRepeatConnection = errors.New("attempted repeat connection to existing peer") + ErrNoSuchPeer = errors.New("no record of requested peer") ) var defaultPeerConfig = &peer.Config{ @@ -37,6 +38,8 @@ type Seeder struct { pendingPeers *sync.Map livePeers *sync.Map + addrRecvChan chan *wire.NetAddress + // For mutating the above peerState sync.RWMutex } @@ -55,9 +58,11 @@ func NewSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: new(sync.Map), livePeers: new(sync.Map), + addrRecvChan: make(chan *wire.NetAddress, 100), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck + newSeeder.config.Listeners.OnAddr = newSeeder.onAddr return &newSeeder, nil } @@ -80,9 +85,11 @@ func newTestSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: new(sync.Map), livePeers: new(sync.Map), + addrRecvChan: make(chan *wire.NetAddress, 100), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck + newSeeder.config.Listeners.OnAddr = newSeeder.onAddr return &newSeeder, nil } @@ -156,6 +163,7 @@ func (s *Seeder) ConnectToPeer(addr string) error { s.logger.Printf("Handshake initated with new peer %s", p.Addr()) p.AssociateConnection(conn) + // TODO: handle disconnect during this handshakeChan, _ := s.handshakeSignals.Load(p.Addr()) select { @@ -178,7 +186,25 @@ func (s *Seeder) GetPeer(addr string) (*peer.Peer, error) { return p.(*peer.Peer), nil } - return nil, errors.New("no such active peer") + return nil, ErrNoSuchPeer +} + +func (s *Seeder) DisconnectPeer(addr string) error { + lookupKey := net.JoinHostPort(addr, s.config.ChainParams.DefaultPort) + p, ok := s.livePeers.Load(lookupKey) + + if !ok { + return ErrNoSuchPeer + } + + // TODO: type safety and error handling + + v := p.(*peer.Peer) + v.Disconnect() + v.WaitForDisconnect() + s.livePeers.Delete(lookupKey) + + return nil } func (s *Seeder) DisconnectAllPeers() { @@ -202,11 +228,37 @@ func (s *Seeder) DisconnectAllPeers() { return false } s.logger.Printf("Disconnecting from live peer %s", p.Addr()) - p.Disconnect() - p.WaitForDisconnect() - s.livePeers.Delete(key) + s.DisconnectPeer(p.Addr()) return true }) } -func (s *Seeder) RequestAddresses() {} +func (s *Seeder) RequestAddresses() { + s.livePeers.Range(func(key, value interface{}) bool { + p, ok := value.(*peer.Peer) + if !ok { + s.logger.Printf("Invalid peer in livePeers") + return false + } + s.logger.Printf("Requesting addresses from peer %s", p.Addr()) + p.QueueMessage(wire.NewMsgGetAddr(), nil) + return true + }) +} + +func (s *Seeder) WaitForMoreAddresses() { + <-s.addrRecvChan +} + +func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { + if len(msg.AddrList) == 0 { + s.logger.Printf("Got empty addr message from peer %s. Disconnecting.", p.Addr()) + s.DisconnectPeer(p.Addr()) + return + } + + s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr()) + for _, addr := range msg.AddrList { + s.addrRecvChan <- addr + } +} diff --git a/zcash/client_test.go b/zcash/client_test.go index 5b371dd..877e4e9 100644 --- a/zcash/client_test.go +++ b/zcash/client_test.go @@ -1,101 +1,106 @@ package zcash import ( - "context" "net" + "os" "sync" "testing" + "time" "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btclog" "github.com/gtank/coredns-zcash/zcash/network" ) -func mockLocalPeer(ctx context.Context) error { +func TestMain(m *testing.M) { + startMockLoop() + exitCode := m.Run() + os.Exit(exitCode) +} + +func startMockLoop() { // Configure peer to act as a regtest node that offers no services. config, err := newSeederPeerConfig(network.Regtest, defaultPeerConfig) if err != nil { - return err + return } config.AllowSelfConns = true - // backendLogger := btclog.NewBackend(os.Stdout) - // mockPeerLogger := backendLogger.Logger("mockPeer") - // mockPeerLogger.SetLevel(btclog.LevelTrace) - // peer.UseLogger(mockPeerLogger) + backendLogger := btclog.NewBackend(os.Stdout) + mockPeerLogger := backendLogger.Logger("mockPeer") + //mockPeerLogger.SetLevel(btclog.LevelTrace) + peer.UseLogger(mockPeerLogger) - mockPeer := peer.NewInboundPeer(config) + config.Listeners.OnGetAddr = func(p *peer.Peer, msg *wire.MsgGetAddr) { + cache := make([]*wire.NetAddress, 0, 1) + addr := wire.NewNetAddressTimestamp( + time.Now(), + 0, + net.ParseIP("127.0.0.1"), + uint16(8233), + ) + cache = append(cache, addr) + _, err := p.PushAddrMsg(cache) + if err != nil { + mockPeerLogger.Error(err) + } + } listenAddr := net.JoinHostPort("127.0.0.1", config.ChainParams.DefaultPort) listener, err := net.Listen("tcp", listenAddr) if err != nil { - return err + return } go func() { - conn, err := listener.Accept() - if err != nil { - return - } + for { + conn, err := listener.Accept() + if err != nil { + return + } - mockPeer.AssociateConnection(conn) - - select { - case <-ctx.Done(): - mockPeer.Disconnect() - mockPeer.WaitForDisconnect() - return + mockPeer := peer.NewInboundPeer(config) + mockPeer.AssociateConnection(conn) } }() - - return nil } func TestOutboundPeerSync(t *testing.T) { - testContext, cancel := context.WithCancel(context.Background()) - defer cancel() - - if err := mockLocalPeer(testContext); err != nil { - t.Logf("error starting mock peer (%v).", err) - } - regSeeder, err := newTestSeeder(network.Regtest) if err != nil { - t.Fatal(err) + t.Error(err) + return } err = regSeeder.ConnectToPeer("127.0.0.1") if err != nil { - t.Fatal(err) + t.Error(err) + return } // Can we address that peer if we want to? p, err := regSeeder.GetPeer("127.0.0.1") if err != nil { - t.Fatal(err) + t.Error(err) + return } if p.Connected() { - regSeeder.DisconnectAllPeers() + regSeeder.DisconnectPeer("127.0.0.1") } else { t.Error("Peer never connected") } // Can we STILL address a flushed peer? p, err = regSeeder.GetPeer("127.0.0.1") - if err == nil { + if err != ErrNoSuchPeer { t.Error("Peer should have been cleared on disconnect") } } func TestOutboundPeerAsync(t *testing.T) { - testContext, cancel := context.WithCancel(context.Background()) - defer cancel() - - if err := mockLocalPeer(testContext); err != nil { - t.Logf("error starting mock peer (%v).", err) - } - regSeeder, err := newTestSeeder(network.Regtest) if err != nil { t.Fatal(err) @@ -134,3 +139,20 @@ func TestOutboundPeerAsync(t *testing.T) { regSeeder.DisconnectAllPeers() } + +func TestRequestAddresses(t *testing.T) { + regSeeder, err := newTestSeeder(network.Regtest) + if err != nil { + t.Error(err) + return + } + + err = regSeeder.ConnectToPeer("127.0.0.1") + if err != nil { + t.Error(err) + return + } + + regSeeder.RequestAddresses() + regSeeder.WaitForMoreAddresses() +}