diff --git a/mempool/mempool.go b/mempool/mempool.go index 1bcad6fa..543c274d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -22,6 +22,16 @@ import ( "github.com/tendermint/tendermint/types" ) +// PreCheckFunc is an optional filter to determine if a transaction should be +// rejected. Invoked before CheckTx. An example would be to ensure that a +// transaction isn't exceeded the block size. +type PreCheckFunc func(types.Tx) bool + +// PostCheckFunc is an optional filter executed after CheckTx and rejects +// transaction if false is returned. An example would be to ensure a +// transaction doesn't require more gas than available. +type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) bool + /* The mempool pushes new txs onto the proxyAppConn. @@ -58,6 +68,27 @@ var ( ErrMempoolIsFull = errors.New("Mempool is full") ) +// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino +// overhead is smaller or equal to the expected maxBytes. +func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc { + return func(tx types.Tx) bool { + // We have to account for the amino overhead in the tx size as well + aminoOverhead := amino.UvarintSize(uint64(len(tx))) + return int64(len(tx)+aminoOverhead) <= maxBytes + } +} + +// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed +// maxGas. Returns true if maxGas is -1. +func PostCheckMaxGas(maxGas int64) PostCheckFunc { + return func(tx types.Tx, res *abci.ResponseCheckTx) bool { + if maxGas == -1 { + return true + } + return res.GasWanted <= maxGas + } +} + // TxID is the hex encoded hash of the bytes as a types.Tx. func TxID(tx []byte) string { return fmt.Sprintf("%X", types.Tx(tx).Hash()) @@ -80,8 +111,8 @@ type Mempool struct { recheckEnd *clist.CElement // re-checking stops here notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - // Filter mempool to only accept txs for which filter(tx) returns true. - filter func(types.Tx) bool + preCheck PreCheckFunc + postCheck PostCheckFunc // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -141,10 +172,16 @@ func (mem *Mempool) SetLogger(l log.Logger) { mem.logger = l } -// WithFilter sets a filter for mempool to only accept txs for which f(tx) -// returns true. -func WithFilter(f func(types.Tx) bool) MempoolOption { - return func(mem *Mempool) { mem.filter = f } +// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns +// false. This is ran before CheckTx. +func WithPreCheck(f PreCheckFunc) MempoolOption { + return func(mem *Mempool) { mem.preCheck = f } +} + +// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns +// false. This is ran after CheckTx. +func WithPostCheck(f PostCheckFunc) MempoolOption { + return func(mem *Mempool) { mem.postCheck = f } } // WithMetrics sets the metrics. @@ -248,7 +285,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { return ErrMempoolIsFull } - if mem.filter != nil && !mem.filter(tx) { + if mem.preCheck != nil && !mem.preCheck(tx) { return } @@ -298,7 +335,8 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: tx := req.GetCheckTx().Tx - if r.CheckTx.Code == abci.CodeTypeOK { + if (r.CheckTx.Code == abci.CodeTypeOK) && + mem.isPostCheckPass(tx, r.CheckTx) { mem.counter++ memTx := &mempoolTx{ counter: mem.counter, @@ -326,10 +364,15 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { case *abci.Response_CheckTx: memTx := mem.recheckCursor.Value.(*mempoolTx) if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { - cmn.PanicSanity(fmt.Sprintf("Unexpected tx response from proxy during recheck\n"+ - "Expected %X, got %X", r.CheckTx.Data, memTx.tx)) + cmn.PanicSanity( + fmt.Sprintf( + "Unexpected tx response from proxy during recheck\nExpected %X, got %X", + r.CheckTx.Data, + memTx.tx, + ), + ) } - if r.CheckTx.Code == abci.CodeTypeOK { + if (r.CheckTx.Code == abci.CodeTypeOK) && mem.isPostCheckPass(memTx.tx, r.CheckTx) { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. @@ -444,7 +487,12 @@ func (mem *Mempool) ReapMaxTxs(max int) types.Txs { // Update informs the mempool that the given txs were committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller -func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { +func (mem *Mempool) Update( + height int64, + txs types.Txs, + preCheck PreCheckFunc, + postCheck PostCheckFunc, +) error { // First, create a lookup map of txns in new txs. txsMap := make(map[string]struct{}, len(txs)) for _, tx := range txs { @@ -455,8 +503,11 @@ func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bo mem.height = height mem.notifiedTxsAvailable = false - if filter != nil { - mem.filter = filter + if preCheck != nil { + mem.preCheck = preCheck + } + if postCheck != nil { + mem.postCheck = postCheck } // Remove transactions that are already in txs. @@ -514,6 +565,10 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { mem.proxyAppConn.FlushAsync() } +func (mem *Mempool) isPostCheckPass(tx types.Tx, r *abci.ResponseCheckTx) bool { + return mem.postCheck == nil || mem.postCheck(tx, r) +} + //-------------------------------------------------------------------------------- // mempoolTx is a transaction that successfully ran diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index dc7259dd..4f66da36 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + amino "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" @@ -119,6 +120,62 @@ func TestReapMaxBytesMaxGas(t *testing.T) { } } +func TestMempoolFilters(t *testing.T) { + app := kvstore.NewKVStoreApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(cc) + emptyTxArr := []types.Tx{[]byte{}} + + nopPreFilter := func(tx types.Tx) bool { return true } + nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) bool { return true } + + // This is the same filter we expect to be used within node/node.go and state/execution.go + nBytePreFilter := func(n int) func(tx types.Tx) bool { + return func(tx types.Tx) bool { + // We have to account for the amino overhead in the tx size as well + aminoOverhead := amino.UvarintSize(uint64(len(tx))) + return (len(tx) + aminoOverhead) <= n + } + } + + nGasPostFilter := func(n int64) func(tx types.Tx, res *abci.ResponseCheckTx) bool { + return func(tx types.Tx, res *abci.ResponseCheckTx) bool { + if n == -1 { + return true + } + return res.GasWanted <= n + } + } + + // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. + // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas + tests := []struct { + numTxsToCreate int + preFilter func(tx types.Tx) bool + postFilter func(tx types.Tx, res *abci.ResponseCheckTx) bool + expectedNumTxs int + }{ + {10, nopPreFilter, nopPostFilter, 10}, + {10, nBytePreFilter(10), nopPostFilter, 0}, + {10, nBytePreFilter(20), nopPostFilter, 0}, + {10, nBytePreFilter(21), nopPostFilter, 10}, + {10, nopPreFilter, nGasPostFilter(-1), 10}, + {10, nopPreFilter, nGasPostFilter(0), 0}, + {10, nopPreFilter, nGasPostFilter(1), 10}, + {10, nopPreFilter, nGasPostFilter(3000), 10}, + {10, nBytePreFilter(10), nGasPostFilter(20), 0}, + {10, nBytePreFilter(30), nGasPostFilter(20), 10}, + {10, nBytePreFilter(21), nGasPostFilter(1), 10}, + {10, nBytePreFilter(21), nGasPostFilter(0), 0}, + } + for tcIndex, tt := range tests { + mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter) + checkTxs(t, mempool, tt.numTxsToCreate) + require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) + mempool.Flush() + } +} + func TestTxsAvailable(t *testing.T) { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) @@ -139,7 +196,7 @@ func TestTxsAvailable(t *testing.T) { // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] - if err := mempool.Update(1, committedTxs, nil); err != nil { + if err := mempool.Update(1, committedTxs, nil, nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -151,7 +208,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) - if err := mempool.Update(2, committedTxs, nil); err != nil { + if err := mempool.Update(2, committedTxs, nil, nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -208,7 +265,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - if err := mempool.Update(0, txs, nil); err != nil { + if err := mempool.Update(0, txs, nil, nil); err != nil { t.Error(err) } } diff --git a/node/node.go b/node/node.go index 97ea8143..0e5581a5 100644 --- a/node/node.go +++ b/node/node.go @@ -7,22 +7,23 @@ import ( "fmt" "net" "net/http" + _ "net/http/pprof" + "strings" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - amino "github.com/tendermint/go-amino" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - dbm "github.com/tendermint/tendermint/libs/db" - "github.com/tendermint/tendermint/libs/log" + abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/evidence" + cmn "github.com/tendermint/tendermint/libs/common" + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/pex" @@ -40,9 +41,6 @@ import ( "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/version" - - _ "net/http/pprof" - "strings" ) //------------------------------------------------------------------------------ @@ -255,7 +253,17 @@ func NewNode(config *cfg.Config, proxyApp.Mempool(), state.LastBlockHeight, mempl.WithMetrics(memplMetrics), - mempl.WithFilter(sm.TxFilter(state)), + mempl.WithPreCheck( + mempl.PreCheckAminoMaxBytes( + types.MaxDataBytesUnknownEvidence( + state.ConsensusParams.BlockSize.MaxBytes, + state.Validators.Size(), + ), + ), + ), + mempl.WithPostCheck( + mempl.PostCheckMaxGas(state.ConsensusParams.BlockSize.MaxGas), + ), ) mempoolLogger := logger.With("module", "mempool") mempool.SetLogger(mempoolLogger) diff --git a/state/execution.go b/state/execution.go index 60fa4780..c6d5ce0a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -7,6 +7,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -115,11 +116,16 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, nil } -// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool. +// Commit locks the mempool, runs the ABCI Commit message, and updates the +// mempool. // It returns the result of calling abci.Commit (the AppHash), and an error. -// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed -// against committed state before new txs are run in the mempool, lest they be invalid. -func (blockExec *BlockExecutor) Commit(state State, block *types.Block) ([]byte, error) { +// The Mempool must be locked during commit and update because state is +// typically reset on Commit and old txs must be replayed against committed +// state before new txs are run in the mempool, lest they be invalid. +func (blockExec *BlockExecutor) Commit( + state State, + block *types.Block, +) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() @@ -134,22 +140,35 @@ func (blockExec *BlockExecutor) Commit(state State, block *types.Block) ([]byte, // Commit block, get hash back res, err := blockExec.proxyApp.CommitSync() if err != nil { - blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err) + blockExec.logger.Error( + "Client error during proxyAppConn.CommitSync", + "err", err, + ) return nil, err } // ResponseCommit has no error code - just data - blockExec.logger.Info("Committed state", + blockExec.logger.Info( + "Committed state", "height", block.Height, "txs", block.NumTxs, - "appHash", fmt.Sprintf("%X", res.Data)) + "appHash", fmt.Sprintf("%X", res.Data), + ) // Update mempool. - if err := blockExec.mempool.Update(block.Height, block.Txs, TxFilter(state)); err != nil { - return nil, err - } + err = blockExec.mempool.Update( + block.Height, + block.Txs, + mempool.PreCheckAminoMaxBytes( + types.MaxDataBytesUnknownEvidence( + state.ConsensusParams.BlockSize.MaxBytes, + state.Validators.Size(), + ), + ), + mempool.PostCheckMaxGas(state.ConsensusParams.MaxGas), + ) - return res.Data, nil + return res.Data, err } //--------------------------------------------------------- diff --git a/state/services.go b/state/services.go index 320b4772..b8f1febe 100644 --- a/state/services.go +++ b/state/services.go @@ -2,6 +2,7 @@ package state import ( abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/types" ) @@ -23,7 +24,7 @@ type Mempool interface { Size() int CheckTx(types.Tx, func(*abci.Response)) error ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs - Update(height int64, txs types.Txs, filter func(types.Tx) bool) error + Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error Flush() FlushAppConn() error @@ -36,16 +37,23 @@ type MockMempool struct{} var _ Mempool = MockMempool{} -func (MockMempool) Lock() {} -func (MockMempool) Unlock() {} -func (MockMempool) Size() int { return 0 } -func (MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil } -func (MockMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { return types.Txs{} } -func (MockMempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { return nil } -func (MockMempool) Flush() {} -func (MockMempool) FlushAppConn() error { return nil } -func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } -func (MockMempool) EnableTxsAvailable() {} +func (MockMempool) Lock() {} +func (MockMempool) Unlock() {} +func (MockMempool) Size() int { return 0 } +func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { return nil } +func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } +func (MockMempool) Update( + _ int64, + _ types.Txs, + _ mempool.PreCheckFunc, + _ mempool.PostCheckFunc, +) error { + return nil +} +func (MockMempool) Flush() {} +func (MockMempool) FlushAppConn() error { return nil } +func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } +func (MockMempool) EnableTxsAvailable() {} //------------------------------------------------------ // blockstore @@ -82,5 +90,5 @@ type EvidencePool interface { type MockEvidencePool struct{} func (m MockEvidencePool) PendingEvidence(int64) []types.Evidence { return nil } -func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil } -func (m MockEvidencePool) Update(*types.Block, State) {} +func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil } +func (m MockEvidencePool) Update(*types.Block, State) {}