zcash: refactor address book and ensure type consistency for interface{} maps

This commit is contained in:
George Tankersley 2020-05-20 18:08:34 -04:00
parent f1c13a8fd6
commit 8755624103
4 changed files with 187 additions and 144 deletions

View File

@ -10,29 +10,9 @@ import (
)
type Address struct {
netaddr *wire.NetAddress
valid bool
blacklist bool
lastTried time.Time
}
// NewAddress returns a new address that is marked valid, not blacklisted, and
// last tried at time.Now().
func NewAddress(na *wire.NetAddress) *Address {
return &Address{
netaddr: na,
valid: true,
blacklist: false,
lastTried: time.Now(),
}
}
func (a *Address) IsGood() bool {
return a.valid && !a.blacklist
}
func (a *Address) IsBad() bool {
return a.blacklist
netaddr *wire.NetAddress
blacklisted bool
lastUpdate time.Time
}
func (a *Address) String() string {
@ -44,94 +24,141 @@ func (a *Address) asPeerKey() PeerKey {
return PeerKey(a.String())
}
func (a *Address) fromPeerKey(s PeerKey) (*Address, error) {
host, portString, err := net.SplitHostPort(s.String())
if err != nil {
return nil, err
}
portInt, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return nil, err
}
na := wire.NewNetAddressTimestamp(
time.Now(),
0,
net.ParseIP(host),
uint16(portInt),
)
a.netaddr = na
a.blacklisted = false
a.lastUpdate = na.Timestamp
return a, nil
}
func (a *Address) asNetAddress() *wire.NetAddress {
newNA := *a.netaddr
newNA.Timestamp = a.lastUpdate
return &newNA
}
func (a *Address) fromNetAddress(na *wire.NetAddress) (*Address, error) {
a.netaddr = na
a.blacklisted = false
a.lastUpdate = na.Timestamp
return a, nil
}
func (a *Address) MarshalText() (text []byte, err error) {
return []byte(a.String()), nil
}
type AddressBook struct {
addrList []*Address
addrs map[PeerKey]*Address
addrState sync.RWMutex
addrRecvCond *sync.Cond
}
func (bk *AddressBook) Add(newAddr *Address) {
bk.addrState.Lock()
bk.addrList = append(bk.addrList, newAddr)
bk.addrState.Unlock()
bk.addrRecvCond.Broadcast()
}
func (bk *AddressBook) Blacklist(addr PeerKey) {
bk.addrState.Lock()
for i := 0; i < len(bk.addrList); i++ {
address := bk.addrList[i]
if address.asPeerKey() == addr {
address.valid = false
address.blacklist = true
}
}
bk.addrState.Unlock()
}
func (bk *AddressBook) AlreadyKnowsAddress(na *wire.NetAddress) bool {
bk.addrState.RLock()
defer bk.addrState.RUnlock()
addr := NewAddress(na)
for i := 0; i < len(bk.addrList); i++ {
if bk.addrList[i].String() == addr.String() {
return true
}
}
return false
}
func (bk *AddressBook) IsBlacklistedAddress(na *wire.NetAddress) bool {
bk.addrState.RLock()
defer bk.addrState.RUnlock()
ref := NewAddress(na)
for i := 0; i < len(bk.addrList); i++ {
if bk.addrList[i].String() == ref.String() {
return bk.addrList[i].IsBad()
}
}
return false
}
func (bk *AddressBook) UpdateAddressStateFromTemplate(update *Address) {
bk.addrState.Lock()
defer bk.addrState.Unlock()
for i := 0; i < len(bk.addrList); i++ {
if bk.addrList[i].String() == update.String() {
bk.addrList[i].valid = update.valid
bk.addrList[i].blacklist = update.blacklist
bk.addrList[i].lastTried = update.lastTried
return
}
}
}
func NewAddressBook(capacity int) *AddressBook {
func NewAddressBook() *AddressBook {
addrBook := &AddressBook{
addrList: make([]*Address, 0, capacity),
addrs: make(map[PeerKey]*Address),
}
addrBook.addrRecvCond = sync.NewCond(&addrBook.addrState)
return addrBook
}
func (bk *AddressBook) Add(s PeerKey) {
newAddr, err := (&Address{}).fromPeerKey(s)
if err != nil {
// XXX effectively NOP bogus peer strings
return
}
bk.addrState.Lock()
bk.addrs[s] = newAddr
bk.addrState.Unlock()
// Wake anyone who was waiting on us to receive an address.
bk.addrRecvCond.Broadcast()
}
func (bk *AddressBook) Remove(s PeerKey) {
bk.addrState.Lock()
defer bk.addrState.Unlock()
if _, ok := bk.addrs[s]; ok {
delete(bk.addrs, s)
}
}
func (bk *AddressBook) Blacklist(s PeerKey) {
bk.addrState.Lock()
defer bk.addrState.Unlock()
if target, ok := bk.addrs[s]; ok {
target.blacklisted = true
target.lastUpdate = time.Now()
} else {
// Create a new Address just to be blacklisted
addr, err := (&Address{}).fromPeerKey(s)
if err != nil {
// XXX effectively NOP bogus peer strings
return
}
addr.blacklisted = true
bk.addrs[s] = addr
}
}
// Touch updates the last-seen timestamp if the peer is in the address book or does nothing if not.
func (bk *AddressBook) Touch(s PeerKey) {
bk.addrState.Lock()
defer bk.addrState.Unlock()
if target, ok := bk.addrs[s]; ok {
target.lastUpdate = time.Now()
}
}
// IsKnown returns true if the peer is already in our address book, false if not.
func (bk *AddressBook) IsKnown(s PeerKey) bool {
bk.addrState.RLock()
defer bk.addrState.RUnlock()
_, known := bk.addrs[s]
return known
}
func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
bk.addrState.RLock()
defer bk.addrState.RUnlock()
if target, ok := bk.addrs[s]; ok {
return target.blacklisted
}
return false
}
// WaitForAddresses waits for n addresses to be received and their initial
// connection attempts to resolve. There is no escape if that does not happen -
// this is intended for test runners or goroutines with a timeout.
func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) {
bk.addrState.Lock()
for {
addrCount := len(bk.addrList)
addrCount := len(bk.addrs)
if addrCount < n {
bk.addrRecvCond.Wait()
} else {
@ -144,4 +171,6 @@ func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) {
}
// GetShuffledAddressList returns a slice of n valid addresses in random order.
func (ab *AddressBook) GetShuffledAddressList(n int) []*Address { return nil }
// func (bk *AddressBook) GetShuffledAddressList(n int) []*Address {
// }

View File

@ -31,6 +31,13 @@ var defaultPeerConfig = &peer.Config{
ProtocolVersion: 170009, // Blossom
}
// The minimum number of addresses we need to know about to begin serving introductions.
const minimumReadyAddresses = 10
// The maximum amount of time we will wait for a peer to complete the initial handshake.
const maximumHandshakeWait = 1 * time.Second
// Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider.
type Seeder struct {
peer *peer.Peer
config *peer.Config
@ -60,7 +67,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
handshakeSignals: new(sync.Map),
pendingPeers: NewPeerMap(),
livePeers: NewPeerMap(),
addrBook: NewAddressBook(1000),
addrBook: NewAddressBook(),
}
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@ -88,7 +95,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
handshakeSignals: new(sync.Map),
pendingPeers: NewPeerMap(),
livePeers: NewPeerMap(),
addrBook: NewAddressBook(1000),
addrBook: NewAddressBook(),
}
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@ -134,25 +141,28 @@ func (s *Seeder) Connect(addr, port string) error {
return errors.Wrap(err, "constructing outbound peer")
}
if s.addrBook.IsBlacklistedAddress(p.NA()) {
// PeerKeys are used in our internal maps to keep signals and responses from specific peers straight.
pk := peerKeyFromPeer(p)
if s.addrBook.IsBlacklisted(pk) {
return ErrBlacklistedPeer
}
_, alreadyPending := s.pendingPeers.Load(peerKeyFromPeer(p))
_, alreadyHandshaking := s.handshakeSignals.Load(peerKeyFromPeer(p))
_, alreadyLive := s.livePeers.Load(peerKeyFromPeer(p))
_, alreadyPending := s.pendingPeers.Load(pk)
_, alreadyHandshaking := s.handshakeSignals.Load(pk)
_, alreadyLive := s.livePeers.Load(pk)
if alreadyPending {
s.logger.Printf("Peer is already pending: %s", p.Addr())
return ErrRepeatConnection
}
s.pendingPeers.Store(peerKeyFromPeer(p), p)
s.pendingPeers.Store(pk, p)
if alreadyHandshaking {
s.logger.Printf("Peer is already handshaking: %s", p.Addr())
return ErrRepeatConnection
}
s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1))
s.handshakeSignals.Store(pk, make(chan struct{}, 1))
if alreadyLive {
s.logger.Printf("Peer is already live: %s", p.Addr())
@ -168,15 +178,15 @@ func (s *Seeder) Connect(addr, port string) error {
s.logger.Printf("Handshake initated with new peer %s", p.Addr())
p.AssociateConnection(conn)
// TODO: handle disconnect during this
handshakeChan, _ := s.handshakeSignals.Load(p.Addr())
// Wait for
handshakeChan, _ := s.handshakeSignals.Load(pk)
select {
case <-handshakeChan.(chan struct{}):
s.logger.Printf("Handshake completed with new peer %s", p.Addr())
s.handshakeSignals.Delete(p.Addr())
s.handshakeSignals.Delete(pk)
return nil
case <-time.After(1 * time.Second):
case <-time.After(maximumHandshakeWait):
return errors.New("peer handshake started but timed out")
}
@ -211,11 +221,11 @@ func (s *Seeder) DisconnectPeer(addr PeerKey) error {
return nil
}
// DisconnectPeerDishonorably disconnects from a live peer identified by
// DisconnectAndBlacklist disconnects from a live peer identified by
// "host:port" string. It returns an error if we aren't connected to that peer.
// "Dishonorably" furthermore removes this peer from the list of known good
// addresses and adds them to a blacklist.
func (s *Seeder) DisconnectPeerDishonorably(addr PeerKey) error {
// It furthermore removes this peer from the list of known good
// addresses and adds them to a blacklist. to prevent future connections.
func (s *Seeder) DisconnectAndBlacklist(addr PeerKey) error {
p, ok := s.livePeers.Load(addr)
if !ok {
@ -250,6 +260,7 @@ func (s *Seeder) DisconnectAllPeers() {
})
}
// RequestAddresses sends a request for more addresses to every peer we're connected to.
func (s *Seeder) RequestAddresses() {
s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
s.logger.Printf("Requesting addresses from peer %s", p.Addr())
@ -258,9 +269,7 @@ func (s *Seeder) RequestAddresses() {
})
}
// WaitForAddresses waits for n addresses to be received and their initial
// connection attempts to resolve. There is no escape if that does not happen -
// this is intended for test runners.
// WaitForAddresses waits for n addresses to be confirmed and available in the address book.
func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error {
done := make(chan struct{})
go s.addrBook.waitForAddresses(n, done)
@ -274,6 +283,7 @@ func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error {
// Ready reports if the seeder is ready to provide addresses.
func (s *Seeder) Ready() bool {
// TODO report ready when we have some addresses
return false
return s.WaitForAddresses(minimumReadyAddresses, 1*time.Millisecond) == nil
}
//func (s *Seeder) Addresses(count int) []

View File

@ -1,6 +1,7 @@
package zcash
import (
"runtime"
"strconv"
"time"
@ -10,41 +11,42 @@ import (
)
func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
pk := peerKeyFromPeer(p)
// Check if we're expecting to hear from this peer
_, ok := s.pendingPeers.Load(peerKeyFromPeer(p))
_, ok := s.pendingPeers.Load(pk)
if !ok {
s.logger.Printf("Got verack from unexpected peer %s", p.Addr())
// TODO: probably want to disconnect from the peer sending us out-of-order veracks
return
}
// Add to set of live peers
s.livePeers.Store(peerKeyFromPeer(p), p)
s.livePeers.Store(pk, p)
// Remove from set of pending peers
s.pendingPeers.Delete(peerKeyFromPeer(p))
s.pendingPeers.Delete(pk)
// Signal successful connection
if signal, ok := s.handshakeSignals.Load(p.Addr()); ok {
if signal, ok := s.handshakeSignals.Load(pk); ok {
signal.(chan struct{}) <- struct{}{}
} else {
s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr())
s.DisconnectPeer(peerKeyFromPeer(p))
s.DisconnectPeer(pk)
return
}
// Add to list of known good addresses if we don't already have it.
// Otherwise, update the last-valid time.
newAddr := NewAddress(p.NA())
if s.addrBook.AlreadyKnowsAddress(p.NA()) {
s.addrBook.UpdateAddressStateFromTemplate(newAddr)
return
if s.addrBook.IsKnown(pk) {
s.addrBook.Touch(pk)
} else {
s.logger.Printf("Adding %s to address list", pk)
s.addrBook.Add(pk)
}
s.logger.Printf("Adding %s to address list", p.Addr())
s.addrBook.Add(newAddr)
return
}
@ -63,30 +65,37 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
queue <- na
}
for i := 0; i < 32; i++ {
for i := 0; i < runtime.NumCPU()*2; i++ {
go func() {
var na *wire.NetAddress
for {
select {
case next := <-queue:
// Pull the next address off the queue
na = next
case <-time.After(1 * time.Second):
// Or die if there wasn't one
return
}
// Note that AllowSelfConns is only exposed in a fork of btcd
// pending https://github.com/btcsuite/btcd/pull/1481, which
// is why the module `replace`s btcd.
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr())
s.DisconnectPeerDishonorably(peerKeyFromPeer(p))
s.DisconnectAndBlacklist(peerKeyFromPeer(p))
continue
}
if s.addrBook.AlreadyKnowsAddress(na) {
s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port)
potentialPeer := peerKeyFromNA(na)
if s.addrBook.IsKnown(potentialPeer) {
s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
continue
}
if s.addrBook.IsBlacklistedAddress(na) {
s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port)
if s.addrBook.IsBlacklisted(potentialPeer) {
s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port)
continue
}
@ -96,23 +105,14 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
if err != nil {
s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err)
// Mark previously-known peers as invalid
newAddr := &Address{
netaddr: p.NA(),
valid: false,
lastTried: time.Now(),
}
// TODO: function for marking bad addresses directly. needs better storage layer
if s.addrBook.AlreadyKnowsAddress(p.NA()) {
s.addrBook.UpdateAddressStateFromTemplate(newAddr)
}
// Blacklist the potential peer. We might try to connect again later,
// since we assume IsRoutable filtered out the truly wrong ones.
s.addrBook.Blacklist(potentialPeer)
continue
}
s.DisconnectPeer(peerKeyFromNA(na))
s.addrBook.Add(NewAddress(na))
s.DisconnectPeer(potentialPeer)
s.addrBook.Add(potentialPeer)
}
}()
}

View File

@ -12,6 +12,10 @@ import (
// PeerKey is a convenient marker type for the "host:port" format used throughout our maps and lists.
type PeerKey string
func (p PeerKey) String() string {
return string(p)
}
func peerKeyFromPeer(p *peer.Peer) PeerKey {
return PeerKey(p.Addr())
}