zcash: working but unpleasant refactoring of address handling

This commit is contained in:
George Tankersley 2019-12-30 20:54:38 -05:00
parent 5f8cea77c0
commit 54a67e5e72
6 changed files with 181 additions and 141 deletions

2
go.sum
View File

@ -1,3 +1,4 @@
github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
@ -23,6 +24,7 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=

View File

@ -16,6 +16,8 @@ type Address struct {
lastTried time.Time
}
// NewAddress returns a new address that is marked valid, not blacklisted, and
// last tried at time.Now().
func NewAddress(na *wire.NetAddress) *Address {
return &Address{
netaddr: na,
@ -56,6 +58,8 @@ func (bk *AddressBook) Add(newAddr *Address) {
bk.addrState.Lock()
bk.addrList = append(bk.addrList, newAddr)
bk.addrState.Unlock()
bk.addrRecvCond.Broadcast()
}
func (bk *AddressBook) Blacklist(addr PeerKey) {
@ -99,7 +103,7 @@ func (bk *AddressBook) IsBlacklistedAddress(na *wire.NetAddress) bool {
return false
}
func (bk *AddressBook) UpdateAddressState(update *Address) {
func (bk *AddressBook) UpdateAddressStateFromTemplate(update *Address) {
bk.addrState.Lock()
defer bk.addrState.Unlock()
@ -121,5 +125,23 @@ func NewAddressBook(capacity int) *AddressBook {
return addrBook
}
// WaitForAddresses waits for n addresses to be received and their initial
// connection attempts to resolve. There is no escape if that does not happen -
// this is intended for test runners or goroutines with a timeout.
func (bk *AddressBook) waitForAddresses(n int, done chan struct{}) {
bk.addrState.Lock()
for {
addrCount := len(bk.addrList)
if addrCount < n {
bk.addrRecvCond.Wait()
} else {
break
}
}
bk.addrState.Unlock()
done <- struct{}{}
return
}
// GetShuffledAddressList returns a slice of n valid addresses in random order.
func (ab *AddressBook) GetShuffledAddressList(n int) []*Address { return nil }

View File

@ -4,11 +4,9 @@ import (
"log"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
@ -20,7 +18,7 @@ import (
var (
ErrRepeatConnection = errors.New("attempted repeat connection to existing peer")
ErrNoSuchPeer = errors.New("no record of requested peer")
ErrAddressTimeout = errors.New("wait for addreses timed out")
ErrAddressTimeout = errors.New("wait for addresses timed out")
ErrBlacklistedPeer = errors.New("peer is blacklisted")
)
@ -77,8 +75,9 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
return nil, errors.Wrap(err, "could not construct seeder")
}
sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
// sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
// logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
// Allows connections to self for easy mocking
config.AllowSelfConns = true
@ -120,46 +119,7 @@ func (s *Seeder) GetNetworkDefaultPort() string {
return s.config.ChainParams.DefaultPort
}
func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
// Check if we're expecting to hear from this peer
_, ok := s.pendingPeers.Load(peerKeyFromPeer(p))
if !ok {
s.logger.Printf("Got verack from unexpected peer %s", p.Addr())
return
}
// Add to set of live peers
s.livePeers.Store(peerKeyFromPeer(p), p)
// Remove from set of pending peers
s.pendingPeers.Delete(peerKeyFromPeer(p))
// Signal successful connection
if signal, ok := s.handshakeSignals.Load(p.Addr()); ok {
signal.(chan struct{}) <- struct{}{}
} else {
s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr())
s.DisconnectPeer(peerKeyFromPeer(p))
return
}
// Add to list of known good addresses if we don't already have it.
// Otherwise, update the last-valid time.
if s.addrBook.AlreadyKnowsAddress(p.NA()) {
newAddr := NewAddress(p.NA())
s.updateAddressState(newAddr)
return
}
s.logger.Printf("Adding %s to address list", p.Addr())
s.addrBook.Add(newAddr)
return
}
// ConnectToPeer attempts to connect to a peer on the default port at the
// ConnectOnDefaultPort attempts to connect to a peer on the default port at the
// specified address. It returns an error if it can't complete handshake with
// the peer. Otherwise it returns nil and adds the peer to the list of live
// connections and known-good addresses.
@ -185,16 +145,14 @@ func (s *Seeder) Connect(addr, port string) error {
if alreadyPending {
s.logger.Printf("Peer is already pending: %s", p.Addr())
return ErrRepeatConnection
} else {
s.pendingPeers.Store(peerKeyFromPeer(p), p)
}
s.pendingPeers.Store(peerKeyFromPeer(p), p)
if alreadyHandshaking {
s.logger.Printf("Peer is already handshaking: %s", p.Addr())
return ErrRepeatConnection
} else {
s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1))
}
s.handshakeSignals.Store(p.Addr(), make(chan struct{}, 1))
if alreadyLive {
s.logger.Printf("Peer is already live: %s", p.Addr())
@ -231,7 +189,7 @@ func (s *Seeder) GetPeer(addr PeerKey) (*peer.Peer, error) {
p, ok := s.livePeers.Load(addr)
if ok {
return p.(*peer.Peer), nil
return p, nil
}
return nil, ErrNoSuchPeer
@ -287,7 +245,7 @@ func (s *Seeder) DisconnectAllPeers() {
})
s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
s.DisconnectPeer(p.Addr())
s.DisconnectPeer(key)
return true
})
}
@ -303,86 +261,13 @@ func (s *Seeder) RequestAddresses() {
// WaitForAddresses waits for n addresses to be received and their initial
// connection attempts to resolve. There is no escape if that does not happen -
// this is intended for test runners.
func (s *Seeder) WaitForAddresses(n int) error {
s.addrState.Lock()
for {
addrCount := len(s.addrList)
if addrCount < n {
s.addrRecvCond.Wait()
} else {
break
}
}
s.addrState.Unlock()
return nil
}
func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
if len(msg.AddrList) == 0 {
s.logger.Printf("Got empty addr message from peer %s. Disconnecting.", p.Addr())
s.DisconnectPeer(p.Addr())
return
}
s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
queue := make(chan *wire.NetAddress, len(msg.AddrList))
for _, na := range msg.AddrList {
queue <- na
}
for i := 0; i < 32; i++ {
go func() {
var na *wire.NetAddress
for {
select {
case next := <-queue:
na = next
case <-time.After(1 * time.Second):
return
}
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr())
s.DisconnectPeerDishonorably(p.Addr())
continue
}
if s.addrBook.AlreadyKnowsAddress(na) {
s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port)
continue
}
if s.addrBook.IsBlacklistedAddress(na) {
s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port)
continue
}
portString := strconv.Itoa(int(na.Port))
err := s.Connect(na.IP.String(), portString)
if err != nil {
s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err)
// Mark previously-known peers as invalid
newAddr := &Address{
netaddr: p.NA(),
valid: false,
lastTried: time.Now(),
}
if s.alreadyKnowsAddress(p.NA()) {
s.updateAddressState(newAddr)
}
continue
}
peerString := net.JoinHostPort(na.IP.String(), portString)
s.DisconnectPeer(peerString)
s.addrRecvCond.Broadcast()
}
}()
func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error {
done := make(chan struct{})
go s.addrBook.waitForAddresses(n, done)
select {
case <-done:
return nil
case <-time.After(timeout):
return ErrAddressTimeout
}
}

119
zcash/client_callbacks.go Normal file
View File

@ -0,0 +1,119 @@
package zcash
import (
"strconv"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
)
func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
// Check if we're expecting to hear from this peer
_, ok := s.pendingPeers.Load(peerKeyFromPeer(p))
if !ok {
s.logger.Printf("Got verack from unexpected peer %s", p.Addr())
return
}
// Add to set of live peers
s.livePeers.Store(peerKeyFromPeer(p), p)
// Remove from set of pending peers
s.pendingPeers.Delete(peerKeyFromPeer(p))
// Signal successful connection
if signal, ok := s.handshakeSignals.Load(p.Addr()); ok {
signal.(chan struct{}) <- struct{}{}
} else {
s.logger.Printf("Got verack from peer without a callback channel: %s", p.Addr())
s.DisconnectPeer(peerKeyFromPeer(p))
return
}
// Add to list of known good addresses if we don't already have it.
// Otherwise, update the last-valid time.
newAddr := NewAddress(p.NA())
if s.addrBook.AlreadyKnowsAddress(p.NA()) {
s.addrBook.UpdateAddressStateFromTemplate(newAddr)
return
}
s.logger.Printf("Adding %s to address list", p.Addr())
s.addrBook.Add(newAddr)
return
}
func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
if len(msg.AddrList) == 0 {
s.logger.Printf("Got empty addr message from peer %s. Disconnecting.", p.Addr())
s.DisconnectPeer(peerKeyFromPeer(p))
return
}
s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
queue := make(chan *wire.NetAddress, len(msg.AddrList))
for _, na := range msg.AddrList {
queue <- na
}
for i := 0; i < 32; i++ {
go func() {
var na *wire.NetAddress
for {
select {
case next := <-queue:
na = next
case <-time.After(1 * time.Second):
return
}
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr())
s.DisconnectPeerDishonorably(peerKeyFromPeer(p))
continue
}
if s.addrBook.AlreadyKnowsAddress(na) {
s.logger.Printf("Already knew about address %s:%d", na.IP, na.Port)
continue
}
if s.addrBook.IsBlacklistedAddress(na) {
s.logger.Printf("Address %s:%d is blacklisted", na.IP, na.Port)
continue
}
portString := strconv.Itoa(int(na.Port))
err := s.Connect(na.IP.String(), portString)
if err != nil {
s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err)
// Mark previously-known peers as invalid
newAddr := &Address{
netaddr: p.NA(),
valid: false,
lastTried: time.Now(),
}
// TODO: function for marking bad addresses directly. needs better storage layer
if s.addrBook.AlreadyKnowsAddress(p.NA()) {
s.addrBook.UpdateAddressStateFromTemplate(newAddr)
}
continue
}
s.DisconnectPeer(peerKeyFromNA(na))
s.addrBook.Add(NewAddress(na))
}
}()
}
}

View File

@ -155,13 +155,18 @@ func TestRequestAddresses(t *testing.T) {
err = regSeeder.ConnectOnDefaultPort("127.0.0.1")
if err != nil {
t.Error(err)
return
t.Fatal(err)
}
regSeeder.RequestAddresses()
regSeeder.WaitForAddresses(1)
err = regSeeder.WaitForAddresses(1, 1*time.Second)
// TODO It isn't possible to test this wait on a local mock peer without
// carving a path through absolutely all of the bad connection logic.
if err != nil {
t.Errorf("Error getting one mocked address: %v", err)
}
err = regSeeder.WaitForAddresses(500, 1*time.Second)
if err != ErrAddressTimeout {
t.Errorf("Should have timed out, instead got: %v", err)
}
}

View File

@ -9,7 +9,7 @@ import (
"github.com/btcsuite/btcd/wire"
)
// The "host:port" format used throughout our maps and lists.
// PeerKey is a convenient marker type for the "host:port" format used throughout our maps and lists.
type PeerKey string
func peerKeyFromPeer(p *peer.Peer) PeerKey {
@ -40,7 +40,7 @@ func (pm *PeerMap) Load(key PeerKey) (*peer.Peer, bool) {
v, mapOk := pm.m.Load(key)
if mapOk {
p, typeOk := v.(*peer.Peer)
if typeOK {
if typeOk {
return p, true
}
}
@ -77,5 +77,12 @@ func (pm *PeerMap) Delete(key PeerKey) {
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (pm *PeerMap) Range(f func(key PeerKey, value *peer.Peer) bool) {
pm.m.Range(f)
// TODO: gaaaaaah
fUntyped := func(untypedKey, untypedValue interface{}) bool {
typedKey, _ := untypedKey.(PeerKey)
typedValue, _ := untypedValue.(*peer.Peer)
return f(typedKey, typedValue)
}
pm.m.Range(fUntyped)
}