mempool: Filter new txs if they have insufficient gas (#2385)

This also refactors the prior mempool to filter to be known as
"precheck filter" and this new filter is called "postcheck filter"

This PR also fixes a bug where the precheck filter previously didn't
account for the amino overhead, which could a maliciously sized tx to
halt blocks from getting any txs in them.

* Move maxGas outside of function definition to avoid race condition
* Type filter funcs and make public
* Use helper method for post check
* Remove superfluous Filter suffix
* Move default pre/post checks into package
* Fix broken references
* Fix typos
* Expand on examples for checks
This commit is contained in:
Dev Ojha 2018-09-21 17:50:06 -07:00 committed by Alexander Simmerl
parent f99e4010f2
commit 111e627037
5 changed files with 198 additions and 51 deletions

View File

@ -22,6 +22,16 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// PreCheckFunc is an optional filter to determine if a transaction should be
// rejected. Invoked before CheckTx. An example would be to ensure that a
// transaction isn't exceeded the block size.
type PreCheckFunc func(types.Tx) bool
// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) bool
/* /*
The mempool pushes new txs onto the proxyAppConn. The mempool pushes new txs onto the proxyAppConn.
@ -58,6 +68,27 @@ var (
ErrMempoolIsFull = errors.New("Mempool is full") ErrMempoolIsFull = errors.New("Mempool is full")
) )
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
// overhead is smaller or equal to the expected maxBytes.
func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc {
return func(tx types.Tx) bool {
// We have to account for the amino overhead in the tx size as well
aminoOverhead := amino.UvarintSize(uint64(len(tx)))
return int64(len(tx)+aminoOverhead) <= maxBytes
}
}
// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns true if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) bool {
if maxGas == -1 {
return true
}
return res.GasWanted <= maxGas
}
}
// TxID is the hex encoded hash of the bytes as a types.Tx. // TxID is the hex encoded hash of the bytes as a types.Tx.
func TxID(tx []byte) string { func TxID(tx []byte) string {
return fmt.Sprintf("%X", types.Tx(tx).Hash()) return fmt.Sprintf("%X", types.Tx(tx).Hash())
@ -80,8 +111,8 @@ type Mempool struct {
recheckEnd *clist.CElement // re-checking stops here recheckEnd *clist.CElement // re-checking stops here
notifiedTxsAvailable bool notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
// Filter mempool to only accept txs for which filter(tx) returns true. preCheck PreCheckFunc
filter func(types.Tx) bool postCheck PostCheckFunc
// Keep a cache of already-seen txs. // Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp. // This reduces the pressure on the proxyApp.
@ -141,10 +172,16 @@ func (mem *Mempool) SetLogger(l log.Logger) {
mem.logger = l mem.logger = l
} }
// WithFilter sets a filter for mempool to only accept txs for which f(tx) // WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
// returns true. // false. This is ran before CheckTx.
func WithFilter(f func(types.Tx) bool) MempoolOption { func WithPreCheck(f PreCheckFunc) MempoolOption {
return func(mem *Mempool) { mem.filter = f } return func(mem *Mempool) { mem.preCheck = f }
}
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx.
func WithPostCheck(f PostCheckFunc) MempoolOption {
return func(mem *Mempool) { mem.postCheck = f }
} }
// WithMetrics sets the metrics. // WithMetrics sets the metrics.
@ -248,7 +285,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return ErrMempoolIsFull return ErrMempoolIsFull
} }
if mem.filter != nil && !mem.filter(tx) { if mem.preCheck != nil && !mem.preCheck(tx) {
return return
} }
@ -298,7 +335,8 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) { switch r := res.Value.(type) {
case *abci.Response_CheckTx: case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx tx := req.GetCheckTx().Tx
if r.CheckTx.Code == abci.CodeTypeOK { if (r.CheckTx.Code == abci.CodeTypeOK) &&
mem.isPostCheckPass(tx, r.CheckTx) {
mem.counter++ mem.counter++
memTx := &mempoolTx{ memTx := &mempoolTx{
counter: mem.counter, counter: mem.counter,
@ -326,10 +364,15 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
case *abci.Response_CheckTx: case *abci.Response_CheckTx:
memTx := mem.recheckCursor.Value.(*mempoolTx) memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) {
cmn.PanicSanity(fmt.Sprintf("Unexpected tx response from proxy during recheck\n"+ cmn.PanicSanity(
"Expected %X, got %X", r.CheckTx.Data, memTx.tx)) fmt.Sprintf(
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
r.CheckTx.Data,
memTx.tx,
),
)
} }
if r.CheckTx.Code == abci.CodeTypeOK { if (r.CheckTx.Code == abci.CodeTypeOK) && mem.isPostCheckPass(memTx.tx, r.CheckTx) {
// Good, nothing to do. // Good, nothing to do.
} else { } else {
// Tx became invalidated due to newly committed block. // Tx became invalidated due to newly committed block.
@ -444,7 +487,12 @@ func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
// Update informs the mempool that the given txs were committed and can be discarded. // Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus. // NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller // NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { func (mem *Mempool) Update(
height int64,
txs types.Txs,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// First, create a lookup map of txns in new txs. // First, create a lookup map of txns in new txs.
txsMap := make(map[string]struct{}, len(txs)) txsMap := make(map[string]struct{}, len(txs))
for _, tx := range txs { for _, tx := range txs {
@ -455,8 +503,11 @@ func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bo
mem.height = height mem.height = height
mem.notifiedTxsAvailable = false mem.notifiedTxsAvailable = false
if filter != nil { if preCheck != nil {
mem.filter = filter mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
} }
// Remove transactions that are already in txs. // Remove transactions that are already in txs.
@ -514,6 +565,10 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
mem.proxyAppConn.FlushAsync() mem.proxyAppConn.FlushAsync()
} }
func (mem *Mempool) isPostCheckPass(tx types.Tx, r *abci.ResponseCheckTx) bool {
return mem.postCheck == nil || mem.postCheck(tx, r)
}
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// mempoolTx is a transaction that successfully ran // mempoolTx is a transaction that successfully ran

View File

@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
@ -119,6 +120,62 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
} }
} }
func TestMempoolFilters(t *testing.T) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)
emptyTxArr := []types.Tx{[]byte{}}
nopPreFilter := func(tx types.Tx) bool { return true }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) bool { return true }
// This is the same filter we expect to be used within node/node.go and state/execution.go
nBytePreFilter := func(n int) func(tx types.Tx) bool {
return func(tx types.Tx) bool {
// We have to account for the amino overhead in the tx size as well
aminoOverhead := amino.UvarintSize(uint64(len(tx)))
return (len(tx) + aminoOverhead) <= n
}
}
nGasPostFilter := func(n int64) func(tx types.Tx, res *abci.ResponseCheckTx) bool {
return func(tx types.Tx, res *abci.ResponseCheckTx) bool {
if n == -1 {
return true
}
return res.GasWanted <= n
}
}
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
tests := []struct {
numTxsToCreate int
preFilter func(tx types.Tx) bool
postFilter func(tx types.Tx, res *abci.ResponseCheckTx) bool
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, nBytePreFilter(10), nopPostFilter, 0},
{10, nBytePreFilter(20), nopPostFilter, 0},
{10, nBytePreFilter(21), nopPostFilter, 10},
{10, nopPreFilter, nGasPostFilter(-1), 10},
{10, nopPreFilter, nGasPostFilter(0), 0},
{10, nopPreFilter, nGasPostFilter(1), 10},
{10, nopPreFilter, nGasPostFilter(3000), 10},
{10, nBytePreFilter(10), nGasPostFilter(20), 0},
{10, nBytePreFilter(30), nGasPostFilter(20), 10},
{10, nBytePreFilter(21), nGasPostFilter(1), 10},
{10, nBytePreFilter(21), nGasPostFilter(0), 0},
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
checkTxs(t, mempool, tt.numTxsToCreate)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush()
}
}
func TestTxsAvailable(t *testing.T) { func TestTxsAvailable(t *testing.T) {
app := kvstore.NewKVStoreApplication() app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
@ -139,7 +196,7 @@ func TestTxsAvailable(t *testing.T) {
// it should fire once now for the new height // it should fire once now for the new height
// since there are still txs left // since there are still txs left
committedTxs, txs := txs[:50], txs[50:] committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(1, committedTxs, nil); err != nil { if err := mempool.Update(1, committedTxs, nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureFire(t, mempool.TxsAvailable(), timeoutMS)
@ -151,7 +208,7 @@ func TestTxsAvailable(t *testing.T) {
// now call update with all the txs. it should not fire as there are no txs left // now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) committedTxs = append(txs, moreTxs...)
if err := mempool.Update(2, committedTxs, nil); err != nil { if err := mempool.Update(2, committedTxs, nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
@ -208,7 +265,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i)) binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes) txs = append(txs, txBytes)
} }
if err := mempool.Update(0, txs, nil); err != nil { if err := mempool.Update(0, txs, nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
} }

View File

@ -7,22 +7,23 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof"
"strings"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus" cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/evidence"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex" "github.com/tendermint/tendermint/p2p/pex"
@ -40,9 +41,6 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time" tmtime "github.com/tendermint/tendermint/types/time"
"github.com/tendermint/tendermint/version" "github.com/tendermint/tendermint/version"
_ "net/http/pprof"
"strings"
) )
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -255,7 +253,17 @@ func NewNode(config *cfg.Config,
proxyApp.Mempool(), proxyApp.Mempool(),
state.LastBlockHeight, state.LastBlockHeight,
mempl.WithMetrics(memplMetrics), mempl.WithMetrics(memplMetrics),
mempl.WithFilter(sm.TxFilter(state)), mempl.WithPreCheck(
mempl.PreCheckAminoMaxBytes(
types.MaxDataBytesUnknownEvidence(
state.ConsensusParams.BlockSize.MaxBytes,
state.Validators.Size(),
),
),
),
mempl.WithPostCheck(
mempl.PostCheckMaxGas(state.ConsensusParams.BlockSize.MaxGas),
),
) )
mempoolLogger := logger.With("module", "mempool") mempoolLogger := logger.With("module", "mempool")
mempool.SetLogger(mempoolLogger) mempool.SetLogger(mempoolLogger)

View File

@ -7,6 +7,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -115,11 +116,16 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
return state, nil return state, nil
} }
// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool. // Commit locks the mempool, runs the ABCI Commit message, and updates the
// mempool.
// It returns the result of calling abci.Commit (the AppHash), and an error. // It returns the result of calling abci.Commit (the AppHash), and an error.
// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed // The Mempool must be locked during commit and update because state is
// against committed state before new txs are run in the mempool, lest they be invalid. // typically reset on Commit and old txs must be replayed against committed
func (blockExec *BlockExecutor) Commit(state State, block *types.Block) ([]byte, error) { // state before new txs are run in the mempool, lest they be invalid.
func (blockExec *BlockExecutor) Commit(
state State,
block *types.Block,
) ([]byte, error) {
blockExec.mempool.Lock() blockExec.mempool.Lock()
defer blockExec.mempool.Unlock() defer blockExec.mempool.Unlock()
@ -134,22 +140,35 @@ func (blockExec *BlockExecutor) Commit(state State, block *types.Block) ([]byte,
// Commit block, get hash back // Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync() res, err := blockExec.proxyApp.CommitSync()
if err != nil { if err != nil {
blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err) blockExec.logger.Error(
"Client error during proxyAppConn.CommitSync",
"err", err,
)
return nil, err return nil, err
} }
// ResponseCommit has no error code - just data // ResponseCommit has no error code - just data
blockExec.logger.Info("Committed state", blockExec.logger.Info(
"Committed state",
"height", block.Height, "height", block.Height,
"txs", block.NumTxs, "txs", block.NumTxs,
"appHash", fmt.Sprintf("%X", res.Data)) "appHash", fmt.Sprintf("%X", res.Data),
)
// Update mempool. // Update mempool.
if err := blockExec.mempool.Update(block.Height, block.Txs, TxFilter(state)); err != nil { err = blockExec.mempool.Update(
return nil, err block.Height,
} block.Txs,
mempool.PreCheckAminoMaxBytes(
types.MaxDataBytesUnknownEvidence(
state.ConsensusParams.BlockSize.MaxBytes,
state.Validators.Size(),
),
),
mempool.PostCheckMaxGas(state.ConsensusParams.MaxGas),
)
return res.Data, nil return res.Data, err
} }
//--------------------------------------------------------- //---------------------------------------------------------

View File

@ -2,6 +2,7 @@ package state
import ( import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -23,7 +24,7 @@ type Mempool interface {
Size() int Size() int
CheckTx(types.Tx, func(*abci.Response)) error CheckTx(types.Tx, func(*abci.Response)) error
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
Update(height int64, txs types.Txs, filter func(types.Tx) bool) error Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error
Flush() Flush()
FlushAppConn() error FlushAppConn() error
@ -36,16 +37,23 @@ type MockMempool struct{}
var _ Mempool = MockMempool{} var _ Mempool = MockMempool{}
func (MockMempool) Lock() {} func (MockMempool) Lock() {}
func (MockMempool) Unlock() {} func (MockMempool) Unlock() {}
func (MockMempool) Size() int { return 0 } func (MockMempool) Size() int { return 0 }
func (MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil } func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { return nil }
func (MockMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { return types.Txs{} } func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (MockMempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { return nil } func (MockMempool) Update(
func (MockMempool) Flush() {} _ int64,
func (MockMempool) FlushAppConn() error { return nil } _ types.Txs,
func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } _ mempool.PreCheckFunc,
func (MockMempool) EnableTxsAvailable() {} _ mempool.PostCheckFunc,
) error {
return nil
}
func (MockMempool) Flush() {}
func (MockMempool) FlushAppConn() error { return nil }
func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
func (MockMempool) EnableTxsAvailable() {}
//------------------------------------------------------ //------------------------------------------------------
// blockstore // blockstore
@ -82,5 +90,5 @@ type EvidencePool interface {
type MockEvidencePool struct{} type MockEvidencePool struct{}
func (m MockEvidencePool) PendingEvidence(int64) []types.Evidence { return nil } func (m MockEvidencePool) PendingEvidence(int64) []types.Evidence { return nil }
func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil } func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil }
func (m MockEvidencePool) Update(*types.Block, State) {} func (m MockEvidencePool) Update(*types.Block, State) {}