tendermint/p2p/switch_test.go

344 lines
8.8 KiB
Go
Raw Normal View History

2015-10-25 18:21:51 -07:00
package p2p
import (
"bytes"
2016-08-09 23:33:38 -07:00
"fmt"
2016-06-25 18:59:52 -07:00
"net"
2015-10-25 18:21:51 -07:00
"sync"
"testing"
"time"
2017-04-07 03:57:03 -07:00
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
2017-11-27 13:48:15 -08:00
2017-04-07 03:57:03 -07:00
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
2017-11-27 13:48:15 -08:00
"github.com/tendermint/tmlibs/log"
2017-05-04 19:33:08 -07:00
cfg "github.com/tendermint/tendermint/config"
2015-10-25 18:21:51 -07:00
)
2016-05-11 21:08:41 -07:00
var (
2017-05-04 19:33:08 -07:00
config *cfg.P2PConfig
2016-05-11 21:08:41 -07:00
)
func init() {
2017-05-04 19:33:08 -07:00
config = cfg.DefaultP2PConfig()
2017-05-01 19:05:26 -07:00
config.PexReactor = true
2016-05-11 21:08:41 -07:00
}
2015-10-25 18:21:51 -07:00
type PeerMessage struct {
2018-01-01 18:27:38 -08:00
PeerID ID
2015-10-25 18:21:51 -07:00
Bytes []byte
Counter int
}
type TestReactor struct {
BaseReactor
mtx sync.Mutex
channels []*ChannelDescriptor
2017-09-12 17:49:22 -07:00
peersAdded []Peer
peersRemoved []Peer
2015-10-25 18:21:51 -07:00
logMessages bool
msgsCounter int
msgsReceived map[byte][]PeerMessage
}
func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
tr := &TestReactor{
channels: channels,
logMessages: logMessages,
msgsReceived: make(map[byte][]PeerMessage),
}
2017-05-02 00:53:32 -07:00
tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
tr.SetLogger(log.TestingLogger())
2015-10-25 18:21:51 -07:00
return tr
}
// GetChannels returns the channel descriptors the reactor was created with.
func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
	descs := tr.channels
	return descs
}
2017-09-12 17:49:22 -07:00
func (tr *TestReactor) AddPeer(peer Peer) {
2015-10-25 18:21:51 -07:00
tr.mtx.Lock()
defer tr.mtx.Unlock()
tr.peersAdded = append(tr.peersAdded, peer)
}
2017-09-12 17:49:22 -07:00
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
2015-10-25 18:21:51 -07:00
tr.mtx.Lock()
defer tr.mtx.Unlock()
tr.peersRemoved = append(tr.peersRemoved, peer)
}
2017-09-12 17:49:22 -07:00
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
2015-10-25 18:21:51 -07:00
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
//fmt.Printf("Received: %X, %X\n", chID, msgBytes)
2018-01-01 18:27:38 -08:00
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
2015-10-25 18:21:51 -07:00
tr.msgsCounter++
}
}
2016-06-21 11:35:29 -07:00
// getMsgs returns the messages recorded so far for channel chID.
// The slice is read while holding the reactor's mutex.
func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
	tr.mtx.Lock()
	recorded := tr.msgsReceived[chID]
	tr.mtx.Unlock()
	return recorded
}
2015-10-25 18:21:51 -07:00
//-----------------------------------------------------------------------------
// convenience method for creating two switches connected to each other.
2016-06-25 18:59:52 -07:00
// XXX: note this uses net.Pipe and not a proper TCP conn
func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
2015-10-25 18:21:51 -07:00
// Create two switches that will be interconnected.
2017-05-01 19:05:26 -07:00
switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
2016-06-25 18:59:52 -07:00
return switches[0], switches[1]
2015-10-25 18:21:51 -07:00
}
2016-08-09 23:33:38 -07:00
func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
2016-08-09 23:33:38 -07:00
}, true))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
2016-08-09 23:33:38 -07:00
}, true))
return sw
}
2015-10-25 18:21:51 -07:00
func TestSwitches(t *testing.T) {
2016-08-09 23:33:38 -07:00
s1, s2 := makeSwitchPair(t, initSwitchFunc)
2015-10-25 18:21:51 -07:00
defer s1.Stop()
defer s2.Stop()
if s1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
}
if s2.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
}
2016-08-09 23:33:38 -07:00
// Lets send some messages
2015-10-25 18:21:51 -07:00
ch0Msg := "channel zero"
ch1Msg := "channel foo"
ch2Msg := "channel bar"
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
}
2015-10-25 18:21:51 -07:00
func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
ticker := time.NewTicker(checkPeriod)
2017-11-07 15:08:45 -08:00
for {
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
if len(msgs) > 0 {
if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
}
return
}
2017-11-07 15:08:45 -08:00
case <-time.After(timeout):
t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
}
2015-10-25 18:21:51 -07:00
}
}
2016-08-09 23:33:38 -07:00
func TestConnAddrFilter(t *testing.T) {
2017-05-01 19:05:26 -07:00
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
2016-08-09 23:33:38 -07:00
c1, c2 := netPipe()
2016-08-09 23:33:38 -07:00
s1.SetAddrFilter(func(addr net.Addr) error {
if addr.String() == c1.RemoteAddr().String() {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
// connect to good peer
2017-04-07 03:57:03 -07:00
go func() {
2017-11-27 13:48:15 -08:00
err := s1.addPeerWithConnection(c1)
assert.NotNil(t, err, "expected err")
2017-04-07 03:57:03 -07:00
}()
go func() {
2017-11-27 13:48:15 -08:00
err := s2.addPeerWithConnection(c2)
assert.NotNil(t, err, "expected err")
2017-04-07 03:57:03 -07:00
}()
2016-08-09 23:33:38 -07:00
2017-11-07 15:08:45 -08:00
assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
}
2016-08-09 23:33:38 -07:00
2017-11-07 15:08:45 -08:00
func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
time.Sleep(timeout)
if sw.Peers().Size() != 0 {
t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
2016-08-09 23:33:38 -07:00
}
}
func TestConnPubKeyFilter(t *testing.T) {
2017-05-01 19:05:26 -07:00
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
2016-08-09 23:33:38 -07:00
c1, c2 := netPipe()
2016-08-09 23:33:38 -07:00
// set pubkey filter
2017-12-28 22:53:41 -08:00
s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error {
2016-08-09 23:33:38 -07:00
if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
// connect to good peer
2017-04-07 03:57:03 -07:00
go func() {
2017-11-27 13:48:15 -08:00
err := s1.addPeerWithConnection(c1)
assert.NotNil(t, err, "expected error")
2017-04-07 03:57:03 -07:00
}()
go func() {
2017-11-27 13:48:15 -08:00
err := s2.addPeerWithConnection(c2)
assert.NotNil(t, err, "expected error")
2017-04-07 03:57:03 -07:00
}()
2016-08-09 23:33:38 -07:00
2017-11-07 15:08:45 -08:00
assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
2016-08-09 23:33:38 -07:00
}
2017-04-07 03:57:03 -07:00
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
2017-05-01 19:05:26 -07:00
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
2017-09-21 09:38:48 -07:00
if err != nil {
t.Error(err)
}
2017-04-07 03:57:03 -07:00
defer sw.Stop()
2017-04-11 08:47:05 -07:00
// simulate remote peer
2017-12-28 22:53:41 -08:00
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
2017-04-11 08:47:05 -07:00
rp.Start()
defer rp.Stop()
2017-04-07 03:57:03 -07:00
2018-01-13 21:44:16 -08:00
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), false)
2017-04-07 03:57:03 -07:00
require.Nil(err)
2017-09-21 12:33:22 -07:00
err = sw.addPeer(peer)
2017-04-07 03:57:03 -07:00
require.Nil(err)
// simulate failure by closing connection
peer.CloseConn()
2017-11-07 15:08:45 -08:00
assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
2017-04-07 03:57:03 -07:00
assert.False(peer.IsRunning())
}
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
2017-04-07 03:57:03 -07:00
assert, require := assert.New(t), require.New(t)
2017-05-01 19:05:26 -07:00
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
2017-09-21 09:38:48 -07:00
if err != nil {
t.Error(err)
}
2017-04-07 03:57:03 -07:00
defer sw.Stop()
2017-04-11 08:47:05 -07:00
// simulate remote peer
2017-12-28 22:53:41 -08:00
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
2017-04-11 08:47:05 -07:00
rp.Start()
defer rp.Stop()
2017-04-07 03:57:03 -07:00
2018-01-13 21:44:16 -08:00
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), true)
2017-04-07 03:57:03 -07:00
require.Nil(err)
2017-09-21 12:33:22 -07:00
err = sw.addPeer(peer)
2017-04-07 03:57:03 -07:00
require.Nil(err)
// simulate failure by closing connection
peer.CloseConn()
// TODO: remove sleep, detect the disconnection, wait for reconnect
2017-11-14 14:31:23 -08:00
npeers := sw.Peers().Size()
for i := 0; i < 20; i++ {
time.Sleep(250 * time.Millisecond)
2017-11-14 14:31:23 -08:00
npeers = sw.Peers().Size()
if npeers > 0 {
break
}
}
assert.NotZero(npeers)
2017-04-07 03:57:03 -07:00
assert.False(peer.IsRunning())
}
// TestSwitchFullConnectivity creates three interconnected switches and checks
// that each one is connected to the other two.
func TestSwitchFullConnectivity(t *testing.T) {
	switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
	defer func() {
		for _, sw := range switches {
			sw.Stop()
		}
	}()

	for i, sw := range switches {
		if n := sw.Peers().Size(); n != 2 {
			// Argument order matches the format string: switch index first,
			// then the number of peers it actually has.
			t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", i, n)
		}
	}
}
2015-10-25 18:21:51 -07:00
func BenchmarkSwitches(b *testing.B) {
b.StopTimer()
2016-06-25 18:59:52 -07:00
s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
2015-10-25 18:21:51 -07:00
// Make bar reactors of bar channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
2015-10-25 18:21:51 -07:00
}, false))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
2015-10-25 18:21:51 -07:00
}, false))
return sw
})
defer s1.Stop()
defer s2.Stop()
// Allow time for goroutines to boot up
time.Sleep(1 * time.Second)
2015-10-25 18:21:51 -07:00
b.StartTimer()
numSuccess, numFailure := 0, 0
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(chID, "test data")
for s := range successChan {
if s {
2017-04-07 03:57:03 -07:00
numSuccess++
2015-10-25 18:21:51 -07:00
} else {
2017-04-07 03:57:03 -07:00
numFailure++
2015-10-25 18:21:51 -07:00
}
}
}
2017-05-02 00:53:32 -07:00
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
2015-10-25 18:21:51 -07:00
// Allow everything to flush before stopping switches & closing connections.
b.StopTimer()
}