Merge pull request #1928 from tendermint/646-decode-msg

Unexport DecodeMessage
This commit is contained in:
Alexander Simmerl 2018-07-10 12:50:33 +02:00 committed by GitHub
commit ce33914f70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 44 deletions

View File

@ -5,12 +5,13 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
) )
const ( const (
@ -174,7 +175,7 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
// Receive implements Reactor by handling 4 types of messages (look below). // Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMessage(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err) bcR.Switch.StopPeerForError(src, err)
@ -342,17 +343,11 @@ func RegisterBlockchainMessages(cdc *amino.Codec) {
cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/mempool/StatusRequest", nil) cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/mempool/StatusRequest", nil)
} }
// DecodeMessage decodes BlockchainMessage. func decodeMsg(bz []byte) (msg BlockchainMessage, err error) {
// TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte) (msg BlockchainMessage, err error) {
if len(bz) > maxMsgSize { if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
len(bz), maxMsgSize)
} }
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
if err != nil {
err = cmn.ErrorWrap(err, "DecodeMessage() had bytes left over")
}
return return
} }

View File

@ -9,11 +9,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
cmn "github.com/tendermint/tendermint/libs/common"
tmevents "github.com/tendermint/tendermint/libs/events" tmevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -184,7 +184,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
return return
} }
msg, err := DecodeMessage(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
conR.Switch.StopPeerForError(src, err) conR.Switch.StopPeerForError(src, err)
@ -1307,11 +1307,9 @@ func RegisterConsensusMessages(cdc *amino.Codec) {
cdc.RegisterConcrete(&ProposalHeartbeatMessage{}, "tendermint/ProposalHeartbeat", nil) cdc.RegisterConcrete(&ProposalHeartbeatMessage{}, "tendermint/ProposalHeartbeat", nil)
} }
// DecodeMessage decodes the given bytes into a ConsensusMessage. func decodeMsg(bz []byte) (msg ConsensusMessage, err error) {
func DecodeMessage(bz []byte) (msg ConsensusMessage, err error) {
if len(bz) > maxMsgSize { if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
len(bz), maxMsgSize)
} }
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
return return

View File

@ -5,10 +5,10 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
clist "github.com/tendermint/tendermint/libs/clist" clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -73,7 +73,7 @@ func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor. // Receive implements Reactor.
// It adds any received evidence to the evpool. // It adds any received evidence to the evpool.
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMessage(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
evR.Switch.StopPeerForError(src, err) evR.Switch.StopPeerForError(src, err)
@ -204,11 +204,9 @@ func RegisterEvidenceMessages(cdc *amino.Codec) {
"tendermint/evidence/EvidenceListMessage", nil) "tendermint/evidence/EvidenceListMessage", nil)
} }
// DecodeMessage decodes a byte-array into a EvidenceMessage. func decodeMsg(bz []byte) (msg EvidenceMessage, err error) {
func DecodeMessage(bz []byte) (msg EvidenceMessage, err error) {
if len(bz) > maxMsgSize { if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
len(bz), maxMsgSize)
} }
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
return return

View File

@ -78,7 +78,7 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor. // Receive implements Reactor.
// It adds any received transactions to the mempool. // It adds any received transactions to the mempool.
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMessage(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
memR.Switch.StopPeerForError(src, err) memR.Switch.StopPeerForError(src, err)
@ -174,11 +174,9 @@ func RegisterMempoolMessages(cdc *amino.Codec) {
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
} }
// DecodeMessage decodes a byte-array into a MempoolMessage. func decodeMsg(bz []byte) (msg MempoolMessage, err error) {
func DecodeMessage(bz []byte) (msg MempoolMessage, err error) {
if len(bz) > maxMsgSize { if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
len(bz), maxMsgSize)
} }
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
return return

View File

@ -206,7 +206,7 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
// Receive implements Reactor by handling incoming PEX messages. // Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
msg, err := DecodeMessage(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
r.Switch.StopPeerForError(src, err) r.Switch.StopPeerForError(src, err)
@ -287,7 +287,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) {
return return
} }
r.requestsSent.Set(id, struct{}{}) r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, cdc.MustMarshalBinary(&pexRequestMessage{})) p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{}))
} }
// ReceiveAddrs adds the given addrs to the addrbook if theres an open // ReceiveAddrs adds the given addrs to the addrbook if theres an open
@ -324,7 +324,7 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
// SendAddrs sends addrs to the peer. // SendAddrs sends addrs to the peer.
func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
p.Send(PexChannel, cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: netAddrs})) p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: netAddrs}))
} }
// SetEnsurePeersPeriod sets period to ensure peers connected. // SetEnsurePeersPeriod sets period to ensure peers connected.
@ -670,13 +670,11 @@ func RegisterPexMessage(cdc *amino.Codec) {
cdc.RegisterConcrete(&pexAddrsMessage{}, "tendermint/p2p/PexAddrsMessage", nil) cdc.RegisterConcrete(&pexAddrsMessage{}, "tendermint/p2p/PexAddrsMessage", nil)
} }
// DecodeMessage implements interface registered above. func decodeMsg(bz []byte) (msg PexMessage, err error) {
func DecodeMessage(bz []byte) (msg PexMessage, err error) {
if len(bz) > maxMsgSize { if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
len(bz), maxMsgSize)
} }
err = cdc.UnmarshalBinary(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
return return
} }

View File

@ -134,11 +134,11 @@ func TestPEXReactorReceive(t *testing.T) {
size := book.Size() size := book.Size()
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
r.Receive(PexChannel, peer, msg) r.Receive(PexChannel, peer, msg)
assert.Equal(t, size+1, book.Size()) assert.Equal(t, size+1, book.Size())
msg = cdc.MustMarshalBinary(&pexRequestMessage{}) msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{})
r.Receive(PexChannel, peer, msg) // should not panic. r.Receive(PexChannel, peer, msg) // should not panic.
} }
@ -154,7 +154,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
assert.True(t, sw.Peers().Has(peer.ID())) assert.True(t, sw.Peers().Has(peer.ID()))
id := string(peer.ID()) id := string(peer.ID())
msg := cdc.MustMarshalBinary(&pexRequestMessage{}) msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{})
// first time creates the entry // first time creates the entry
r.Receive(PexChannel, peer, msg) r.Receive(PexChannel, peer, msg)
@ -191,7 +191,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(t, sw.Peers().Has(peer.ID())) assert.True(t, sw.Peers().Has(peer.ID()))
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
// receive some addrs. should clear the request // receive some addrs. should clear the request
r.Receive(PexChannel, peer, msg) r.Receive(PexChannel, peer, msg)
@ -303,7 +303,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
size := book.Size() size := book.Size()
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
pexR.Receive(PexChannel, peer, msg) pexR.Receive(PexChannel, peer, msg)
assert.Equal(t, size, book.Size()) assert.Equal(t, size, book.Size())