Add locking & ping/heartbeat packets to SocketPV

This commit is contained in:
Hendrik Hofstadt 2018-10-07 16:17:06 +02:00
parent cf779809a7
commit 2e4cae1fdb
2 changed files with 143 additions and 22 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"sync"
"time"
"github.com/tendermint/go-amino"
@ -18,11 +19,11 @@ import (
)
const (
defaultAcceptDeadlineSeconds = 30 // tendermint waits this long for remote val to connect
defaultConnDeadlineSeconds = 3 // must be set before each read
defaultConnHeartBeatSeconds = 30 // tcp keep-alive period
defaultConnWaitSeconds = 60 // XXX: is this redundant with the accept deadline?
defaultDialRetries = 10 // try to connect to tendermint this many times
defaultAcceptDeadlineSeconds = 3
defaultConnDeadlineSeconds = 3
defaultConnHeartBeatSeconds = 2
defaultConnWaitSeconds = 60
defaultDialRetries = 10
)
// Socket errors.
@ -33,6 +34,12 @@ var (
ErrUnexpectedResponse = errors.New("received unexpected response")
)
var (
acceptDeadline = time.Second * defaultAcceptDeadlineSeconds
connDeadline = time.Second * defaultConnDeadlineSeconds
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)
// SocketPVOption sets an optional parameter on the SocketPV.
type SocketPVOption func(*SocketPV)
@ -72,8 +79,10 @@ type SocketPV struct {
connWaitTimeout time.Duration
privKey ed25519.PrivKeyEd25519
conn net.Conn
listener net.Listener
conn net.Conn
listener net.Listener
lock sync.Mutex
cancelPing chan bool
}
// Check that SocketPV implements PrivValidator.
@ -87,9 +96,9 @@ func NewSocketPV(
) *SocketPV {
sc := &SocketPV{
addr: socketAddr,
acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
connDeadline: time.Second * defaultConnDeadlineSeconds,
connHeartbeat: time.Second * defaultConnHeartBeatSeconds,
acceptDeadline: acceptDeadline,
connDeadline: connDeadline,
connHeartbeat: connHeartbeat,
connWaitTimeout: time.Second * defaultConnWaitSeconds,
privKey: privKey,
}
@ -130,6 +139,9 @@ func (sc *SocketPV) GetPubKey() crypto.PubKey {
}
func (sc *SocketPV) getPubKey() (crypto.PubKey, error) {
sc.lock.Lock()
defer sc.lock.Unlock()
err := writeMsg(sc.conn, &PubKeyMsg{})
if err != nil {
return nil, err
@ -145,6 +157,9 @@ func (sc *SocketPV) getPubKey() (crypto.PubKey, error) {
// SignVote implements PrivValidator.
func (sc *SocketPV) SignVote(chainID string, vote *types.Vote) error {
sc.lock.Lock()
defer sc.lock.Unlock()
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
if err != nil {
return err
@ -174,6 +189,9 @@ func (sc *SocketPV) SignProposal(
chainID string,
proposal *types.Proposal,
) error {
sc.lock.Lock()
defer sc.lock.Unlock()
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
if err != nil {
return err
@ -202,6 +220,9 @@ func (sc *SocketPV) SignHeartbeat(
chainID string,
heartbeat *types.Heartbeat,
) error {
sc.lock.Lock()
defer sc.lock.Unlock()
err := writeMsg(sc.conn, &SignHeartbeatRequest{Heartbeat: heartbeat})
if err != nil {
return err
@ -225,6 +246,28 @@ func (sc *SocketPV) SignHeartbeat(
return nil
}
// Ping is used to check connection health.
func (sc *SocketPV) Ping() error {
sc.lock.Lock()
defer sc.lock.Unlock()
err := writeMsg(sc.conn, &PingRequest{})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
_, ok := res.(*PingResponse)
if !ok {
return ErrUnexpectedResponse
}
return nil
}
// OnStart implements cmn.Service.
func (sc *SocketPV) OnStart() error {
if err := sc.listen(); err != nil {
@ -247,6 +290,25 @@ func (sc *SocketPV) OnStart() error {
return err
}
// Start a routine to keep the connection alive
sc.cancelPing = make(chan bool, 1)
go func() {
for {
select {
case <-time.Tick(sc.connHeartbeat):
err := sc.Ping()
if err != nil {
sc.Logger.Error(
"Ping",
"err", err,
)
}
case <-sc.cancelPing:
return
}
}
}()
sc.conn = conn
return nil
@ -254,6 +316,13 @@ func (sc *SocketPV) OnStart() error {
// OnStop implements cmn.Service.
func (sc *SocketPV) OnStop() {
if sc.cancelPing != nil {
select {
case sc.cancelPing <- true:
default:
}
}
if sc.conn != nil {
if err := sc.conn.Close(); err != nil {
err = cmn.ErrorWrap(err, "failed to close connection")
@ -435,7 +504,7 @@ func (rs *RemoteSigner) connect() (net.Conn, error) {
continue
}
if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil {
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
err = cmn.ErrorWrap(err, "setting connection timeout failed")
rs.Logger.Error(
"connect",
@ -467,6 +536,9 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
return // Ignore error from listener closing.
}
// Reset the connection deadline
conn.SetDeadline(time.Now().Add(rs.connDeadline))
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
@ -503,6 +575,8 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
} else {
res = &SignedHeartbeatResponse{r.Heartbeat, nil}
}
case *PingRequest:
res = &PingResponse{}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
@ -534,6 +608,8 @@ func RegisterSocketPVMsg(cdc *amino.Codec) {
cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/socketpv/SignedProposalResponse", nil)
cdc.RegisterConcrete(&SignHeartbeatRequest{}, "tendermint/socketpv/SignHeartbeatRequest", nil)
cdc.RegisterConcrete(&SignedHeartbeatResponse{}, "tendermint/socketpv/SignedHeartbeatResponse", nil)
cdc.RegisterConcrete(&PingRequest{}, "tendermint/socketpv/PingRequest", nil)
cdc.RegisterConcrete(&PingResponse{}, "tendermint/socketpv/PingResponse", nil)
}
// PubKeyMsg is a PrivValidatorSocket message containing the public key.
@ -572,6 +648,13 @@ type SignedHeartbeatResponse struct {
Error *RemoteSignerError
}
// PingRequest is a PrivValidatorSocket message to keep the connection alive.
type PingRequest struct {
}
type PingResponse struct {
}
// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.
type RemoteSignerError struct {
// TODO(ismail): create an enum of known errors
@ -581,14 +664,6 @@ type RemoteSignerError struct {
func readMsg(r io.Reader) (msg SocketPVMsg, err error) {
const maxSocketPVMsgSize = 1024 * 10
// set deadline before trying to read
conn := r.(net.Conn)
if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil {
err = cmn.ErrorWrap(err, "setting connection timeout failed in readMsg")
return msg, err
}
_, err = cdc.UnmarshalBinaryReader(r, &msg, maxSocketPVMsgSize)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())

View File

@ -91,6 +91,53 @@ func TestSocketPVVote(t *testing.T) {
assert.Equal(t, want.Signature, have.Signature)
}
func TestSocketPVVoteResetDeadline(t *testing.T) {
var (
chainID = cmn.RandStr(12)
sc, rs = testSetupSocketPair(t, chainID, types.NewMockPV())
ts = time.Now()
vType = types.VoteTypePrecommit
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer rs.Stop()
time.Sleep(800 * time.Microsecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
// This would exceed the deadline if it was not extended by the previous message
time.Sleep(800 * time.Microsecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
func TestSocketPVVoteKeepalive(t *testing.T) {
var (
chainID = cmn.RandStr(12)
sc, rs = testSetupSocketPair(t, chainID, types.NewMockPV())
ts = time.Now()
vType = types.VoteTypePrecommit
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer rs.Stop()
time.Sleep(2 * time.Millisecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
func TestSocketPVHeartbeat(t *testing.T) {
var (
chainID = cmn.RandStr(12)
@ -161,9 +208,6 @@ func TestSocketPVDeadline(t *testing.T) {
<-listenc
// Sleep to guarantee deadline has been hit.
time.Sleep(20 * time.Microsecond)
_, err := sc.getPubKey()
assert.Equal(t, err.(cmn.Error).Data(), ErrConnTimeout)
}
@ -405,6 +449,8 @@ func testSetupSocketPair(
testStartSocketPV(t, readyc, sc)
SocketPVConnDeadline(time.Millisecond)(sc)
SocketPVHeartbeat(500 * time.Microsecond)(sc)
RemoteSignerConnDeadline(time.Millisecond)(rs)
RemoteSignerConnRetries(1e6)(rs)