Format and consolidate

This commit is contained in:
Alexander Simmerl 2018-01-31 14:48:56 +01:00 committed by Ethan Buchman
parent 18f7e52562
commit 32d9563a15
2 changed files with 77 additions and 83 deletions

View File

@ -1,17 +1,18 @@
package types package types
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"net" "net"
"time" "time"
"github.com/pkg/errors"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
"golang.org/x/net/netutil"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -60,14 +61,21 @@ func (pvsc *PrivValidatorSocketClient) OnStart() error {
return err return err
} }
var err error var (
var conn net.Conn err error
conn net.Conn
)
RETRY_LOOP: RETRY_LOOP:
for { for {
conn, err = cmn.Connect(pvsc.SocketAddress) conn, err = cmn.Connect(pvsc.SocketAddress)
if err != nil { if err != nil {
pvsc.Logger.Error(fmt.Sprintf("PrivValidatorSocket failed to connect to %v. Retrying...", pvsc.SocketAddress)) pvsc.Logger.Error(
"OnStart",
"addr", pvsc.SocketAddress,
"err", errors.Wrap(err, "connection failed"),
)
time.Sleep(time.Second * dialRetryIntervalSeconds) time.Sleep(time.Second * dialRetryIntervalSeconds)
continue RETRY_LOOP continue RETRY_LOOP
} }
@ -75,7 +83,10 @@ RETRY_LOOP:
if pvsc.privKey != nil { if pvsc.privKey != nil {
conn, err = p2p.MakeSecretConnection(conn, *pvsc.privKey) conn, err = p2p.MakeSecretConnection(conn, *pvsc.privKey)
if err != nil { if err != nil {
pvsc.Logger.Error("failed to encrypt connection: " + err.Error()) pvsc.Logger.Error(
"OnStart",
"err", errors.Wrap(err, "encrypting connection failed"),
)
continue RETRY_LOOP continue RETRY_LOOP
} }
} }
@ -96,8 +107,7 @@ func (pvsc *PrivValidatorSocketClient) OnStop() {
// Address is an alias for PubKey().Address(). // Address is an alias for PubKey().Address().
func (pvsc *PrivValidatorSocketClient) Address() data.Bytes { func (pvsc *PrivValidatorSocketClient) Address() data.Bytes {
pubKey := pvsc.PubKey() return pvsc.PubKey().Address()
return pubKey.Address()
} }
// PubKey implements PrivValidator. // PubKey implements PrivValidator.
@ -186,10 +196,10 @@ type PrivValidatorSocketServer struct {
// PrivValidatorSocketServer. // PrivValidatorSocketServer.
func NewPrivValidatorSocketServer( func NewPrivValidatorSocketServer(
logger log.Logger, logger log.Logger,
socketAddr, chainID string, chainID, socketAddr string,
maxConnections int,
privVal PrivValidator, privVal PrivValidator,
privKey *crypto.PrivKeyEd25519, privKey *crypto.PrivKeyEd25519,
maxConnections int,
) *PrivValidatorSocketServer { ) *PrivValidatorSocketServer {
proto, addr := cmn.ProtocolAndAddress(socketAddr) proto, addr := cmn.ProtocolAndAddress(socketAddr)
pvss := &PrivValidatorSocketServer{ pvss := &PrivValidatorSocketServer{
@ -211,10 +221,9 @@ func (pvss *PrivValidatorSocketServer) OnStart() error {
return err return err
} }
// pvss.listener = netutil.LimitListener(ln, pvss.maxConnections) pvss.listener = netutil.LimitListener(ln, pvss.maxConnections)
pvss.listener = ln
go pvss.acceptConnectionsRoutine() go pvss.acceptConnections()
return nil return nil
} }
@ -226,30 +235,39 @@ func (pvss *PrivValidatorSocketServer) OnStop() {
} }
if err := pvss.listener.Close(); err != nil { if err := pvss.listener.Close(); err != nil {
pvss.Logger.Error("Error closing listener", "err", err) pvss.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed"))
} }
} }
func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() { func (pvss *PrivValidatorSocketServer) acceptConnections() {
for { for {
conn, err := pvss.listener.Accept() conn, err := pvss.listener.Accept()
if err != nil { if err != nil {
if !pvss.IsRunning() { if !pvss.IsRunning() {
return // Ignore error from listener closing. return // Ignore error from listener closing.
} }
pvss.Logger.Error("Failed to accept connection: " + err.Error()) pvss.Logger.Error(
"accpetConnections",
"err", errors.Wrap(err, "failed to accept connection"),
)
continue continue
} }
if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil { if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
pvss.Logger.Error("failed to set timeout for ocnnection: " + err.Error()) pvss.Logger.Error(
"acceptConnetions",
"err", errors.Wrap(err, "setting connection timeout failed"),
)
continue continue
} }
if pvss.privKey != nil { if pvss.privKey != nil {
conn, err = p2p.MakeSecretConnection(conn, *pvss.privKey) conn, err = p2p.MakeSecretConnection(conn, *pvss.privKey)
if err != nil { if err != nil {
pvss.Logger.Error("Failed to make secret connection: " + err.Error()) pvss.Logger.Error(
"acceptConnections",
"err", errors.Wrap(err, "secret connection failed"),
)
continue continue
} }
} }
@ -269,7 +287,7 @@ func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) {
req, err := readMsg(conn) req, err := readMsg(conn)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
pvss.Logger.Error("readMsg", "err", err) pvss.Logger.Error("handleConnection", "err", err)
} }
return return
} }
@ -280,31 +298,20 @@ func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) {
case *PubKeyMsg: case *PubKeyMsg:
res = &PubKeyMsg{pvss.privVal.PubKey()} res = &PubKeyMsg{pvss.privVal.PubKey()}
case *SignVoteMsg: case *SignVoteMsg:
err := pvss.privVal.SignVote(pvss.chainID, r.Vote) err = pvss.privVal.SignVote(pvss.chainID, r.Vote)
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return
}
res = &SignVoteMsg{r.Vote} res = &SignVoteMsg{r.Vote}
case *SignProposalMsg: case *SignProposalMsg:
err := pvss.privVal.SignProposal(pvss.chainID, r.Proposal) err = pvss.privVal.SignProposal(pvss.chainID, r.Proposal)
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return
}
res = &SignProposalMsg{r.Proposal} res = &SignProposalMsg{r.Proposal}
case *SignHeartbeatMsg: case *SignHeartbeatMsg:
err := pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat) err = pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat)
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return
}
res = &SignHeartbeatMsg{r.Heartbeat} res = &SignHeartbeatMsg{r.Heartbeat}
default: default:
pvss.Logger.Error("handleConnection", "err", fmt.Sprintf("unknown msg: %v", r)) err = fmt.Errorf("unknown msg: %v", r)
}
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return return
} }
@ -337,50 +344,6 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat}, wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
) )
func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) {
var (
n int
err error
)
read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
if err != nil {
return nil, err
}
w, ok := read.(struct{ PrivValidatorSocketMsg })
if !ok {
return nil, fmt.Errorf("unknown type")
}
return w.PrivValidatorSocketMsg, nil
}
func writeMsg(w io.Writer, msg interface{}) error {
var (
err error
n int
)
wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err)
return err
}
func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) {
var (
r = bytes.NewReader(bz)
n int
)
msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg
return msg, err
}
// PubKeyMsg is a PrivValidatorSocket message containing the public key. // PubKeyMsg is a PrivValidatorSocket message containing the public key.
type PubKeyMsg struct { type PubKeyMsg struct {
PubKey crypto.PubKey PubKey crypto.PubKey
@ -400,3 +363,34 @@ type SignProposalMsg struct {
type SignHeartbeatMsg struct { type SignHeartbeatMsg struct {
Heartbeat *types.Heartbeat Heartbeat *types.Heartbeat
} }
func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) {
var (
n int
err error
)
read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
if err != nil {
return nil, err
}
w, ok := read.(struct{ PrivValidatorSocketMsg })
if !ok {
return nil, errors.New("unknwon type")
}
return w.PrivValidatorSocketMsg, nil
}
func writeMsg(w io.Writer, msg interface{}) error {
var (
err error
n int
)
// TODO(xla): This extra wrap should be gone with the sdk-2 update.
wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err)
return err
}

View File

@ -23,11 +23,11 @@ func TestPrivValidatorSocketServer(t *testing.T) {
privVal = NewTestPrivValidator(signer) privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer( pvss = NewPrivValidatorSocketServer(
logger, logger,
"127.0.0.1:0",
chainID, chainID,
"127.0.0.1:0",
1,
privVal, privVal,
&serverPrivKey, &serverPrivKey,
1,
) )
) )