Make mempool aware of MaxGas requirement (#2360)

* Make mempool aware of MaxGas requirement

* update spec

* Add tests

* Switch GasWanted from kv store to persistent kv store

* Fix typo in test name

* switch back to using kvstore, not persistent kv store
This commit is contained in:
Dev Ojha 2018-09-12 13:41:19 -07:00 committed by Ethan Buchman
parent 0e1cd88863
commit 1ea64fc27f
8 changed files with 88 additions and 24 deletions

View File

@ -9,13 +9,14 @@ BREAKING CHANGES:
* Apps * Apps
* Go API * Go API
* \#2310 Mempool.ReapMaxBytes -> Mempool.ReapMaxBytesMaxGas
* Blockchain Protocol * Blockchain Protocol
* P2P Protocol * P2P Protocol
FEATURES: FEATURES:
* \#2310 Mempool is now aware of the MaxGas requirement
IMPROVEMENTS: IMPROVEMENTS:

View File

@ -88,7 +88,7 @@ func (app *KVStoreApplication) DeliverTx(tx []byte) types.ResponseDeliverTx {
} }
func (app *KVStoreApplication) CheckTx(tx []byte) types.ResponseCheckTx { func (app *KVStoreApplication) CheckTx(tx []byte) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK} return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
} }
func (app *KVStoreApplication) Commit() types.ResponseCommit { func (app *KVStoreApplication) Commit() types.ResponseCommit {

View File

@ -148,7 +148,7 @@ func TestMempoolRmBadTx(t *testing.T) {
// check for the tx // check for the tx
for { for {
txs := cs.mempool.ReapMaxBytes(len(txBytes)) txs := cs.mempool.ReapMaxBytesMaxGas(len(txBytes), -1)
if len(txs) == 0 { if len(txs) == 0 {
emptyMempoolCh <- struct{}{} emptyMempoolCh <- struct{}{}
return return

View File

@ -949,10 +949,12 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
} }
maxBytes := cs.state.ConsensusParams.BlockSize.MaxBytes maxBytes := cs.state.ConsensusParams.BlockSize.MaxBytes
maxGas := cs.state.ConsensusParams.BlockSize.MaxGas
// bound evidence to 1/10th of the block // bound evidence to 1/10th of the block
evidence := cs.evpool.PendingEvidence(types.MaxEvidenceBytesPerBlock(maxBytes)) evidence := cs.evpool.PendingEvidence(types.MaxEvidenceBytesPerBlock(maxBytes))
// Mempool validated transactions // Mempool validated transactions
txs := cs.mempool.ReapMaxBytes(types.MaxDataBytes(maxBytes, cs.state.Validators.Size(), len(evidence))) txs := cs.mempool.ReapMaxBytesMaxGas(types.MaxDataBytes(maxBytes, cs.state.Validators.Size(), len(evidence)), maxGas)
proposerAddr := cs.privValidator.GetAddress() proposerAddr := cs.privValidator.GetAddress()
block, parts := cs.state.MakeBlock(cs.Height, txs, commit, evidence, proposerAddr) block, parts := cs.state.MakeBlock(cs.Height, txs, commit, evidence, proposerAddr)

View File

@ -22,7 +22,8 @@ to potentially untrusted actors.
Internal functionality is exposed via method calls to other Internal functionality is exposed via method calls to other
code compiled into the tendermint binary. code compiled into the tendermint binary.
- Reap - get tx to propose in next block - ReapMaxBytesMaxGas - get txs to propose in the next block. Guarantees that the
size of the txs is less than MaxBytes, and gas is less than MaxGas
- Update - remove tx that were included in last block - Update - remove tx that were included in last block
- ABCI.CheckTx - call ABCI app to validate the tx - ABCI.CheckTx - call ABCI app to validate the tx

View File

@ -302,9 +302,10 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
if r.CheckTx.Code == abci.CodeTypeOK { if r.CheckTx.Code == abci.CodeTypeOK {
mem.counter++ mem.counter++
memTx := &mempoolTx{ memTx := &mempoolTx{
counter: mem.counter, counter: mem.counter,
height: mem.height, height: mem.height,
tx: tx, gasWanted: r.CheckTx.GasWanted,
tx: tx,
} }
mem.txs.PushBack(memTx) mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction", "tx", TxID(tx), "res", r, "total", mem.Size()) mem.logger.Info("Added good transaction", "tx", TxID(tx), "res", r, "total", mem.Size())
@ -380,10 +381,11 @@ func (mem *Mempool) notifyTxsAvailable() {
} }
} }
// ReapMaxBytes reaps transactions from the mempool up to n bytes total. // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total
// If max is negative, there is no cap on the size of all returned // with the condition that the total gasWanted must be less than maxGas.
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions). // transactions (~ all available transactions).
func (mem *Mempool) ReapMaxBytes(max int) types.Txs { func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes int, maxGas int64) types.Txs {
var buf [binary.MaxVarintLen64]byte var buf [binary.MaxVarintLen64]byte
mem.proxyMtx.Lock() mem.proxyMtx.Lock()
@ -394,19 +396,26 @@ func (mem *Mempool) ReapMaxBytes(max int) types.Txs {
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
} }
var cur int var totalBytes int
var totalGas int64
// TODO: we will get a performance boost if we have a good estimate of avg // TODO: we will get a performance boost if we have a good estimate of avg
// size per tx, and set the initial capacity based off of that. // size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize)) // txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make([]types.Tx, 0, mem.txs.Len()) txs := make([]types.Tx, 0, mem.txs.Len())
for e := mem.txs.Front(); e != nil; e = e.Next() { for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx) memTx := e.Value.(*mempoolTx)
// Check total size requirement
// amino.UvarintSize is not used here because it won't be possible to reuse buf // amino.UvarintSize is not used here because it won't be possible to reuse buf
aminoOverhead := binary.PutUvarint(buf[:], uint64(len(memTx.tx))) aminoOverhead := binary.PutUvarint(buf[:], uint64(len(memTx.tx)))
if max > 0 && cur+len(memTx.tx)+aminoOverhead > max { if maxBytes > -1 && totalBytes+len(memTx.tx)+aminoOverhead > maxBytes {
return txs return txs
} }
cur += len(memTx.tx) + aminoOverhead totalBytes += len(memTx.tx) + aminoOverhead
// Check total gas requirement
if maxGas > -1 && totalGas+memTx.gasWanted > maxGas {
return txs
}
totalGas += memTx.gasWanted
txs = append(txs, memTx.tx) txs = append(txs, memTx.tx)
} }
return txs return txs
@ -513,9 +522,10 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
// mempoolTx is a transaction that successfully ran // mempoolTx is a transaction that successfully ran
type mempoolTx struct { type mempoolTx struct {
counter int64 // a simple incrementing counter counter int64 // a simple incrementing counter
height int64 // height that this tx had been validated in height int64 // height that this tx had been validated in
tx types.Tx // gasWanted int64 // amount of gas this tx states it will require
tx types.Tx //
} }
// Height returns the height for this transaction // Height returns the height for this transaction

View File

@ -11,16 +11,16 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/stretchr/testify/require"
) )
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
@ -71,6 +71,54 @@ func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
return txs return txs
} }
func TestReapMaxBytesMaxGas(t *testing.T) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)
// Ensure gas calculation behaves as expected
checkTxs(t, mempool, 1)
tx0 := mempool.TxsFront().Value.(*mempoolTx)
// assert that kv store has gas wanted = 1.
require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1")
require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
// ensure each tx is 20 bytes long
require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
mempool.Flush()
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
tests := []struct {
numTxsToCreate int
maxBytes int
maxGas int64
expectedNumTxs int
}{
{20, -1, -1, 20},
{20, -1, 0, 0},
{20, -1, 10, 10},
{20, -1, 30, 20},
{20, 0, -1, 0},
{20, 0, 10, 0},
{20, 10, 10, 0},
{20, 21, 10, 1},
{20, 210, -1, 10},
{20, 210, 5, 5},
{20, 210, 10, 10},
{20, 210, 15, 10},
{20, 20000, -1, 20},
{20, 20000, 5, 5},
{20, 20000, 30, 20},
}
for tcIndex, tt := range tests {
checkTxs(t, mempool, tt.numTxsToCreate)
got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
len(got), tt.expectedNumTxs, tcIndex)
mempool.Flush()
}
}
func TestTxsAvailable(t *testing.T) { func TestTxsAvailable(t *testing.T) {
app := kvstore.NewKVStoreApplication() app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
@ -149,7 +197,7 @@ func TestSerialReap(t *testing.T) {
} }
reapCheck := func(exp int) { reapCheck := func(exp int) {
txs := mempool.ReapMaxBytes(-1) txs := mempool.ReapMaxBytesMaxGas(-1, -1)
require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
} }

View File

@ -22,7 +22,7 @@ type Mempool interface {
Size() int Size() int
CheckTx(types.Tx, func(*abci.Response)) error CheckTx(types.Tx, func(*abci.Response)) error
ReapMaxBytes(max int) types.Txs ReapMaxBytesMaxGas(maxBytes int, maxGas int64) types.Txs
Update(height int64, txs types.Txs, filter func(types.Tx) bool) error Update(height int64, txs types.Txs, filter func(types.Tx) bool) error
Flush() Flush()
FlushAppConn() error FlushAppConn() error
@ -34,11 +34,13 @@ type Mempool interface {
// MockMempool is an empty implementation of a Mempool, useful for testing. // MockMempool is an empty implementation of a Mempool, useful for testing.
type MockMempool struct{} type MockMempool struct{}
var _ Mempool = MockMempool{}
func (MockMempool) Lock() {} func (MockMempool) Lock() {}
func (MockMempool) Unlock() {} func (MockMempool) Unlock() {}
func (MockMempool) Size() int { return 0 } func (MockMempool) Size() int { return 0 }
func (MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil } func (MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil }
func (MockMempool) ReapMaxBytes(max int) types.Txs { return types.Txs{} } func (MockMempool) ReapMaxBytesMaxGas(maxBytes int, maxGas int64) types.Txs { return types.Txs{} }
func (MockMempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { return nil } func (MockMempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error { return nil }
func (MockMempool) Flush() {} func (MockMempool) Flush() {}
func (MockMempool) FlushAppConn() error { return nil } func (MockMempool) FlushAppConn() error { return nil }