Merge pull request #14561 from karalabe/txpool-perf-fix

core: reduce transaction reorganization overhead
This commit is contained in:
Péter Szilágyi 2017-06-01 10:33:47 +03:00 committed by GitHub
commit 799a469000
2 changed files with 35 additions and 32 deletions

View File

@ -251,7 +251,7 @@ func (pool *TxPool) resetState() {
} }
// Check the queue and move transactions over to the pending if possible // Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid // or remove those that have become invalid
pool.promoteExecutables(currentState) pool.promoteExecutables(currentState, nil)
} }
// Stop terminates the transaction pool. // Stop terminates the transaction pool.
@ -339,17 +339,6 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
state, err := pool.currentState()
if err != nil {
return nil, err
}
// check queue first
pool.promoteExecutables(state)
// invalidate any txs
pool.demoteUnexecutables(state)
pending := make(map[common.Address]types.Transactions) pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending { for addr, list := range pool.pending {
pending[addr] = list.Flatten() pending[addr] = list.Flatten()
@ -551,13 +540,14 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
if err != nil { if err != nil {
return err return err
} }
state, err := pool.currentState()
if err != nil {
return err
}
// If we added a new transaction, run promotion checks and return // If we added a new transaction, run promotion checks and return
if !replace { if !replace {
pool.promoteExecutables(state) state, err := pool.currentState()
if err != nil {
return err
}
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables(state, []common.Address{from})
} }
return nil return nil
} }
@ -568,24 +558,26 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
defer pool.mu.Unlock() defer pool.mu.Unlock()
// Add the batch of transaction, tracking the accepted ones // Add the batch of transaction, tracking the accepted ones
replaced, added := true, 0 dirty := make(map[common.Address]struct{})
for _, tx := range txs { for _, tx := range txs {
if replace, err := pool.add(tx); err == nil { if replace, err := pool.add(tx); err == nil {
added++
if !replace { if !replace {
replaced = false from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
} }
} }
} }
// Only reprocess the internal state if something was actually added // Only reprocess the internal state if something was actually added
if added > 0 { if len(dirty) > 0 {
state, err := pool.currentState() state, err := pool.currentState()
if err != nil { if err != nil {
return err return err
} }
if !replaced { addrs := make([]common.Address, 0, len(dirty))
pool.promoteExecutables(state) for addr, _ := range dirty {
addrs = append(addrs, addr)
} }
pool.promoteExecutables(state, addrs)
} }
return nil return nil
} }
@ -662,12 +654,23 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the // promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all // future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted. // invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(state *state.StateDB) { func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
gaslimit := pool.gasLimit() gaslimit := pool.gasLimit()
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr, _ := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions // Iterate over all accounts and promote any executable transactions
queued := uint64(0) queued := uint64(0)
for addr, list := range pool.queue { for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce) // Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) { for _, tx := range list.Forward(state.GetNonce(addr)) {
hash := tx.Hash() hash := tx.Hash()

View File

@ -175,7 +175,7 @@ func TestTransactionQueue(t *testing.T) {
pool.resetState() pool.resetState()
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables(currentState) pool.promoteExecutables(currentState, []common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending)) t.Error("expected valid txs to be 1 is", len(pool.pending))
} }
@ -184,7 +184,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx) from, _ = deriveSender(tx)
currentState.SetNonce(from, 2) currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables(currentState) pool.promoteExecutables(currentState, []common.Address{from})
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool") t.Error("expected transaction to be in tx pool")
} }
@ -206,7 +206,7 @@ func TestTransactionQueue(t *testing.T) {
pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3) pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables(currentState) pool.promoteExecutables(currentState, []common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending)) t.Error("expected tx pool to be 1, got", len(pool.pending))
@ -304,16 +304,16 @@ func TestTransactionDoubleNonce(t *testing.T) {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
} }
state, _ := pool.currentState() state, _ := pool.currentState()
pool.promoteExecutables(state) pool.promoteExecutables(state, []common.Address{addr})
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
} }
// Add the thid transaction and ensure it's not saved (smaller price) // Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3) pool.add(tx3)
pool.promoteExecutables(state) pool.promoteExecutables(state, []common.Address{addr})
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
@ -1087,7 +1087,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
// Benchmark the speed of pool validation // Benchmark the speed of pool validation
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
pool.promoteExecutables(state) pool.promoteExecutables(state, nil)
} }
} }