tendermint/p2p/transport_test.go

637 lines
12 KiB
Go
Raw Normal View History

p2p: Implement PeerTransport This is the implementation for the design described in ADR 12[0]. It's the first step of a larger refactor of the p2p package as tracked in interface bundling all concerns of low-level connection handling and isolating the rest of peer lifecycle management from the specifics of the low-level internet protocols. Even if the swappable implementation will never be utilised, already the isolation of conn related code in one place will help with the reasoning about execution path and addressation of security sensitive issues surfaced through bounty programs and audits. We deliberately decided to not have Peer filtering and other management in the Transport, its sole responsibility is the translation of connections to Peers, handing those to the caller fully setup. It's the responsibility of the caller to reject those and or keep track. Peer filtering will take place in the Switch and can be inspected in a the following commit. This changeset additionally is an exercise in clean separation of logic and other infrastructural concerns like logging and instrumentation. By leveraging a clean and minimal interface. How this looks can be seen in a follow-up change. Design #2069[2] Refs #2067[3] Fixes #2047[4] Fixes #2046[5] changes: * describe Transport interface * implement new default Transport: MultiplexTransport * test MultiplexTransport with new constraints * implement ConnSet for concurrent management of net.Conn, synchronous to PeerSet * implement and expose duplicate IP filter * implemnt TransportOption for optional parametirisation [0] https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-012-peer-transport.md [1] https://github.com/tendermint/tendermint/issues/2067 [2] https://github.com/tendermint/tendermint/pull/2069 [3] https://github.com/tendermint/tendermint/issues/2067 [4] https://github.com/tendermint/tendermint/issues/2047 [5] https://github.com/tendermint/tendermint/issues/2046
2018-09-18 13:11:54 -07:00
package p2p
import (
"fmt"
"math/rand"
"net"
"reflect"
"testing"
"time"
"github.com/tendermint/tendermint/crypto/ed25519"
)
func TestTransportMultiplexConnFilter(t *testing.T) {
mt := NewMultiplexTransport(
NodeInfo{},
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
func(_ ConnSet, _ net.Conn, _ []net.IP) error {
return fmt.Errorf("rejected")
},
)(mt)
addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
errc := make(chan error)
go func() {
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = addr.Dial()
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err = mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsFiltered() {
t.Errorf("expected peer to be filtered")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
mt := NewMultiplexTransport(
NodeInfo{},
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error {
time.Sleep(10 * time.Millisecond)
return nil
},
)(mt)
addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
errc := make(chan error)
go func() {
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = addr.Dial()
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err = mt.Accept(peerConfig{})
if _, ok := err.(ErrFilterTimeout); !ok {
t.Errorf("expected ErrFilterTimeout")
}
}
func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
seed = rand.New(rand.NewSource(time.Now().UnixNano()))
errc = make(chan error, seed.Intn(64)+64)
)
// Setup dialers.
for i := 0; i < cap(errc); i++ {
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "1.0.0",
},
NodeKey{
PrivKey: pv,
},
)
)
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
// Signal that the connection was established.
errc <- nil
}()
}
// Catch connection errors.
for i := 0; i < cap(errc); i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
ps := []Peer{}
// Accept all peers.
for i := 0; i < cap(errc); i++ {
p, err := mt.Accept(peerConfig{})
if err != nil {
t.Fatal(err)
}
if err := p.Start(); err != nil {
t.Fatal(err)
}
ps = append(ps, p)
}
if have, want := len(ps), cap(errc); have != want {
t.Errorf("have %v, want %v", have, want)
}
// Stop all peers.
for _, p := range ps {
if err := p.Stop(); err != nil {
t.Fatal(err)
}
}
if err := mt.Close(); err != nil {
t.Errorf("close errored: %v", err)
}
}
func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
fastNodePV = ed25519.GenPrivKey()
fastNodeInfo = NodeInfo{
ID: PubKeyToID(fastNodePV.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "fastNode",
Version: "1.0.0",
}
errc = make(chan error)
fastc = make(chan struct{})
slowc = make(chan struct{})
)
// Simulate slow Peer.
go func() {
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
c, err := addr.Dial()
if err != nil {
errc <- err
return
}
close(slowc)
select {
case <-fastc:
// Fast peer connected.
case <-time.After(50 * time.Millisecond):
// We error if the fast peer didn't succeed.
errc <- fmt.Errorf("Fast peer timed out")
}
sc, err := upgradeSecretConn(c, 20*time.Millisecond, ed25519.GenPrivKey())
if err != nil {
errc <- err
return
}
_, err = handshake(sc, 20*time.Millisecond, NodeInfo{
ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "slow_peer",
})
if err != nil {
errc <- err
return
}
}()
// Simulate fast Peer.
go func() {
<-slowc
var (
dialer = NewMultiplexTransport(
fastNodeInfo,
NodeKey{
PrivKey: fastNodePV,
},
)
)
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
close(fastc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
p, err := mt.Accept(peerConfig{})
if err != nil {
t.Fatal(err)
}
if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) {
t.Errorf("have %v, want %v", have, want)
}
}
func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "", // Should not be empty.
Version: "1.0.0",
},
NodeKey{
PrivKey: pv,
},
)
)
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsNodeInfoInvalid() {
t.Errorf("expected NodeInfo to be invalid")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
dialer := NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "1.0.0",
},
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsAuthFailure() {
t.Errorf("expected auth failure")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestTransportMultiplexRejectIncompatible(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "2.0.0",
},
NodeKey{
PrivKey: pv,
},
)
)
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsIncompatible() {
t.Errorf("expected to reject incompatible")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestTransportMultiplexRejectSelf(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
if err != nil {
errc <- err
return
}
_, err = mt.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected to reject self")
}
} else {
t.Errorf("expected ErrRejected")
}
} else {
t.Errorf("expected connection failure")
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected to reject self")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestTransportConnDuplicateIPFilter(t *testing.T) {
filter := ConnDuplicateIPFilter()
if err := filter(nil, &testTransportConn{}, nil); err != nil {
t.Fatal(err)
}
var (
c = &testTransportConn{}
cs = NewConnSet()
)
cs.Set(c, []net.IP{
net.IP{10, 0, 10, 1},
net.IP{10, 0, 10, 2},
net.IP{10, 0, 10, 3},
})
if err := filter(cs, c, []net.IP{
net.IP{10, 0, 10, 2},
}); err == nil {
t.Errorf("expected Peer to be rejected as duplicate")
}
}
func TestTransportHandshake(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
var (
peerPV = ed25519.GenPrivKey()
peerNodeInfo = NodeInfo{
ID: PubKeyToID(peerPV.PubKey()),
}
)
go func() {
c, err := net.Dial(ln.Addr().Network(), ln.Addr().String())
if err != nil {
t.Error(err)
return
}
go func(c net.Conn) {
_, err := cdc.MarshalBinaryWriter(c, peerNodeInfo)
if err != nil {
t.Error(err)
}
}(c)
go func(c net.Conn) {
ni := NodeInfo{}
_, err := cdc.UnmarshalBinaryReader(
c,
&ni,
int64(MaxNodeInfoSize()),
)
if err != nil {
t.Error(err)
}
}(c)
}()
c, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
ni, err := handshake(c, 20*time.Millisecond, NodeInfo{})
if err != nil {
t.Fatal(err)
}
if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) {
t.Errorf("have %v, want %v", have, want)
}
}
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
mt = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "transport",
Version: "1.0.0",
},
NodeKey{
PrivKey: pv,
},
)
)
addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
return mt
}
type testTransportAddr struct{}
func (a *testTransportAddr) Network() string { return "tcp" }
func (a *testTransportAddr) String() string { return "test.local:1234" }
type testTransportConn struct{}
func (c *testTransportConn) Close() error {
return fmt.Errorf("Close() not implemented")
}
func (c *testTransportConn) LocalAddr() net.Addr {
return &testTransportAddr{}
}
func (c *testTransportConn) RemoteAddr() net.Addr {
return &testTransportAddr{}
}
func (c *testTransportConn) Read(_ []byte) (int, error) {
return -1, fmt.Errorf("Read() not implemented")
}
func (c *testTransportConn) SetDeadline(_ time.Time) error {
return fmt.Errorf("SetDeadline() not implemented")
}
func (c *testTransportConn) SetReadDeadline(_ time.Time) error {
return fmt.Errorf("SetReadDeadline() not implemented")
}
func (c *testTransportConn) SetWriteDeadline(_ time.Time) error {
return fmt.Errorf("SetWriteDeadline() not implemented")
}
func (c *testTransportConn) Write(_ []byte) (int, error) {
return -1, fmt.Errorf("Write() not implemented")
}