diff --git a/mempool/reactor.go b/mempool/reactor.go index 514347e9..acd693e4 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -1,13 +1,12 @@ package mempool import ( - "bytes" "fmt" "reflect" "time" abci "github.com/tendermint/abci/types" - wire "github.com/tendermint/go-wire" + "github.com/tendermint/go-amino" "github.com/tendermint/tmlibs/clist" "github.com/tendermint/tmlibs/log" @@ -71,7 +70,7 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - _, msg, err := DecodeMessage(msgBytes) + msg, err := DecodeMessage(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) memR.Switch.StopPeerForError(src, err) @@ -137,7 +136,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { } // send memTx msg := &TxMessage{Tx: memTx.tx} - success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg}) + success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue @@ -158,24 +157,17 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { //----------------------------------------------------------------------------- // Messages -const ( - msgTypeTx = byte(0x01) -) - // MempoolMessage is a message sent or received by the MempoolReactor. type MempoolMessage interface{} -var _ = wire.RegisterInterface( - struct{ MempoolMessage }{}, - wire.ConcreteType{&TxMessage{}, msgTypeTx}, -) +func RegisterMempoolMessages(cdc *amino.Codec) { + cdc.RegisterInterface((*MempoolMessage)(nil), nil) + cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) +} // DecodeMessage decodes a byte-array into a MempoolMessage. -func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) { - msgType = bz[0] - n := new(int) - r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage +func DecodeMessage(bz []byte) (msg MempoolMessage, err error) { + err = cdc.UnmarshalBinaryBare(bz, &msg) return } diff --git a/mempool/wire.go b/mempool/wire.go new file mode 100644 index 00000000..ed089726 --- /dev/null +++ b/mempool/wire.go @@ -0,0 +1,11 @@ +package mempool + +import ( + "github.com/tendermint/go-amino" +) + +var cdc = amino.NewCodec() + +func init() { + RegisterMempoolMessages(cdc) +}