diff --git a/types/priv_validator/socket.go b/types/priv_validator/socket.go index 35cfe4ef..26030346 100644 --- a/types/priv_validator/socket.go +++ b/types/priv_validator/socket.go @@ -1,17 +1,18 @@ package types import ( - "bytes" "fmt" "io" "net" "time" + "github.com/pkg/errors" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" "github.com/tendermint/go-wire/data" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + "golang.org/x/net/netutil" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" @@ -60,14 +61,21 @@ func (pvsc *PrivValidatorSocketClient) OnStart() error { return err } - var err error - var conn net.Conn + var ( + err error + conn net.Conn + ) RETRY_LOOP: for { conn, err = cmn.Connect(pvsc.SocketAddress) if err != nil { - pvsc.Logger.Error(fmt.Sprintf("PrivValidatorSocket failed to connect to %v. Retrying...", pvsc.SocketAddress)) + pvsc.Logger.Error( + "OnStart", + "addr", pvsc.SocketAddress, + "err", errors.Wrap(err, "connection failed"), + ) + time.Sleep(time.Second * dialRetryIntervalSeconds) continue RETRY_LOOP } @@ -75,7 +83,10 @@ RETRY_LOOP: if pvsc.privKey != nil { conn, err = p2p.MakeSecretConnection(conn, *pvsc.privKey) if err != nil { - pvsc.Logger.Error("failed to encrypt connection: " + err.Error()) + pvsc.Logger.Error( + "OnStart", + "err", errors.Wrap(err, "encrypting connection failed"), + ) continue RETRY_LOOP } } @@ -96,8 +107,7 @@ func (pvsc *PrivValidatorSocketClient) OnStop() { // Address is an alias for PubKey().Address(). func (pvsc *PrivValidatorSocketClient) Address() data.Bytes { - pubKey := pvsc.PubKey() - return pubKey.Address() + return pvsc.PubKey().Address() } // PubKey implements PrivValidator. @@ -186,10 +196,10 @@ type PrivValidatorSocketServer struct { // PrivValidatorSocketServer. func NewPrivValidatorSocketServer( logger log.Logger, - socketAddr, chainID string, + chainID, socketAddr string, + maxConnections int, privVal PrivValidator, privKey *crypto.PrivKeyEd25519, - maxConnections int, ) *PrivValidatorSocketServer { proto, addr := cmn.ProtocolAndAddress(socketAddr) pvss := &PrivValidatorSocketServer{ @@ -211,10 +221,9 @@ func (pvss *PrivValidatorSocketServer) OnStart() error { return err } - // pvss.listener = netutil.LimitListener(ln, pvss.maxConnections) - pvss.listener = ln + pvss.listener = netutil.LimitListener(ln, pvss.maxConnections) - go pvss.acceptConnectionsRoutine() + go pvss.acceptConnections() return nil } @@ -226,30 +235,39 @@ func (pvss *PrivValidatorSocketServer) OnStop() { } if err := pvss.listener.Close(); err != nil { - pvss.Logger.Error("Error closing listener", "err", err) + pvss.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed")) } } -func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() { +func (pvss *PrivValidatorSocketServer) acceptConnections() { for { conn, err := pvss.listener.Accept() if err != nil { if !pvss.IsRunning() { return // Ignore error from listener closing. } - pvss.Logger.Error("Failed to accept connection: " + err.Error()) + pvss.Logger.Error( + "accpetConnections", + "err", errors.Wrap(err, "failed to accept connection"), + ) continue } if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil { - pvss.Logger.Error("failed to set timeout for ocnnection: " + err.Error()) + pvss.Logger.Error( + "acceptConnetions", + "err", errors.Wrap(err, "setting connection timeout failed"), + ) continue } if pvss.privKey != nil { conn, err = p2p.MakeSecretConnection(conn, *pvss.privKey) if err != nil { - pvss.Logger.Error("Failed to make secret connection: " + err.Error()) + pvss.Logger.Error( + "acceptConnections", + "err", errors.Wrap(err, "secret connection failed"), + ) continue } } @@ -269,7 +287,7 @@ func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) { req, err := readMsg(conn) if err != nil { if err != io.EOF { - pvss.Logger.Error("readMsg", "err", err) + pvss.Logger.Error("handleConnection", "err", err) } return } @@ -280,31 +298,20 @@ func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) { case *PubKeyMsg: res = &PubKeyMsg{pvss.privVal.PubKey()} case *SignVoteMsg: - err := pvss.privVal.SignVote(pvss.chainID, r.Vote) - if err != nil { - pvss.Logger.Error("handleConnection", "err", err) - return - } - + err = pvss.privVal.SignVote(pvss.chainID, r.Vote) res = &SignVoteMsg{r.Vote} case *SignProposalMsg: - err := pvss.privVal.SignProposal(pvss.chainID, r.Proposal) - if err != nil { - pvss.Logger.Error("handleConnection", "err", err) - return - } - + err = pvss.privVal.SignProposal(pvss.chainID, r.Proposal) res = &SignProposalMsg{r.Proposal} case *SignHeartbeatMsg: - err := pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat) - if err != nil { - pvss.Logger.Error("handleConnection", "err", err) - return - } - + err = pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat) res = &SignHeartbeatMsg{r.Heartbeat} default: - pvss.Logger.Error("handleConnection", "err", fmt.Sprintf("unknown msg: %v", r)) + err = fmt.Errorf("unknown msg: %v", r) + } + + if err != nil { + pvss.Logger.Error("handleConnection", "err", err) return } @@ -337,50 +344,6 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat}, ) -func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) { - var ( - n int - err error - ) - - read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err) - if err != nil { - return nil, err - } - - w, ok := read.(struct{ PrivValidatorSocketMsg }) - if !ok { - return nil, fmt.Errorf("unknown type") - } - - return w.PrivValidatorSocketMsg, nil -} - -func writeMsg(w io.Writer, msg interface{}) error { - var ( - err error - n int - ) - - wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err) - - return err -} - -func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) { - var ( - r = bytes.NewReader(bz) - - n int - ) - - msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err) - - msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg - - return msg, err -} - // PubKeyMsg is a PrivValidatorSocket message containing the public key. type PubKeyMsg struct { PubKey crypto.PubKey @@ -400,3 +363,34 @@ type SignProposalMsg struct { type SignHeartbeatMsg struct { Heartbeat *types.Heartbeat } + +func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) { + var ( + n int + err error + ) + + read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err) + if err != nil { + return nil, err + } + + w, ok := read.(struct{ PrivValidatorSocketMsg }) + if !ok { + return nil, errors.New("unknwon type") + } + + return w.PrivValidatorSocketMsg, nil +} + +func writeMsg(w io.Writer, msg interface{}) error { + var ( + err error + n int + ) + + // TODO(xla): This extra wrap should be gone with the sdk-2 update. + wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err) + + return err +} diff --git a/types/priv_validator/socket_test.go b/types/priv_validator/socket_test.go index 16cc59c5..88e19009 100644 --- a/types/priv_validator/socket_test.go +++ b/types/priv_validator/socket_test.go @@ -23,11 +23,11 @@ func TestPrivValidatorSocketServer(t *testing.T) { privVal = NewTestPrivValidator(signer) pvss = NewPrivValidatorSocketServer( logger, - "127.0.0.1:0", chainID, + "127.0.0.1:0", + 1, privVal, &serverPrivKey, - 1, ) )