diff --git a/core/tx_pool.go b/core/tx_pool.go index edcbc21eb..b805cf226 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -176,7 +176,7 @@ func (pool *TxPool) resetState() { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.demoteUnexecutables() + pool.demoteUnexecutables(currentState) // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { @@ -185,7 +185,7 @@ func (pool *TxPool) resetState() { } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.promoteExecutables() + pool.promoteExecutables(currentState) } func (pool *TxPool) Stop() { @@ -196,8 +196,12 @@ func (pool *TxPool) Stop() { } func (pool *TxPool) State() *state.ManagedState { - pool.mu.RLock() - defer pool.mu.RUnlock() + pool.mu.Lock() + defer pool.mu.Unlock() + + if pool.pendingState == nil { + pool.resetState() + } return pool.pendingState } @@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common // Pending retrieves all currently processable transactions, groupped by origin // account and sorted by nonce. The returned transaction set is a copy and can be // freely modified by calling code. -func (pool *TxPool) Pending() map[common.Address]types.Transactions { +func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { pool.mu.Lock() defer pool.mu.Unlock() + state, err := pool.currentState() + if err != nil { + return nil, err + } + // check queue first - pool.promoteExecutables() + pool.promoteExecutables(state) // invalidate any txs - pool.demoteUnexecutables() + pool.demoteUnexecutables(state) pending := make(map[common.Address]types.Transactions) for addr, list := range pool.pending { pending[addr] = list.Flatten() } - return pending + return pending, nil } // SetLocal marks a transaction as local, skipping gas price @@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error { if err := pool.add(tx); err != nil { return err } - pool.promoteExecutables() + + state, err := pool.currentState() + if err != nil { + return err + } + + pool.promoteExecutables(state) return nil } // AddBatch attempts to queue a batch of transactions. -func (pool *TxPool) AddBatch(txs []*types.Transaction) { +func (pool *TxPool) AddBatch(txs []*types.Transaction) error { pool.mu.Lock() defer pool.mu.Unlock() @@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) { glog.V(logger.Debug).Infoln("tx error:", err) } } - pool.promoteExecutables() + + state, err := pool.currentState() + if err != nil { + return err + } + + pool.promoteExecutables(state) + + return nil } // Get returns a transaction if it is contained in the pool @@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables() { - // Init delayed since tx pool could have been started before any state sync - if pool.pendingState == nil { - pool.resetState() - } - // Retrieve the current state to allow nonce and balance checking - state, err := pool.currentState() - if err != nil { - glog.Errorf("Could not get current state: %v", err) - return - } +func (pool *TxPool) promoteExecutables(state *state.StateDB) { // Iterate over all accounts and promote any executable transactions queued := uint64(0) for addr, list := range pool.queue { @@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() { // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables() { - // Retrieve the current state to allow nonce and balance checking - state, err := pool.currentState() - if err != nil { - glog.V(logger.Info).Infoln("failed to get current state: %v", err) - return - } +func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { nonce := state.GetNonce(addr) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 009d19886..3e516735b 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } +// This test simulates a scenario where a new block is imported during a +// state reset and tests whether the pending state is in sync with the +// block head event that initiated the resetState(). +func TestStateChangeDuringPoolReset(t *testing.T) { + var ( + db, _ = ethdb.NewMemDatabase() + key, _ = crypto.GenerateKey() + address = crypto.PubkeyToAddress(key.PublicKey) + mux = new(event.TypeMux) + statedb, _ = state.New(common.Hash{}, db) + trigger = false + ) + + // setup pool with 2 transaction in it + statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether)) + + tx0 := transaction(0, big.NewInt(100000), key) + tx1 := transaction(1, big.NewInt(100000), key) + + // stateFunc is used multiple times to reset the pending state. + // when simulate is true it will create a state that indicates + // that tx0 and tx1 are included in the chain. + stateFunc := func() (*state.StateDB, error) { + // delay "state change" by one. The tx pool fetches the + // state multiple times and by delaying it a bit we simulate + // a state change between those fetches. + stdb := statedb + if trigger { + statedb, _ = state.New(common.Hash{}, db) + // simulate that the new head block included tx0 and tx1 + statedb.SetNonce(address, 2) + statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether)) + trigger = false + } + return stdb, nil + } + + gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } + + txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc) + txpool.resetState() + + nonce := txpool.State().GetNonce(address) + if nonce != 0 { + t.Fatalf("Invalid nonce, want 0, got %d", nonce) + } + + txpool.AddBatch(types.Transactions{tx0, tx1}) + + nonce = txpool.State().GetNonce(address) + if nonce != 2 { + t.Fatalf("Invalid nonce, want 2, got %d", nonce) + } + + // trigger state change in the background + trigger = true + + txpool.resetState() + + pendingTx, err := txpool.Pending() + if err != nil { + t.Fatalf("Could not fetch pending transactions: %v", err) + } + + for addr, txs := range pendingTx { + t.Logf("%0x: %d\n", addr, len(txs)) + } + + nonce = txpool.State().GetNonce(address) + if nonce != 2 { + t.Fatalf("Invalid nonce, want 2, got %d", nonce) + } +} + func TestInvalidTransactions(t *testing.T) { pool, key := setupTxPool() @@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) + pool.resetState() pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } @@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx) currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx1) currentState, _ = pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) + pool.resetState() + pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if len(pool.pending) != 1 { t.Error("expected tx pool to be 1, got", len(pool.pending)) @@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) { if err := pool.add(tx2); err != nil { t.Error("didn't expect error", err) } - pool.promoteExecutables() + state, _ := pool.currentState() + pool.promoteExecutables(state) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if err := pool.add(tx3); err != nil { t.Error("didn't expect error", err) } - pool.promoteExecutables() + pool.promoteExecutables(state) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000000000000)) + pool.resetState() pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) pool.eventMux.Post(ChainHeadEvent{nil}) if pool.pending[from].Len() != 1 { @@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) + pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= maxQueuedPerAccount+5; i++ { @@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) + pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < maxQueuedPerAccount+5; i++ { @@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(state) } } @@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.promoteExecutables() + pool.promoteExecutables(state) } } diff --git a/eth/api_backend.go b/eth/api_backend.go index b95ef79c5..f33b6f7e1 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -131,15 +131,20 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.Remove(txHash) } -func (b *EthApiBackend) GetPoolTransactions() types.Transactions { +func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() + pending, err := b.eth.txPool.Pending() + if err != nil { + return nil, err + } + var txs types.Transactions - for _, batch := range b.eth.txPool.Pending() { + for _, batch := range pending { txs = append(txs, batch...) } - return txs + return txs, nil } func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { diff --git a/eth/helper_test.go b/eth/helper_test.go index f23976785..bd6b2d0da 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -93,7 +93,7 @@ type testTxPool struct { // AddBatch appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) AddBatch(txs []*types.Transaction) { +func (p *testTxPool) AddBatch(txs []*types.Transaction) error { p.lock.Lock() defer p.lock.Unlock() @@ -101,10 +101,12 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) { if p.added != nil { p.added <- txs } + + return nil } // Pending returns all the transactions known to the pool -func (p *testTxPool) Pending() map[common.Address]types.Transactions { +func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -116,7 +118,7 @@ func (p *testTxPool) Pending() map[common.Address]types.Transactions { for _, batch := range batches { sort.Sort(types.TxByNonce(batch)) } - return batches + return batches, nil } // newTestTransaction create a new dummy transaction. diff --git a/eth/protocol.go b/eth/protocol.go index 3f65c204b..7d22b33de 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -98,11 +98,11 @@ var errorToString = map[int]string{ type txPool interface { // AddBatch should add the given transactions to the pool. - AddBatch([]*types.Transaction) + AddBatch([]*types.Transaction) error // Pending should return pending transactions. // The slice should be modifiable by the caller. - Pending() map[common.Address]types.Transactions + Pending() (map[common.Address]types.Transactions, error) } // statusData is the network packet for the status message. diff --git a/eth/sync.go b/eth/sync.go index 6584bb1e2..373cc2054 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -46,7 +46,8 @@ type txsync struct { // syncTransactions starts sending all currently pending transactions to the given peer. func (pm *ProtocolManager) syncTransactions(p *peer) { var txs types.Transactions - for _, batch := range pm.txpool.Pending() { + pending, _ := pm.txpool.Pending() + for _, batch := range pending { txs = append(txs, batch...) } if len(txs) == 0 { diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a25eff5ed..fd86b6465 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1273,8 +1273,12 @@ func (s *PublicTransactionPoolAPI) SignTransaction(ctx context.Context, args Sig // PendingTransactions returns the transactions that are in the transaction pool and have a from address that is one of // the accounts this node manages. -func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { - pending := s.b.GetPoolTransactions() +func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) { + pending, err := s.b.GetPoolTransactions() + if err != nil { + return nil, err + } + transactions := make([]*RPCTransaction, 0, len(pending)) for _, tx := range pending { var signer types.Signer = types.HomesteadSigner{} @@ -1286,13 +1290,17 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { transactions = append(transactions, newRPCPendingTransaction(tx)) } } - return transactions + return transactions, nil } // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the // pool and reinsert it with the new gas price and limit. func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) { - pending := s.b.GetPoolTransactions() + pending, err := s.b.GetPoolTransactions() + if err != nil { + return common.Hash{}, err + } + for _, p := range pending { var signer types.Signer = types.HomesteadSigner{} if p.Protected() { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 77df7eb8d..36d7e754b 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -55,7 +55,7 @@ type Backend interface { // TxPool API SendTx(ctx context.Context, signedTx *types.Transaction) error RemoveTx(txHash common.Hash) - GetPoolTransactions() types.Transactions + GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) diff --git a/les/api_backend.go b/les/api_backend.go index 8df963f6e..7dc548ec3 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -110,7 +110,7 @@ func (b *LesApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.RemoveTx(txHash) } -func (b *LesApiBackend) GetPoolTransactions() types.Transactions { +func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { return b.eth.txPool.GetTransactions() } diff --git a/les/handler.go b/les/handler.go index 83d73666f..fdf4e6e8a 100644 --- a/les/handler.go +++ b/les/handler.go @@ -88,7 +88,7 @@ type BlockChain interface { type txPool interface { // AddTransactions should add the given transactions to the pool. - AddBatch([]*types.Transaction) + AddBatch([]*types.Transaction) error } type ProtocolManager struct { @@ -879,7 +879,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if reqCnt > maxReqs || reqCnt > MaxTxSend { return errResp(ErrRequestRejected, "") } - pm.txpool.AddBatch(txs) + + if err := pm.txpool.AddBatch(txs); err != nil { + return errResp(ErrUnexpectedResponse, "msg: %v", err) + } + _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) diff --git a/light/txpool.go b/light/txpool.go index 309bc3a32..4a06d317d 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -500,7 +500,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { // GetTransactions returns all currently processable transactions. // The returned slice may be modified by the caller. -func (self *TxPool) GetTransactions() (txs types.Transactions) { +func (self *TxPool) GetTransactions() (txs types.Transactions, err error) { self.mu.RLock() defer self.mu.RUnlock() @@ -510,7 +510,7 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { txs[i] = tx i++ } - return txs + return txs, nil } // Content retrieves the data content of the transaction pool, returning all the diff --git a/miner/worker.go b/miner/worker.go index edbd502c1..5fa7c4115 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -519,7 +519,14 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { core.ApplyDAOHardFork(work.state) } - txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending()) + + pending, err := self.eth.TxPool().Pending() + if err != nil { + glog.Errorf("Could not fetch pending transactions: %v", err) + return + } + + txs := types.NewTransactionsByPriceAndNonce(pending) work.commitTransactions(self.mux, txs, self.gasPrice, self.chain) self.eth.TxPool().RemoveBatch(work.lowGasTxs)