From 2e4cae1fdb3984cb91aa40993a5cd519d81a712d Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Sun, 7 Oct 2018 16:17:06 +0200 Subject: [PATCH] Add locking & ping/heartbeat packets to SocketPV --- privval/socket.go | 113 ++++++++++++++++++++++++++++++++++------- privval/socket_test.go | 52 +++++++++++++++++-- 2 files changed, 143 insertions(+), 22 deletions(-) diff --git a/privval/socket.go b/privval/socket.go index 64d4c46d..cbe339ba 100644 --- a/privval/socket.go +++ b/privval/socket.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "sync" "time" "github.com/tendermint/go-amino" @@ -18,11 +19,11 @@ import ( ) const ( - defaultAcceptDeadlineSeconds = 30 // tendermint waits this long for remote val to connect - defaultConnDeadlineSeconds = 3 // must be set before each read - defaultConnHeartBeatSeconds = 30 // tcp keep-alive period - defaultConnWaitSeconds = 60 // XXX: is this redundant with the accept deadline? - defaultDialRetries = 10 // try to connect to tendermint this many times + defaultAcceptDeadlineSeconds = 3 + defaultConnDeadlineSeconds = 3 + defaultConnHeartBeatSeconds = 2 + defaultConnWaitSeconds = 60 + defaultDialRetries = 10 ) // Socket errors. @@ -33,6 +34,12 @@ var ( ErrUnexpectedResponse = errors.New("received unexpected response") ) +var ( + acceptDeadline = time.Second * defaultAcceptDeadlineSeconds + connDeadline = time.Second * defaultConnDeadlineSeconds + connHeartbeat = time.Second * defaultConnHeartBeatSeconds +) + // SocketPVOption sets an optional parameter on the SocketPV. type SocketPVOption func(*SocketPV) @@ -72,8 +79,10 @@ type SocketPV struct { connWaitTimeout time.Duration privKey ed25519.PrivKeyEd25519 - conn net.Conn - listener net.Listener + conn net.Conn + listener net.Listener + lock sync.Mutex + cancelPing chan bool } // Check that SocketPV implements PrivValidator. @@ -87,9 +96,9 @@ func NewSocketPV( ) *SocketPV { sc := &SocketPV{ addr: socketAddr, - acceptDeadline: time.Second * defaultAcceptDeadlineSeconds, - connDeadline: time.Second * defaultConnDeadlineSeconds, - connHeartbeat: time.Second * defaultConnHeartBeatSeconds, + acceptDeadline: acceptDeadline, + connDeadline: connDeadline, + connHeartbeat: connHeartbeat, connWaitTimeout: time.Second * defaultConnWaitSeconds, privKey: privKey, } @@ -130,6 +139,9 @@ func (sc *SocketPV) GetPubKey() crypto.PubKey { } func (sc *SocketPV) getPubKey() (crypto.PubKey, error) { + sc.lock.Lock() + defer sc.lock.Unlock() + err := writeMsg(sc.conn, &PubKeyMsg{}) if err != nil { return nil, err @@ -145,6 +157,9 @@ func (sc *SocketPV) getPubKey() (crypto.PubKey, error) { // SignVote implements PrivValidator. func (sc *SocketPV) SignVote(chainID string, vote *types.Vote) error { + sc.lock.Lock() + defer sc.lock.Unlock() + err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) if err != nil { return err @@ -174,6 +189,9 @@ func (sc *SocketPV) SignProposal( chainID string, proposal *types.Proposal, ) error { + sc.lock.Lock() + defer sc.lock.Unlock() + err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) if err != nil { return err @@ -202,6 +220,9 @@ func (sc *SocketPV) SignHeartbeat( chainID string, heartbeat *types.Heartbeat, ) error { + sc.lock.Lock() + defer sc.lock.Unlock() + err := writeMsg(sc.conn, &SignHeartbeatRequest{Heartbeat: heartbeat}) if err != nil { return err @@ -225,6 +246,28 @@ func (sc *SocketPV) SignHeartbeat( return nil } +// Ping is used to check connection health. +func (sc *SocketPV) Ping() error { + sc.lock.Lock() + defer sc.lock.Unlock() + + err := writeMsg(sc.conn, &PingRequest{}) + if err != nil { + return err + } + + res, err := readMsg(sc.conn) + if err != nil { + return err + } + _, ok := res.(*PingResponse) + if !ok { + return ErrUnexpectedResponse + } + + return nil +} + // OnStart implements cmn.Service. func (sc *SocketPV) OnStart() error { if err := sc.listen(); err != nil { @@ -247,6 +290,25 @@ func (sc *SocketPV) OnStart() error { return err } + // Start a routine to keep the connection alive + sc.cancelPing = make(chan bool, 1) + go func() { + for { + select { + case <-time.Tick(sc.connHeartbeat): + err := sc.Ping() + if err != nil { + sc.Logger.Error( + "Ping", + "err", err, + ) + } + case <-sc.cancelPing: + return + } + } + }() + sc.conn = conn return nil @@ -254,6 +316,13 @@ func (sc *SocketPV) OnStart() error { // OnStop implements cmn.Service. func (sc *SocketPV) OnStop() { + if sc.cancelPing != nil { + select { + case sc.cancelPing <- true: + default: + } + } + if sc.conn != nil { if err := sc.conn.Close(); err != nil { err = cmn.ErrorWrap(err, "failed to close connection") @@ -435,7 +504,7 @@ func (rs *RemoteSigner) connect() (net.Conn, error) { continue } - if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil { + if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil { err = cmn.ErrorWrap(err, "setting connection timeout failed") rs.Logger.Error( "connect", @@ -467,6 +536,9 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) { return // Ignore error from listener closing. } + // Reset the connection deadline + conn.SetDeadline(time.Now().Add(rs.connDeadline)) + req, err := readMsg(conn) if err != nil { if err != io.EOF { @@ -503,6 +575,8 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) { } else { res = &SignedHeartbeatResponse{r.Heartbeat, nil} } + case *PingRequest: + res = &PingResponse{} default: err = fmt.Errorf("unknown msg: %v", r) } @@ -534,6 +608,8 @@ func RegisterSocketPVMsg(cdc *amino.Codec) { cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/socketpv/SignedProposalResponse", nil) cdc.RegisterConcrete(&SignHeartbeatRequest{}, "tendermint/socketpv/SignHeartbeatRequest", nil) cdc.RegisterConcrete(&SignedHeartbeatResponse{}, "tendermint/socketpv/SignedHeartbeatResponse", nil) + cdc.RegisterConcrete(&PingRequest{}, "tendermint/socketpv/PingRequest", nil) + cdc.RegisterConcrete(&PingResponse{}, "tendermint/socketpv/PingResponse", nil) } // PubKeyMsg is a PrivValidatorSocket message containing the public key. @@ -572,6 +648,13 @@ type SignedHeartbeatResponse struct { Error *RemoteSignerError } +// PingRequest is a PrivValidatorSocket message to keep the connection alive. +type PingRequest struct { +} + +type PingResponse struct { +} + // RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply. type RemoteSignerError struct { // TODO(ismail): create an enum of known errors @@ -581,14 +664,6 @@ type RemoteSignerError struct { func readMsg(r io.Reader) (msg SocketPVMsg, err error) { const maxSocketPVMsgSize = 1024 * 10 - - // set deadline before trying to read - conn := r.(net.Conn) - if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil { - err = cmn.ErrorWrap(err, "setting connection timeout failed in readMsg") - return msg, err - } - _, err = cdc.UnmarshalBinaryReader(r, &msg, maxSocketPVMsgSize) if _, ok := err.(timeoutError); ok { err = cmn.ErrorWrap(ErrConnTimeout, err.Error()) diff --git a/privval/socket_test.go b/privval/socket_test.go index aa2e15fa..26288b71 100644 --- a/privval/socket_test.go +++ b/privval/socket_test.go @@ -91,6 +91,53 @@ func TestSocketPVVote(t *testing.T) { assert.Equal(t, want.Signature, have.Signature) } +func TestSocketPVVoteResetDeadline(t *testing.T) { + var ( + chainID = cmn.RandStr(12) + sc, rs = testSetupSocketPair(t, chainID, types.NewMockPV()) + + ts = time.Now() + vType = types.VoteTypePrecommit + want = &types.Vote{Timestamp: ts, Type: vType} + have = &types.Vote{Timestamp: ts, Type: vType} + ) + defer sc.Stop() + defer rs.Stop() + + time.Sleep(800 * time.Microsecond) + + require.NoError(t, rs.privVal.SignVote(chainID, want)) + require.NoError(t, sc.SignVote(chainID, have)) + assert.Equal(t, want.Signature, have.Signature) + + // This would exceed the deadline if it was not extended by the previous message + time.Sleep(800 * time.Microsecond) + + require.NoError(t, rs.privVal.SignVote(chainID, want)) + require.NoError(t, sc.SignVote(chainID, have)) + assert.Equal(t, want.Signature, have.Signature) +} + +func TestSocketPVVoteKeepalive(t *testing.T) { + var ( + chainID = cmn.RandStr(12) + sc, rs = testSetupSocketPair(t, chainID, types.NewMockPV()) + + ts = time.Now() + vType = types.VoteTypePrecommit + want = &types.Vote{Timestamp: ts, Type: vType} + have = &types.Vote{Timestamp: ts, Type: vType} + ) + defer sc.Stop() + defer rs.Stop() + + time.Sleep(2 * time.Millisecond) + + require.NoError(t, rs.privVal.SignVote(chainID, want)) + require.NoError(t, sc.SignVote(chainID, have)) + assert.Equal(t, want.Signature, have.Signature) +} + func TestSocketPVHeartbeat(t *testing.T) { var ( chainID = cmn.RandStr(12) @@ -161,9 +208,6 @@ func TestSocketPVDeadline(t *testing.T) { <-listenc - // Sleep to guarantee deadline has been hit. - time.Sleep(20 * time.Microsecond) - _, err := sc.getPubKey() assert.Equal(t, err.(cmn.Error).Data(), ErrConnTimeout) } @@ -405,6 +449,8 @@ func testSetupSocketPair( testStartSocketPV(t, readyc, sc) + SocketPVConnDeadline(time.Millisecond)(sc) + SocketPVHeartbeat(500 * time.Microsecond)(sc) RemoteSignerConnDeadline(time.Millisecond)(rs) RemoteSignerConnRetries(1e6)(rs)