From 87556241035c8220d8d382e3781fd895f0031a97 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Wed, 20 May 2020 18:08:34 -0400 Subject: [PATCH] zcash: refactor address book and ensure type consistency for interface{} maps --- zcash/address_book.go | 211 ++++++++++++++++++++++---------------- zcash/client.go | 52 ++++++---- zcash/client_callbacks.go | 64 ++++++------ zcash/peer_map.go | 4 + 4 files changed, 187 insertions(+), 144 deletions(-) diff --git a/zcash/address_book.go b/zcash/address_book.go index ef3b802..3440398 100644 --- a/zcash/address_book.go +++ b/zcash/address_book.go @@ -10,29 +10,9 @@ import ( ) type Address struct { - netaddr *wire.NetAddress - valid bool - blacklist bool - lastTried time.Time -} - -// NewAddress returns a new address that is marked valid, not blacklisted, and -// last tried at time.Now(). -func NewAddress(na *wire.NetAddress) *Address { - return &Address{ - netaddr: na, - valid: true, - blacklist: false, - lastTried: time.Now(), - } -} - -func (a *Address) IsGood() bool { - return a.valid && !a.blacklist -} - -func (a *Address) IsBad() bool { - return a.blacklist + netaddr *wire.NetAddress + blacklisted bool + lastUpdate time.Time } func (a *Address) String() string { @@ -44,94 +24,141 @@ func (a *Address) asPeerKey() PeerKey { return PeerKey(a.String()) } +func (a *Address) fromPeerKey(s PeerKey) (*Address, error) { + host, portString, err := net.SplitHostPort(s.String()) + if err != nil { + return nil, err + } + + portInt, err := strconv.ParseUint(portString, 10, 16) + if err != nil { + return nil, err + } + + na := wire.NewNetAddressTimestamp( + time.Now(), + 0, + net.ParseIP(host), + uint16(portInt), + ) + + a.netaddr = na + a.blacklisted = false + a.lastUpdate = na.Timestamp + return a, nil +} + +func (a *Address) asNetAddress() *wire.NetAddress { + newNA := *a.netaddr + newNA.Timestamp = a.lastUpdate + return &newNA +} + +func (a *Address) fromNetAddress(na *wire.NetAddress) (*Address, error) { + a.netaddr = na + a.blacklisted = false + a.lastUpdate = na.Timestamp + return a, nil +} + func (a *Address) MarshalText() (text []byte, err error) { return []byte(a.String()), nil } type AddressBook struct { - addrList []*Address + addrs map[PeerKey]*Address addrState sync.RWMutex addrRecvCond *sync.Cond } -func (bk *AddressBook) Add(newAddr *Address) { - bk.addrState.Lock() - bk.addrList = append(bk.addrList, newAddr) - bk.addrState.Unlock() - - bk.addrRecvCond.Broadcast() -} - -func (bk *AddressBook) Blacklist(addr PeerKey) { - bk.addrState.Lock() - for i := 0; i < len(bk.addrList); i++ { - address := bk.addrList[i] - if address.asPeerKey() == addr { - address.valid = false - address.blacklist = true - } - } - bk.addrState.Unlock() -} - -func (bk *AddressBook) AlreadyKnowsAddress(na *wire.NetAddress) bool { - bk.addrState.RLock() - defer bk.addrState.RUnlock() - - addr := NewAddress(na) - - for i := 0; i < len(bk.addrList); i++ { - if bk.addrList[i].String() == addr.String() { - return true - } - } - return false -} - -func (bk *AddressBook) IsBlacklistedAddress(na *wire.NetAddress) bool { - bk.addrState.RLock() - defer bk.addrState.RUnlock() - - ref := NewAddress(na) - - for i := 0; i < len(bk.addrList); i++ { - if bk.addrList[i].String() == ref.String() { - return bk.addrList[i].IsBad() - } - } - - return false -} - -func (bk *AddressBook) UpdateAddressStateFromTemplate(update *Address) { - bk.addrState.Lock() - defer bk.addrState.Unlock() - - for i := 0; i < len(bk.addrList); i++ { - if bk.addrList[i].String() == update.String() { - bk.addrList[i].valid = update.valid - bk.addrList[i].blacklist = update.blacklist - bk.addrList[i].lastTried = update.lastTried - return - } - } -} - -func NewAddressBook(capacity int) *AddressBook { +func NewAddressBook() *AddressBook { addrBook := &AddressBook{ - addrList: make([]*Address, 0, capacity), + addrs: make(map[PeerKey]*Address), } addrBook.addrRecvCond = sync.NewCond(&addrBook.addrState) return addrBook } +func (bk *AddressBook) Add(s PeerKey) { + newAddr, err := (&Address{}).fromPeerKey(s) + if err != nil { + // XXX effectively NOP bogus peer strings + return + } + + bk.addrState.Lock() + bk.addrs[s] = newAddr + bk.addrState.Unlock() + + // Wake anyone who was waiting on us to receive an address. + bk.addrRecvCond.Broadcast() +} + +func (bk *AddressBook) Remove(s PeerKey) { + bk.addrState.Lock() + defer bk.addrState.Unlock() + + if _, ok := bk.addrs[s]; ok { + delete(bk.addrs, s) + } +} + +func (bk *AddressBook) Blacklist(s PeerKey) { + bk.addrState.Lock() + defer bk.addrState.Unlock() + + if target, ok := bk.addrs[s]; ok { + target.blacklisted = true + target.lastUpdate = time.Now() + } else { + // Create a new Address just to be blacklisted + addr, err := (&Address{}).fromPeerKey(s) + if err != nil { + // XXX effectively NOP bogus peer strings + return + } + addr.blacklisted = true + bk.addrs[s] = addr + } +} + +// Touch updates the last-seen timestamp if the peer is in the address book or does nothing if not. +func (bk *AddressBook) Touch(s PeerKey) { + bk.addrState.Lock() + defer bk.addrState.Unlock() + + if target, ok := bk.addrs[s]; ok { + target.lastUpdate = time.Now() + } +} + +// IsKnown returns true if the peer is already in our address book, false if not. +func (bk *AddressBook) IsKnown(s PeerKey) bool { + bk.addrState.RLock() + defer bk.addrState.RUnlock() + + _, known := bk.addrs[s] + return known +} + +func (bk *AddressBook) IsBlacklisted(s PeerKey) bool { + bk.addrState.RLock() + defer bk.addrState.RUnlock() + + if target, ok := bk.addrs[s]; ok { + return target.blacklisted + } + + return false +} + // WaitForAddresses waits for n addresses to be received and their initial // connection attempts to resolve. There is no escape if that does not happen - // this is intended for test runners or goroutines with a timeout. func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) { bk.addrState.Lock() for { - addrCount := len(bk.addrList) + addrCount := len(bk.addrs) if addrCount < n { bk.addrRecvCond.Wait() } else { @@ -144,4 +171,6 @@ func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) { } // GetShuffledAddressList returns a slice of n valid addresses in random order. -func (ab *AddressBook) GetShuffledAddressList(n int) []*Address { return nil } +// func (bk *AddressBook) GetShuffledAddressList(n int) []*Address { + +// } diff --git a/zcash/client.go b/zcash/client.go index f8012a6..e989cc5 100644 --- a/zcash/client.go +++ b/zcash/client.go @@ -31,6 +31,13 @@ var defaultPeerConfig = &peer.Config{ ProtocolVersion: 170009, // Blossom } +// The minimum number of addresses we need to know about to begin serving introductions. +const minimumReadyAddresses = 10 + +// The maximum amount of time we will wait for a peer to complete the initial handshake. +const maximumHandshakeWait = 1 * time.Second + +// Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider. type Seeder struct { peer *peer.Peer config *peer.Config @@ -60,7 +67,7 @@ func NewSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), - addrBook: NewAddressBook(1000), + addrBook: NewAddressBook(), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -88,7 +95,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) { handshakeSignals: new(sync.Map), pendingPeers: NewPeerMap(), livePeers: NewPeerMap(), - addrBook: NewAddressBook(1000), + addrBook: NewAddressBook(), } newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck @@ -134,25 +141,28 @@ func (s *Seeder) Connect(addr, port string) error { return errors.Wrap(err, "constructing outbound peer") } - if s.addrBook.IsBlacklistedAddress(p.NA()) { + // PeerKeys are used in our internal maps to keep signals and responses from specific peers straight. + pk := peerKeyFromPeer(p) + + if s.addrBook.IsBlacklisted(pk) { return ErrBlacklistedPeer } - _, alreadyPending := s.pendingPeers.Load(peerKeyFromPeer(p)) - _, alreadyHandshaking := s.handshakeSignals.Load(peerKeyFromPeer(p)) - _, alreadyLive := s.livePeers.Load(peerKeyFromPeer(p)) + _, alreadyPending := s.pendingPeers.Load(pk) + _, alreadyHandshaking := s.handshakeSignals.Load(pk) + _, alreadyLive := s.livePeers.Load(pk) if alreadyPending { s.logger.Printf("Peer is already pending: %s", p.Addr()) return ErrRepeatConnection } - s.pendingPeers.Store(peerKeyFromPeer(p), p) + s.pendingPeers.Store(pk, p) if alreadyHandshaking { s.logger.Printf("Peer is already handshaking: %s", p.Addr()) return ErrRepeatConnection } - s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1)) + s.handshakeSignals.Store(pk, make(chan struct{}, 1)) if alreadyLive { s.logger.Printf("Peer is already live: %s", p.Addr()) @@ -168,15 +178,15 @@ func (s *Seeder) Connect(addr, port string) error { s.logger.Printf("Handshake initated with new peer %s", p.Addr()) p.AssociateConnection(conn) - // TODO: handle disconnect during this - handshakeChan, _ := s.handshakeSignals.Load(p.Addr()) + // Wait for + handshakeChan, _ := s.handshakeSignals.Load(pk) select { case <-handshakeChan.(chan struct{}): s.logger.Printf("Handshake completed with new peer %s", p.Addr()) - s.handshakeSignals.Delete(p.Addr()) + s.handshakeSignals.Delete(pk) return nil - case <-time.After(1 * time.Second): + case <-time.After(maximumHandshakeWait): return errors.New("peer handshake started but timed out") } @@ -211,11 +221,11 @@ func (s *Seeder) DisconnectPeer(addr PeerKey) error { return nil } -// DisconnectPeerDishonorably disconnects from a live peer identified by +// DisconnectAndBlacklist disconnects from a live peer identified by // "host:port" string. It returns an error if we aren't connected to that peer. -// "Dishonorably" furthermore removes this peer from the list of known good -// addresses and adds them to a blacklist. -func (s *Seeder) DisconnectPeerDishonorably(addr PeerKey) error { +// It furthermore removes this peer from the list of known good +// addresses and adds them to a blacklist. to prevent future connections. +func (s *Seeder) DisconnectAndBlacklist(addr PeerKey) error { p, ok := s.livePeers.Load(addr) if !ok { @@ -250,6 +260,7 @@ func (s *Seeder) DisconnectAllPeers() { }) } +// RequestAddresses sends a request for more addresses to every peer we're connected to. func (s *Seeder) RequestAddresses() { s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool { s.logger.Printf("Requesting addresses from peer %s", p.Addr()) @@ -258,9 +269,7 @@ func (s *Seeder) RequestAddresses() { }) } -// WaitForAddresses waits for n addresses to be received and their initial -// connection attempts to resolve. There is no escape if that does not happen - -// this is intended for test runners. +// WaitForAddresses waits for n addresses to be confirmed and available in the address book. func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error { done := make(chan struct{}) go s.addrBook.waitForAddresses(n, done) @@ -274,6 +283,7 @@ func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error { // Ready reports if the seeder is ready to provide addresses. func (s *Seeder) Ready() bool { - // TODO report ready when we have some addresses - return false + return s.WaitForAddresses(minimumReadyAddresses, 1*time.Millisecond) == nil } + +//func (s *Seeder) Addresses(count int) [] diff --git a/zcash/client_callbacks.go b/zcash/client_callbacks.go index 435dfef..6103892 100644 --- a/zcash/client_callbacks.go +++ b/zcash/client_callbacks.go @@ -1,6 +1,7 @@ package zcash import ( + "runtime" "strconv" "time" @@ -10,41 +11,42 @@ import ( ) func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) { + pk := peerKeyFromPeer(p) + // Check if we're expecting to hear from this peer - _, ok := s.pendingPeers.Load(peerKeyFromPeer(p)) + _, ok := s.pendingPeers.Load(pk) if !ok { s.logger.Printf("Got verack from unexpected peer %s", p.Addr()) + // TODO: probably want to disconnect from the peer sending us out-of-order veracks return } // Add to set of live peers - s.livePeers.Store(peerKeyFromPeer(p), p) + s.livePeers.Store(pk, p) // Remove from set of pending peers - s.pendingPeers.Delete(peerKeyFromPeer(p)) + s.pendingPeers.Delete(pk) // Signal successful connection - if signal, ok := s.handshakeSignals.Load(p.Addr()); ok { + if signal, ok := s.handshakeSignals.Load(pk); ok { signal.(chan struct{}) <- struct{}{} } else { s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr()) - s.DisconnectPeer(peerKeyFromPeer(p)) + s.DisconnectPeer(pk) return } // Add to list of known good addresses if we don't already have it. // Otherwise, update the last-valid time. - newAddr := NewAddress(p.NA()) - if s.addrBook.AlreadyKnowsAddress(p.NA()) { - s.addrBook.UpdateAddressStateFromTemplate(newAddr) - return + if s.addrBook.IsKnown(pk) { + s.addrBook.Touch(pk) + } else { + s.logger.Printf("Adding %s to address list", pk) + s.addrBook.Add(pk) } - s.logger.Printf("Adding %s to address list", p.Addr()) - - s.addrBook.Add(newAddr) return } @@ -63,30 +65,37 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { queue <- na } - for i := 0; i < 32; i++ { + for i := 0; i < runtime.NumCPU()*2; i++ { go func() { var na *wire.NetAddress for { select { case next := <-queue: + // Pull the next address off the queue na = next case <-time.After(1 * time.Second): + // Or die if there wasn't one return } + // Note that AllowSelfConns is only exposed in a fork of btcd + // pending https://github.com/btcsuite/btcd/pull/1481, which + // is why the module `replace`s btcd. if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns { s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr()) - s.DisconnectPeerDishonorably(peerKeyFromPeer(p)) + s.DisconnectAndBlacklist(peerKeyFromPeer(p)) continue } - if s.addrBook.AlreadyKnowsAddress(na) { - s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port) + potentialPeer := peerKeyFromNA(na) + + if s.addrBook.IsKnown(potentialPeer) { + s.logger.Printf("Already knew about %s:%d", na.IP, na.Port) continue } - if s.addrBook.IsBlacklistedAddress(na) { - s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port) + if s.addrBook.IsBlacklisted(potentialPeer) { + s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port) continue } @@ -96,23 +105,14 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) { if err != nil { s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err) - // Mark previously-known peers as invalid - newAddr := &Address{ - netaddr: p.NA(), - valid: false, - lastTried: time.Now(), - } - - // TODO: function for marking bad addresses directly. needs better storage layer - - if s.addrBook.AlreadyKnowsAddress(p.NA()) { - s.addrBook.UpdateAddressStateFromTemplate(newAddr) - } + // Blacklist the potential peer. We might try to connect again later, + // since we assume IsRoutable filtered out the truly wrong ones. + s.addrBook.Blacklist(potentialPeer) continue } - s.DisconnectPeer(peerKeyFromNA(na)) - s.addrBook.Add(NewAddress(na)) + s.DisconnectPeer(potentialPeer) + s.addrBook.Add(potentialPeer) } }() } diff --git a/zcash/peer_map.go b/zcash/peer_map.go index aa4ceda..3876ddd 100644 --- a/zcash/peer_map.go +++ b/zcash/peer_map.go @@ -12,6 +12,10 @@ import ( // PeerKey is a convenient marker type for the "host:port" format used throughout our maps and lists. type PeerKey string +func (p PeerKey) String() string { + return string(p) +} + func peerKeyFromPeer(p *peer.Peer) PeerKey { return PeerKey(p.Addr()) }