zcash: add address request handling

This commit is contained in:
George Tankersley 2019-10-16 22:18:30 -04:00
parent 7d12f5e57d
commit 21b4c750a3
3 changed files with 219 additions and 20 deletions

40
zcash/address.go Normal file
View File

@ -0,0 +1,40 @@
package zcash
import (
"net"
"strconv"
"time"
"github.com/btcsuite/btcd/wire"
)
type Address struct {
netaddr *wire.NetAddress
valid bool
blacklist bool
lastTried time.Time
}
func (a *Address) IsGood() bool {
return a.valid == true
}
func (a *Address) IsBad() bool {
return a.blacklist == true
}
func (a *Address) String() string {
portString := strconv.Itoa(int(a.netaddr.Port))
return net.JoinHostPort(a.netaddr.IP.String(), portString)
}
// Addresses should be sortable by least-recently-tried
type AddrList []*Address
func (list AddrList) Len() int { return len(list) }
func (list AddrList) Swap(i, j int) { list[i], list[j] = list[j], list[i] }
func (list AddrList) Less(i, j int) bool { return list[i].lastTried.Before(list[j].lastTried) }
// GetShuffledAddressList returns a slice of n valid addresses in random order.
func GetShuffledAddressList(addrList []*Address, n int) []*Address { return nil }

View File

@ -4,9 +4,11 @@ import (
"log"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
@ -18,6 +20,7 @@ import (
var (
ErrRepeatConnection = errors.New("attempted repeat connection to existing peer")
ErrNoSuchPeer = errors.New("no record of requested peer")
ErrAddressTimeout = errors.New("wait for addreses timed out")
)
var defaultPeerConfig = &peer.Config{
@ -34,14 +37,16 @@ type Seeder struct {
config *peer.Config
logger *log.Logger
// Peer list handling
peerState sync.RWMutex
handshakeSignals *sync.Map
pendingPeers *sync.Map
livePeers *sync.Map
addrRecvChan chan *wire.NetAddress
// For mutating the above
peerState sync.RWMutex
// Address list handling
addrState sync.RWMutex
addrRecvCond *sync.Cond
addrList []*Address
}
func NewSeeder(network network.Network) (*Seeder, error) {
@ -58,9 +63,11 @@ func NewSeeder(network network.Network) (*Seeder, error) {
handshakeSignals: new(sync.Map),
pendingPeers: new(sync.Map),
livePeers: new(sync.Map),
addrRecvChan: make(chan *wire.NetAddress, 100),
addrList: make([]*Address, 0, 1000),
}
newSeeder.addrRecvCond = sync.NewCond(&newSeeder.addrState)
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
newSeeder.config.Listeners.OnAddr = newSeeder.onAddr
@ -85,9 +92,11 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
handshakeSignals: new(sync.Map),
pendingPeers: new(sync.Map),
livePeers: new(sync.Map),
addrRecvChan: make(chan *wire.NetAddress, 100),
addrList: make([]*Address, 0, 1000),
}
newSeeder.addrRecvCond = sync.NewCond(&newSeeder.addrState)
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
newSeeder.config.Listeners.OnAddr = newSeeder.onAddr
@ -134,10 +143,26 @@ func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
// Signal successful connection
if signal, ok := s.handshakeSignals.Load(p.Addr()); ok {
signal.(chan struct{}) <- struct{}{}
} else {
s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr())
s.DisconnectPeer(p.Addr())
return
}
s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr())
// Add to list of known good addresses
newAddr := &Address{
netaddr: p.NA(),
valid: true,
lastTried: time.Now(),
}
s.logger.Printf("Adding %s to address list", p.Addr())
s.addrState.Lock()
s.addrList = append(s.addrList, newAddr)
s.addrState.Unlock()
return
}
// ConnectToPeer attempts to connect to a peer on the default port at the
@ -155,15 +180,30 @@ func (s *Seeder) Connect(addr, port string) error {
return errors.Wrap(err, "constructing outbound peer")
}
_, alreadyPending := s.pendingPeers.LoadOrStore(p.Addr(), p)
_, alreadyHandshaking := s.handshakeSignals.LoadOrStore(p.Addr(), make(chan struct{}, 1))
_, alreadyPending := s.pendingPeers.Load(p.Addr())
_, alreadyHandshaking := s.handshakeSignals.Load(p.Addr())
_, alreadyLive := s.livePeers.Load(p.Addr())
if alreadyPending || alreadyHandshaking || alreadyLive {
s.logger.Printf("Attempted repeat connection to peer %s", p.Addr())
if alreadyPending {
s.logger.Printf("Peer is already pending: %s", p.Addr())
return ErrRepeatConnection
} else {
s.pendingPeers.Store(p.Addr(), p)
}
if alreadyHandshaking {
s.logger.Printf("Peer is already handshaking: %s", p.Addr())
return ErrRepeatConnection
} else {
s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1))
}
if alreadyLive {
s.logger.Printf("Peer is already live: %s", p.Addr())
return ErrRepeatConnection
}
// TODO time out
conn, err := net.Dial("tcp", p.Addr())
if err != nil {
return errors.Wrap(err, "dialing new peer address")
@ -212,6 +252,7 @@ func (s *Seeder) DisconnectPeer(addr string) error {
// TODO: type safety and error handling
v := p.(*peer.Peer)
s.logger.Printf("Disconnecting from peer %s", v.Addr())
v.Disconnect()
v.WaitForDisconnect()
s.livePeers.Delete(addr)
@ -219,6 +260,39 @@ func (s *Seeder) DisconnectPeer(addr string) error {
return nil
}
// DisconnectPeerDishonorably disconnects from a live peer identified by
// "host:port" string. It returns an error if we aren't connected to that peer.
// "Dishonorably" furthermore removes this peer from the list of known good
// addresses and adds them to a blacklist.
func (s *Seeder) DisconnectPeerDishonorably(addr string) error {
p, ok := s.livePeers.Load(addr)
if !ok {
return ErrNoSuchPeer
}
// TODO: type safety and error handling
v := p.(*peer.Peer)
s.logger.Printf("Disconnecting from peer %s", v.Addr())
v.Disconnect()
v.WaitForDisconnect()
s.livePeers.Delete(addr)
s.addrState.Lock()
for i := 0; i < len(s.addrList); i++ {
address := s.addrList[i]
if address.String() == addr {
s.logger.Printf("Blacklisting peer %s", v.Addr())
address.valid = false
address.blacklist = true
}
}
s.addrState.Unlock()
return nil
}
// DisconnectAllPeers terminates the connections to all live and pending peers.
func (s *Seeder) DisconnectAllPeers() {
s.pendingPeers.Range(func(key, value interface{}) bool {
@ -227,7 +301,6 @@ func (s *Seeder) DisconnectAllPeers() {
s.logger.Printf("Invalid peer in pendingPeers")
return false
}
s.logger.Printf("Disconnecting from pending peer %s", p.Addr())
p.Disconnect()
p.WaitForDisconnect()
s.pendingPeers.Delete(key)
@ -240,7 +313,6 @@ func (s *Seeder) DisconnectAllPeers() {
s.logger.Printf("Invalid peer in livePeers")
return false
}
s.logger.Printf("Disconnecting from live peer %s", p.Addr())
s.DisconnectPeer(p.Addr())
return true
})
@ -259,8 +331,55 @@ func (s *Seeder) RequestAddresses() {
})
}
func (s *Seeder) WaitForMoreAddresses() {
<-s.addrRecvChan
// WaitForAddresses waits for n addresses to be received and their initial
// connection attempts to resolve. There is no escape if that does not happen -
// this is intended for test runners.
func (s *Seeder) WaitForAddresses(n int) error {
s.addrState.Lock()
for {
addrCount := len(s.addrList)
if addrCount < n {
s.addrRecvCond.Wait()
} else {
break
}
}
s.addrState.Unlock()
return nil
}
func (s *Seeder) alreadyKnowsAddress(na *wire.NetAddress) bool {
s.addrState.RLock()
defer s.addrState.RUnlock()
ref := &Address{
netaddr: na,
}
for i := 0; i < len(s.addrList); i++ {
if s.addrList[i].String() == ref.String() {
return true
}
}
return false
}
func (s *Seeder) isBlacklistedAddress(na *wire.NetAddress) bool {
s.addrState.RLock()
defer s.addrState.RUnlock()
ref := &Address{
netaddr: na,
}
for i := 0; i < len(s.addrList); i++ {
if s.addrList[i].String() == ref.String() {
return s.addrList[i].IsBad()
}
}
return false
}
func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
@ -271,7 +390,38 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
}
s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
for _, addr := range msg.AddrList {
s.addrRecvChan <- addr
for _, na := range msg.AddrList {
s.logger.Printf("Trying %s:%d from peer %s", na.IP, na.Port, p.Addr())
go func(na *wire.NetAddress) {
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr())
s.DisconnectPeerDishonorably(p.Addr())
return
}
if s.alreadyKnowsAddress(na) {
s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port)
return
}
if s.isBlacklistedAddress(na) {
s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port)
return
}
portString := strconv.Itoa(int(na.Port))
err := s.Connect(na.IP.String(), portString)
if err != nil {
s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: ", na.IP, na.Port, p.Addr(), err)
return
}
peerString := net.JoinHostPort(na.IP.String(), portString)
s.DisconnectPeer(peerString)
s.addrRecvCond.Broadcast()
}(na)
}
}

View File

@ -39,9 +39,15 @@ func startMockLoop() {
time.Now(),
0,
net.ParseIP("127.0.0.1"),
uint16(8233),
uint16(18233),
)
cache = append(cache, addr)
addr2 := wire.NewNetAddressTimestamp(
time.Now(),
0,
net.ParseIP("127.0.0.1"),
uint16(18344),
)
cache = append(cache, addr, addr2)
_, err := p.PushAddrMsg(cache)
if err != nil {
mockPeerLogger.Error(err)
@ -154,5 +160,8 @@ func TestRequestAddresses(t *testing.T) {
}
regSeeder.RequestAddresses()
regSeeder.WaitForMoreAddresses()
regSeeder.WaitForAddresses(1)
// TODO It isn't possible to test this wait on a local mock peer without
// carving a path through absolutely all of the bad connection logic.
}