tendermint/p2p/conn/connection_test.go

495 lines
14 KiB
Go
Raw Normal View History

2018-01-20 21:33:53 -08:00
package conn
2017-04-04 12:13:09 -07:00
import (
2018-03-25 21:40:02 -07:00
"bytes"
2017-04-04 12:13:09 -07:00
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
2018-03-20 18:27:10 -07:00
"github.com/tendermint/go-amino"
2017-05-02 00:53:32 -07:00
"github.com/tendermint/tmlibs/log"
2017-04-04 12:13:09 -07:00
)
func createTestMConnection(conn net.Conn) *MConnection {
2017-04-04 12:13:09 -07:00
onReceive := func(chID byte, msgBytes []byte) {
}
onError := func(r interface{}) {
}
2017-05-02 00:53:32 -07:00
c := createMConnectionWithCallbacks(conn, onReceive, onError)
c.SetLogger(log.TestingLogger())
return c
2017-04-04 12:13:09 -07:00
}
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
2017-11-09 11:27:17 -08:00
cfg := DefaultMConnConfig()
cfg.PingInterval = 90 * time.Millisecond
2018-01-23 01:11:44 -08:00
cfg.PongTimeout = 45 * time.Millisecond
2018-02-09 09:58:02 -08:00
chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
2017-11-09 11:27:17 -08:00
c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
2017-05-02 00:53:32 -07:00
c.SetLogger(log.TestingLogger())
return c
2017-04-04 12:13:09 -07:00
}
func TestMConnectionSend(t *testing.T) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
2017-10-03 15:49:20 -07:00
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
2017-04-04 12:13:09 -07:00
mconn := createTestMConnection(client)
err := mconn.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
2017-04-04 12:13:09 -07:00
defer mconn.Stop()
2018-03-25 21:40:02 -07:00
msg := []byte("Ant-Man")
2018-03-20 18:27:10 -07:00
assert.True(t, mconn.Send(0x01, msg))
// Note: subsequent Send/TrySend calls could pass because we are reading from
// the send queue in a separate goroutine.
2017-09-21 09:38:48 -07:00
_, err = server.Read(make([]byte, len(msg)))
if err != nil {
t.Error(err)
}
2018-03-20 18:27:10 -07:00
assert.True(t, mconn.CanSend(0x01))
2017-04-04 12:13:09 -07:00
2018-03-25 21:40:02 -07:00
msg = []byte("Spider-Man")
2018-03-20 18:27:10 -07:00
assert.True(t, mconn.TrySend(0x01, msg))
2017-09-21 09:38:48 -07:00
_, err = server.Read(make([]byte, len(msg)))
if err != nil {
t.Error(err)
}
2017-04-14 03:21:58 -07:00
2018-03-20 18:27:10 -07:00
assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
2018-03-25 21:40:02 -07:00
assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
2017-04-04 12:13:09 -07:00
}
func TestMConnectionReceive(t *testing.T) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
2017-10-03 15:49:20 -07:00
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
2017-04-04 12:13:09 -07:00
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn1.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
2017-04-04 12:13:09 -07:00
defer mconn1.Stop()
mconn2 := createTestMConnection(server)
err = mconn2.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
2017-04-04 12:13:09 -07:00
defer mconn2.Stop()
2018-03-25 21:40:02 -07:00
msg := []byte("Cyclops")
2018-03-20 18:27:10 -07:00
assert.True(t, mconn2.Send(0x01, msg))
2017-04-04 12:13:09 -07:00
select {
case receivedBytes := <-receivedCh:
2018-03-25 21:40:02 -07:00
assert.Equal(t, []byte(msg), receivedBytes)
2017-04-04 12:13:09 -07:00
case err := <-errorsCh:
t.Fatalf("Expected %s, got %+v", msg, err)
case <-time.After(500 * time.Millisecond):
t.Fatalf("Did not receive %s message in 500ms", msg)
}
}
func TestMConnectionStatus(t *testing.T) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
2017-10-03 15:49:20 -07:00
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
2017-04-04 12:13:09 -07:00
mconn := createTestMConnection(client)
err := mconn.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
2017-04-04 12:13:09 -07:00
defer mconn.Stop()
status := mconn.Status()
2018-03-20 18:27:10 -07:00
assert.NotNil(t, status)
assert.Zero(t, status.Channels[0].SendQueueSize)
2017-04-04 12:13:09 -07:00
}
2017-04-04 14:49:01 -07:00
2018-01-11 16:06:33 -08:00
func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
2017-11-01 20:13:18 -07:00
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
2017-11-01 20:13:18 -07:00
defer mconn.Stop()
serverGotPing := make(chan struct{})
2018-01-10 18:24:31 -08:00
go func() {
// read ping
2018-03-20 18:27:10 -07:00
var pkt PacketPing
2018-04-06 13:46:40 -07:00
const maxPacketPingSize = 1024
_, err = cdc.UnmarshalBinaryReader(server, &pkt, maxPacketPingSize)
2018-03-20 18:27:10 -07:00
assert.Nil(t, err)
serverGotPing <- struct{}{}
2018-01-10 18:24:31 -08:00
}()
<-serverGotPing
2018-01-10 18:24:31 -08:00
pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
2017-11-01 20:13:18 -07:00
select {
case msgBytes := <-receivedCh:
t.Fatalf("Expected error, but got %v", msgBytes)
2017-11-01 20:13:18 -07:00
case err := <-errorsCh:
assert.NotNil(t, err)
case <-time.After(pongTimerExpired):
t.Fatalf("Expected to receive error after %v", pongTimerExpired)
}
}
func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
defer mconn.Stop()
// sending 3 pongs in a row (abuse)
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
serverGotPing := make(chan struct{})
go func() {
// read ping (one byte)
2018-03-20 18:27:10 -07:00
var packet, err = Packet(nil), error(nil)
_, err = cdc.UnmarshalBinaryReader(server, &packet, 1024)
require.Nil(t, err)
serverGotPing <- struct{}{}
// respond with pong
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
}()
<-serverGotPing
pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
select {
case msgBytes := <-receivedCh:
t.Fatalf("Expected no data, but got %v", msgBytes)
case err := <-errorsCh:
t.Fatalf("Expected no error, but got %v", err)
case <-time.After(pongTimerExpired):
assert.True(t, mconn.IsRunning())
}
}
func TestMConnectionMultiplePings(t *testing.T) {
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
defer mconn.Stop()
// sending 3 pings in a row (abuse)
2018-02-09 09:58:02 -08:00
// see https://github.com/tendermint/tendermint/issues/1190
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
var pkt PacketPong
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPing{}))
require.Nil(t, err)
2018-03-20 18:27:10 -07:00
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
require.Nil(t, err)
assert.True(t, mconn.IsRunning())
}
func TestMConnectionPingPongs(t *testing.T) {
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
defer mconn.Stop()
serverGotPing := make(chan struct{})
go func() {
// read ping
2018-03-20 18:27:10 -07:00
var pkt PacketPing
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
require.Nil(t, err)
serverGotPing <- struct{}{}
// respond with pong
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
time.Sleep(mconn.config.PingInterval)
// read ping
2018-03-20 18:27:10 -07:00
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
require.Nil(t, err)
// respond with pong
2018-03-20 18:27:10 -07:00
_, err = server.Write(cdc.MustMarshalBinary(PacketPong{}))
require.Nil(t, err)
}()
<-serverGotPing
pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
select {
case msgBytes := <-receivedCh:
t.Fatalf("Expected no data, but got %v", msgBytes)
case err := <-errorsCh:
t.Fatalf("Expected no error, but got %v", err)
case <-time.After(2 * pongTimerExpired):
assert.True(t, mconn.IsRunning())
2017-11-01 20:13:18 -07:00
}
}
2017-04-06 05:43:45 -07:00
func TestMConnectionStopsAndReturnsError(t *testing.T) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
2017-10-03 15:49:20 -07:00
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
2017-04-04 14:49:01 -07:00
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
2017-04-04 14:49:01 -07:00
defer mconn.Stop()
2017-09-21 09:38:48 -07:00
if err := client.Close(); err != nil {
t.Error(err)
}
2017-04-04 14:49:01 -07:00
select {
case receivedBytes := <-receivedCh:
t.Fatalf("Expected error, got %v", receivedBytes)
case err := <-errorsCh:
2018-03-20 18:27:10 -07:00
assert.NotNil(t, err)
assert.False(t, mconn.IsRunning())
2017-04-04 14:49:01 -07:00
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive error in 500ms")
}
}
2018-03-20 18:27:10 -07:00
func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (*MConnection, *MConnection) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
onReceive := func(chID byte, msgBytes []byte) {}
onError := func(r interface{}) {}
// create client conn with two channels
chDescs := []*ChannelDescriptor{
{ID: 0x01, Priority: 1, SendQueueCapacity: 1},
{ID: 0x02, Priority: 1, SendQueueCapacity: 1},
}
mconnClient := NewMConnection(client, chDescs, onReceive, onError)
mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
err := mconnClient.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
// create server conn with 1 channel
// it fires on chOnErr when there's an error
serverLogger := log.TestingLogger().With("module", "server")
onError = func(r interface{}) {
chOnErr <- struct{}{}
}
mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
mconnServer.SetLogger(serverLogger)
err = mconnServer.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
return mconnClient, mconnServer
}
func expectSend(ch chan struct{}) bool {
after := time.After(time.Second * 5)
select {
case <-ch:
return true
case <-after:
return false
}
}
func TestMConnectionReadErrorBadEncoding(t *testing.T) {
chOnErr := make(chan struct{})
2018-03-20 18:27:10 -07:00
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
client := mconnClient.conn
// send badly encoded msgPacket
2018-03-20 18:27:10 -07:00
bz := cdc.MustMarshalBinary(PacketMsg{})
bz[4] += 0x01 // Invalid prefix bytes.
// Write it.
_, err := client.Write(bz)
assert.Nil(t, err)
assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
}
func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
chOnErr := make(chan struct{})
2018-03-20 18:27:10 -07:00
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
2018-03-25 21:40:02 -07:00
msg := []byte("Ant-Man")
// fail to send msg on channel unknown by client
2018-03-20 18:27:10 -07:00
assert.False(t, mconnClient.Send(0x03, msg))
// send msg on channel unknown by the server.
// should cause an error
2018-03-20 18:27:10 -07:00
assert.True(t, mconnClient.Send(0x02, msg))
assert.True(t, expectSend(chOnErr), "unknown channel")
}
func TestMConnectionReadErrorLongMessage(t *testing.T) {
chOnErr := make(chan struct{})
chOnRcv := make(chan struct{})
2018-03-20 18:27:10 -07:00
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
mconnServer.onReceive = func(chID byte, msgBytes []byte) {
chOnRcv <- struct{}{}
}
client := mconnClient.conn
// send msg thats just right
var err error
2018-03-25 21:40:02 -07:00
var buf = new(bytes.Buffer)
// - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
// (as long as it's less than 16,384 bytes)
// - Prefix bytes = 4 bytes
// - ChannelID field key + byte = 2 bytes
// - EOF field key + byte = 2 bytes
// - Bytes field key = 1 bytes
// - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
// - Struct terminator = 1 byte
// = up to 14 bytes overhead for the packet.
2018-03-20 18:27:10 -07:00
var packet = PacketMsg{
ChannelID: 0x01,
EOF: 1,
2018-03-25 21:40:02 -07:00
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
}
2018-03-25 21:40:02 -07:00
_, err = cdc.MarshalBinaryWriter(buf, packet)
assert.Nil(t, err)
_, err = client.Write(buf.Bytes())
2018-03-20 18:27:10 -07:00
assert.Nil(t, err)
assert.True(t, expectSend(chOnRcv), "msg just right")
2018-03-25 21:40:02 -07:00
assert.False(t, expectSend(chOnErr), "msg just right")
// send msg thats too long
2018-03-25 21:40:02 -07:00
buf = new(bytes.Buffer)
2018-03-20 18:27:10 -07:00
packet = PacketMsg{
ChannelID: 0x01,
EOF: 1,
2018-03-25 21:40:02 -07:00
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+1),
}
2018-03-25 21:40:02 -07:00
_, err = cdc.MarshalBinaryWriter(buf, packet)
2018-03-20 18:27:10 -07:00
assert.Nil(t, err)
2018-03-25 21:40:02 -07:00
_, err = client.Write(buf.Bytes())
assert.NotNil(t, err)
assert.False(t, expectSend(chOnRcv), "msg too long")
2018-03-20 18:27:10 -07:00
assert.True(t, expectSend(chOnErr), "msg too long")
}
func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
chOnErr := make(chan struct{})
2018-03-20 18:27:10 -07:00
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(t, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
// send msg with unknown msg type
2018-03-20 18:27:10 -07:00
err := error(nil)
err = amino.EncodeUvarint(mconnClient.conn, 4)
assert.Nil(t, err)
_, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
assert.Nil(t, err)
assert.True(t, expectSend(chOnErr), "unknown msg type")
}
func TestMConnectionTrySend(t *testing.T) {
2018-01-20 18:12:04 -08:00
server, client := NetPipe()
defer server.Close()
defer client.Close()
mconn := createTestMConnection(client)
err := mconn.Start()
2018-03-20 18:27:10 -07:00
require.Nil(t, err)
defer mconn.Stop()
2018-03-25 21:40:02 -07:00
msg := []byte("Semicolon-Woman")
resultCh := make(chan string, 2)
2018-03-20 18:27:10 -07:00
assert.True(t, mconn.TrySend(0x01, msg))
server.Read(make([]byte, len(msg)))
2018-03-20 18:27:10 -07:00
assert.True(t, mconn.CanSend(0x01))
assert.True(t, mconn.TrySend(0x01, msg))
assert.False(t, mconn.CanSend(0x01))
go func() {
mconn.TrySend(0x01, msg)
resultCh <- "TrySend"
}()
2018-03-20 18:27:10 -07:00
assert.False(t, mconn.CanSend(0x01))
assert.False(t, mconn.TrySend(0x01, msg))
assert.Equal(t, "TrySend", <-resultCh)
}