zcash: implement peer handshakes and state tracking
This commit is contained in:
parent
bdb049e215
commit
0f1773c0d1
187
zcash/client.go
187
zcash/client.go
|
@ -24,20 +24,65 @@ var defaultPeerConfig = &peer.Config{
|
|||
ProtocolVersion: 170009, // Blossom
|
||||
}
|
||||
|
||||
var logger = log.New(os.Stdout, "zcash_client: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
|
||||
|
||||
type Seeder struct {
|
||||
peer *peer.Peer
|
||||
config *peer.Config
|
||||
logger *log.Logger
|
||||
|
||||
handshakeComplete chan *peer.Peer
|
||||
handshakePendingPeers map[string]*peer.Peer
|
||||
livePeers map[string]*peer.Peer
|
||||
handshakeSignals map[string]chan *peer.Peer
|
||||
pendingPeers map[string]*peer.Peer
|
||||
livePeers map[string]*peer.Peer
|
||||
|
||||
// For mutating the above
|
||||
peerState sync.RWMutex
|
||||
}
|
||||
|
||||
func NewSeeder(network network.Network) (*Seeder, error) {
|
||||
config, err := newSeederPeerConfig(network, defaultPeerConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not construct seeder")
|
||||
}
|
||||
|
||||
logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
|
||||
|
||||
newSeeder := Seeder{
|
||||
config: config,
|
||||
logger: logger,
|
||||
handshakeSignals: make(map[string]chan *peer.Peer),
|
||||
pendingPeers: make(map[string]*peer.Peer),
|
||||
livePeers: make(map[string]*peer.Peer),
|
||||
}
|
||||
|
||||
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
|
||||
|
||||
return &newSeeder, nil
|
||||
}
|
||||
|
||||
func newTestSeeder(network network.Network) (*Seeder, error) {
|
||||
config, err := newSeederPeerConfig(network, defaultPeerConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not construct seeder")
|
||||
}
|
||||
|
||||
sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
|
||||
logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
|
||||
|
||||
// Allows connections to self for easy mocking
|
||||
config.AllowSelfConns = true
|
||||
|
||||
newSeeder := Seeder{
|
||||
config: config,
|
||||
logger: logger,
|
||||
handshakeSignals: make(map[string]chan *peer.Peer),
|
||||
pendingPeers: make(map[string]*peer.Peer),
|
||||
livePeers: make(map[string]*peer.Peer),
|
||||
}
|
||||
|
||||
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
|
||||
|
||||
return &newSeeder, nil
|
||||
}
|
||||
|
||||
func newSeederPeerConfig(magic network.Network, template *peer.Config) (*peer.Config, error) {
|
||||
var newPeerConfig peer.Config
|
||||
|
||||
|
@ -55,94 +100,122 @@ func newSeederPeerConfig(magic network.Network, template *peer.Config) (*peer.Co
|
|||
return &newPeerConfig, nil
|
||||
}
|
||||
|
||||
func (s *Seeder) OnVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
|
||||
func (s *Seeder) onVerAck(p *peer.Peer, msg *wire.MsgVerAck) {
|
||||
// lock peers for read
|
||||
s.peerState.RLock()
|
||||
if s.handshakePendingPeers[p.Addr()] == nil {
|
||||
logger.Printf("Got verack from unexpected peer %s", p.Addr())
|
||||
s.peerState.RUnlock()
|
||||
_, expectingPeer := s.pendingPeers[p.Addr()]
|
||||
s.peerState.RUnlock()
|
||||
|
||||
if !expectingPeer {
|
||||
s.logger.Printf("Got verack from unexpected peer %s", p.Addr())
|
||||
return
|
||||
}
|
||||
|
||||
s.peerState.Lock()
|
||||
{
|
||||
// Add to set of live peers
|
||||
s.livePeers[p.Addr()] = p
|
||||
|
||||
// Remove from set of pending peers
|
||||
delete(s.pendingPeers, p.Addr())
|
||||
}
|
||||
s.peerState.Unlock()
|
||||
|
||||
s.peerState.RLock()
|
||||
{
|
||||
// Signal successful connection
|
||||
s.handshakeSignals[p.Addr()] <- p
|
||||
}
|
||||
s.peerState.RUnlock()
|
||||
|
||||
logger.Printf("Handshake completed with new peer %s", p.Addr())
|
||||
|
||||
s.peerState.Lock()
|
||||
delete(s.handshakePendingPeers, p.Addr())
|
||||
s.livePeers[p.Addr()] = p
|
||||
s.peerState.Unlock()
|
||||
|
||||
s.handshakeComplete <- p
|
||||
}
|
||||
|
||||
func NewSeeder(network network.Network) (*Seeder, error) {
|
||||
config, err := newSeederPeerConfig(network, defaultPeerConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not construct seeder")
|
||||
}
|
||||
|
||||
newSeeder := Seeder{
|
||||
config: config,
|
||||
handshakeComplete: make(chan *peer.Peer, 1),
|
||||
handshakePendingPeers: make(map[string]*peer.Peer),
|
||||
livePeers: make(map[string]*peer.Peer),
|
||||
}
|
||||
|
||||
newSeeder.config.Listeners.OnVerAck = newSeeder.OnVerAck
|
||||
|
||||
return &newSeeder, nil
|
||||
}
|
||||
|
||||
// ConnectToPeer attempts to connect to a peer on the default port at the
|
||||
// specified address. It returns either a live peer connection or an error.
|
||||
func (s *Seeder) ConnectToPeer(addr string) (*peer.Peer, error) {
|
||||
func (s *Seeder) ConnectToPeer(addr string) error {
|
||||
connectionString := net.JoinHostPort(addr, s.config.ChainParams.DefaultPort)
|
||||
|
||||
p, err := peer.NewOutboundPeer(s.config, connectionString)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "constructing outbound peer")
|
||||
return errors.Wrap(err, "constructing outbound peer")
|
||||
}
|
||||
|
||||
conn, err := net.Dial("tcp", p.Addr())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "dialing new peer address")
|
||||
return errors.Wrap(err, "dialing new peer address")
|
||||
}
|
||||
|
||||
s.peerState.Lock()
|
||||
s.handshakePendingPeers[p.Addr()] = p
|
||||
{
|
||||
// Record that we're expecting a verack from this peer.
|
||||
s.pendingPeers[p.Addr()] = p
|
||||
|
||||
// Make a channel for us to wait on.
|
||||
s.handshakeSignals[p.Addr()] = make(chan *peer.Peer, 1)
|
||||
}
|
||||
s.peerState.Unlock()
|
||||
|
||||
// Begin connection negotiation.
|
||||
s.logger.Printf("Handshake initated with new peer %s", p.Addr())
|
||||
p.AssociateConnection(conn)
|
||||
|
||||
for {
|
||||
// lock signals map for select
|
||||
s.peerState.RLock()
|
||||
handshakeChan := s.handshakeSignals[p.Addr()]
|
||||
s.peerState.RUnlock()
|
||||
|
||||
select {
|
||||
case verackPeer := <-s.handshakeComplete:
|
||||
if verackPeer.Addr() == p.Addr() {
|
||||
return p, nil
|
||||
case verackPeer := <-handshakeChan:
|
||||
s.peerState.Lock()
|
||||
{
|
||||
close(s.handshakeSignals[p.Addr()])
|
||||
delete(s.handshakeSignals, p.Addr())
|
||||
}
|
||||
s.peerState.Unlock()
|
||||
s.logger.Printf("Handshake completed with new peer %s", verackPeer.Addr())
|
||||
return nil
|
||||
case <-time.After(time.Second * 1):
|
||||
return nil, errors.New("peer handshake timed out")
|
||||
return errors.New("peer handshake timed out")
|
||||
}
|
||||
}
|
||||
|
||||
panic("This should be unreachable")
|
||||
}
|
||||
|
||||
func (s *Seeder) GracefulDisconnect() {
|
||||
func (s *Seeder) GetPeer(addr string) (*peer.Peer, error) {
|
||||
lookupKey := net.JoinHostPort(addr, s.config.ChainParams.DefaultPort)
|
||||
s.peerState.RLock()
|
||||
p, ok := s.livePeers[lookupKey]
|
||||
s.peerState.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return nil, errors.New("no such active peer")
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (s *Seeder) WaitForPeers() {
|
||||
panic("not yet implemented")
|
||||
}
|
||||
|
||||
func (s *Seeder) DisconnectAllPeers() {
|
||||
s.peerState.Lock()
|
||||
{
|
||||
for _, v := range s.pendingPeers {
|
||||
s.logger.Printf("Disconnecting from peer %s", v.Addr())
|
||||
v.Disconnect()
|
||||
v.WaitForDisconnect()
|
||||
}
|
||||
s.pendingPeers = make(map[string]*peer.Peer)
|
||||
|
||||
for _, v := range s.handshakePendingPeers {
|
||||
logger.Printf("Disconnecting from peer %s", v.Addr())
|
||||
v.Disconnect()
|
||||
v.WaitForDisconnect()
|
||||
for _, v := range s.livePeers {
|
||||
s.logger.Printf("Disconnecting from peer %s", v.Addr())
|
||||
v.Disconnect()
|
||||
v.WaitForDisconnect()
|
||||
}
|
||||
s.livePeers = make(map[string]*peer.Peer)
|
||||
}
|
||||
s.handshakePendingPeers = make(map[string]*peer.Peer)
|
||||
|
||||
for _, v := range s.livePeers {
|
||||
logger.Printf("Disconnecting from peer %s", v.Addr())
|
||||
v.Disconnect()
|
||||
v.WaitForDisconnect()
|
||||
}
|
||||
s.livePeers = make(map[string]*peer.Peer)
|
||||
|
||||
s.peerState.Unlock()
|
||||
}
|
||||
|
|
|
@ -1,38 +1,115 @@
|
|||
package zcash
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/peer"
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/gtank/coredns-zcash/zcash/network"
|
||||
)
|
||||
|
||||
func TestOutboundPeer(t *testing.T) {
|
||||
regSeeder, err := NewSeeder(network.Regtest)
|
||||
func mockLocalPeer(ctx context.Context) error {
|
||||
// Configure peer to act as a regtest node that offers no services.
|
||||
config, err := newSeederPeerConfig(network.Regtest, defaultPeerConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
config.AllowSelfConns = true
|
||||
|
||||
backendLogger := btclog.NewBackend(os.Stdout)
|
||||
mockPeerLogger := backendLogger.Logger("mockPeer")
|
||||
//mockPeerLogger.SetLevel(btclog.LevelTrace)
|
||||
peer.UseLogger(mockPeerLogger)
|
||||
|
||||
mockPeer := peer.NewInboundPeer(config)
|
||||
|
||||
listenAddr := net.JoinHostPort("127.0.0.1", config.ChainParams.DefaultPort)
|
||||
listener, err := net.Listen("tcp", listenAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
mockPeer.AssociateConnection(conn)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
mockPeer.Disconnect()
|
||||
mockPeer.WaitForDisconnect()
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestOutboundPeerSync(t *testing.T) {
|
||||
testContext, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
if err := mockLocalPeer(testContext); err != nil {
|
||||
t.Logf("error starting mock peer (%v).", err)
|
||||
}
|
||||
|
||||
regSeeder, err := newTestSeeder(network.Regtest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = regSeeder.ConnectToPeer("127.0.0.1")
|
||||
err = regSeeder.ConnectToPeer("127.0.0.1")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Can we address that peer if we want to?
|
||||
p, err := regSeeder.GetPeer("127.0.0.1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if p.Connected() {
|
||||
regSeeder.DisconnectAllPeers()
|
||||
} else {
|
||||
t.Error("Peer never connected")
|
||||
}
|
||||
|
||||
// Can we STILL address a flushed peer?
|
||||
p, err = regSeeder.GetPeer("127.0.0.1")
|
||||
if err == nil {
|
||||
t.Error("Peer should have been cleared on disconnect")
|
||||
}
|
||||
regSeeder.GracefulDisconnect()
|
||||
}
|
||||
|
||||
func TestOutboundPeerAsync(t *testing.T) {
|
||||
regSeeder, err := NewSeeder(network.Regtest)
|
||||
testContext, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
if err := mockLocalPeer(testContext); err != nil {
|
||||
t.Logf("error starting mock peer (%v).", err)
|
||||
}
|
||||
|
||||
regSeeder, err := newTestSeeder(network.Regtest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
_, err := regSeeder.ConnectToPeer("127.0.0.1")
|
||||
err := regSeeder.ConnectToPeer("127.0.0.1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
regSeeder.GracefulDisconnect()
|
||||
regSeeder.DisconnectAllPeers()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
|
|
Loading…
Reference in New Issue