diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 70a599ba..449a42ff 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -5,12 +5,13 @@ import ( "reflect" "time" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" ) const ( @@ -174,7 +175,7 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := DecodeMessage(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) bcR.Switch.StopPeerForError(src, err) @@ -342,17 +343,11 @@ func RegisterBlockchainMessages(cdc *amino.Codec) { cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/mempool/StatusRequest", nil) } -// DecodeMessage decodes BlockchainMessage. -// TODO: ensure that bz is completely read. -func DecodeMessage(bz []byte) (msg BlockchainMessage, err error) { +func decodeMsg(bz []byte) (msg BlockchainMessage, err error) { if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", - len(bz), maxMsgSize) + return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } err = cdc.UnmarshalBinaryBare(bz, &msg) - if err != nil { - err = cmn.ErrorWrap(err, "DecodeMessage() had bytes left over") - } return } diff --git a/consensus/reactor.go b/consensus/reactor.go index 1cac32b8..76a15194 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,11 +9,11 @@ import ( "github.com/pkg/errors" amino "github.com/tendermint/go-amino" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" cstypes "github.com/tendermint/tendermint/consensus/types" + cmn "github.com/tendermint/tendermint/libs/common" tmevents "github.com/tendermint/tendermint/libs/events" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -184,7 +184,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) return } - msg, err := DecodeMessage(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) conR.Switch.StopPeerForError(src, err) @@ -1307,11 +1307,9 @@ func RegisterConsensusMessages(cdc *amino.Codec) { cdc.RegisterConcrete(&ProposalHeartbeatMessage{}, "tendermint/ProposalHeartbeat", nil) } -// DecodeMessage decodes the given bytes into a ConsensusMessage. -func DecodeMessage(bz []byte) (msg ConsensusMessage, err error) { +func decodeMsg(bz []byte) (msg ConsensusMessage, err error) { if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", - len(bz), maxMsgSize) + return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } err = cdc.UnmarshalBinaryBare(bz, &msg) return diff --git a/evidence/reactor.go b/evidence/reactor.go index 7b22b8db..bf11ac10 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -5,10 +5,10 @@ import ( "reflect" "time" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" + clist "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -73,7 +73,7 @@ func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received evidence to the evpool. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := DecodeMessage(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) evR.Switch.StopPeerForError(src, err) @@ -204,11 +204,9 @@ func RegisterEvidenceMessages(cdc *amino.Codec) { "tendermint/evidence/EvidenceListMessage", nil) } -// DecodeMessage decodes a byte-array into a EvidenceMessage. -func DecodeMessage(bz []byte) (msg EvidenceMessage, err error) { +func decodeMsg(bz []byte) (msg EvidenceMessage, err error) { if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", - len(bz), maxMsgSize) + return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } err = cdc.UnmarshalBinaryBare(bz, &msg) return diff --git a/mempool/reactor.go b/mempool/reactor.go index e63ff58e..96988be7 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -78,7 +78,7 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := DecodeMessage(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) memR.Switch.StopPeerForError(src, err) @@ -174,11 +174,9 @@ func RegisterMempoolMessages(cdc *amino.Codec) { cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) } -// DecodeMessage decodes a byte-array into a MempoolMessage. -func DecodeMessage(bz []byte) (msg MempoolMessage, err error) { +func decodeMsg(bz []byte) (msg MempoolMessage, err error) { if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", - len(bz), maxMsgSize) + return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } err = cdc.UnmarshalBinaryBare(bz, &msg) return diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 2d93783d..e90665a3 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -206,7 +206,7 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { // Receive implements Reactor by handling incoming PEX messages. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { - msg, err := DecodeMessage(msgBytes) + msg, err := decodeMsg(msgBytes) if err != nil { r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) r.Switch.StopPeerForError(src, err) @@ -287,7 +287,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) { return } r.requestsSent.Set(id, struct{}{}) - p.Send(PexChannel, cdc.MustMarshalBinary(&pexRequestMessage{})) + p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{})) } // ReceiveAddrs adds the given addrs to the addrbook if theres an open @@ -324,7 +324,7 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { // SendAddrs sends addrs to the peer. func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { - p.Send(PexChannel, cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: netAddrs})) + p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: netAddrs})) } // SetEnsurePeersPeriod sets period to ensure peers connected. @@ -670,13 +670,11 @@ func RegisterPexMessage(cdc *amino.Codec) { cdc.RegisterConcrete(&pexAddrsMessage{}, "tendermint/p2p/PexAddrsMessage", nil) } -// DecodeMessage implements interface registered above. -func DecodeMessage(bz []byte) (msg PexMessage, err error) { +func decodeMsg(bz []byte) (msg PexMessage, err error) { if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", - len(bz), maxMsgSize) + return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } - err = cdc.UnmarshalBinary(bz, &msg) + err = cdc.UnmarshalBinaryBare(bz, &msg) return } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 6d6e91c3..629c9397 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -134,11 +134,11 @@ func TestPEXReactorReceive(t *testing.T) { size := book.Size() addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} - msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) + msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) r.Receive(PexChannel, peer, msg) assert.Equal(t, size+1, book.Size()) - msg = cdc.MustMarshalBinary(&pexRequestMessage{}) + msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{}) r.Receive(PexChannel, peer, msg) // should not panic. } @@ -154,7 +154,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) - msg := cdc.MustMarshalBinary(&pexRequestMessage{}) + msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{}) // first time creates the entry r.Receive(PexChannel, peer, msg) @@ -191,7 +191,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { assert.True(t, sw.Peers().Has(peer.ID())) addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} - msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) + msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) // receive some addrs. should clear the request r.Receive(PexChannel, peer, msg) @@ -303,7 +303,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { size := book.Size() addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} - msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs}) + msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) pexR.Receive(PexChannel, peer, msg) assert.Equal(t, size, book.Size())