From 6d817e16c1c17f7cad4a34fa91457e21f63f2de4 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Jun 2015 11:33:08 +0200 Subject: [PATCH 1/4] core, miner: tx pool drops txs below ask price --- core/transaction_pool.go | 19 ++++++++++++++++--- eth/backend.go | 1 + miner/miner.go | 2 +- miner/worker.go | 7 +++++-- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 4a0594228..8f917e96a 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -19,6 +19,7 @@ var ( // Transaction Pool Errors ErrInvalidSender = errors.New("Invalid sender") ErrNonce = errors.New("Nonce too low") + ErrCheap = errors.New("Gas price too low for acceptance") ErrBalance = errors.New("Insufficient balance") ErrNonExistentAccount = errors.New("Account does not exist or account balance too low") ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value") @@ -41,6 +42,7 @@ type TxPool struct { currentState stateFn // The state function which will allow us to do some pre checkes pendingState *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback + minGasPrice *big.Int eventMux *event.TypeMux events event.Subscription @@ -57,8 +59,9 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, + minGasPrice: new(big.Int), pendingState: state.ManageState(currentStateFn()), - events: eventMux.Subscribe(ChainEvent{}), + events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}), } go pool.eventLoop() @@ -69,10 +72,15 @@ func (pool *TxPool) eventLoop() { // Track chain events. When a chain events occurs (new chain canon block) // we need to know the new state. The new state will help us determine // the nonces in the managed state - for _ = range pool.events.Chan() { + for ev := range pool.events.Chan() { pool.mu.Lock() - pool.resetState() + switch ev := ev.(type) { + case ChainEvent: + pool.resetState() + case GasPriceChanged: + pool.minGasPrice = ev.Price + } pool.mu.Unlock() } @@ -124,6 +132,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { err error ) + // Drop transactions under our own minimal accepted gas price + if pool.minGasPrice.Cmp(tx.GasPrice()) > 0 { + return ErrCheap + } + // Validate the transaction sender and it's sig. Throw // if the from fields is invalid. if from, err = tx.From(); err != nil { diff --git a/eth/backend.go b/eth/backend.go index d2ec0cc62..6b7eb736f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -292,6 +292,7 @@ func New(config *Config) (*Ethereum, error) { } eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) + eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.miner = miner.New(eth, eth.EventMux(), eth.pow) diff --git a/miner/miner.go b/miner/miner.go index 20ca81648..7f73f3ee8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -77,7 +77,7 @@ func (m *Miner) SetGasPrice(price *big.Int) { return } - m.worker.gasPrice = price + m.worker.setGasPrice(price) } func (self *Miner) Start(coinbase common.Address, threads int) { diff --git a/miner/worker.go b/miner/worker.go index bd4bc0e3c..d339507ca 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -6,6 +6,7 @@ import ( "sort" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -374,6 +375,8 @@ func (self *worker) commitNewWork() { self.currentMu.Lock() defer self.currentMu.Unlock() + tstart := time.Now() + previous := self.current self.makeCurrent() current := self.current @@ -409,7 +412,7 @@ func (self *worker) commitNewWork() { // We only care about logging if we're actually mining if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles)) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles. Took %v\n", current.block.Number(), current.tcount, len(uncles), time.Since(tstart)) self.logLocalMinedBlocks(previous) } @@ -437,7 +440,6 @@ func (self *worker) commitUncle(uncle *types.Header) error { // Error not unique return core.UncleError("Uncle not unique") } - self.current.uncles.Add(uncle.Hash()) if !self.current.ancestors.Has(uncle.ParentHash) { return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4])) @@ -446,6 +448,7 @@ func (self *worker) commitUncle(uncle *types.Header) error { if self.current.family.Has(uncle.Hash()) { return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash())) } + self.current.uncles.Add(uncle.Hash()) return nil } From 21fa29111b3cd12e3748fcb6310e6a18c5562f17 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Jun 2015 12:16:29 +0200 Subject: [PATCH 2/4] core: reduce max allowed queued txs per address Transactions in the queue are now capped to a maximum of 200 transactions. This number is completely arbitrary. --- common/types.go | 11 +++++++++++ core/transaction_pool.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/common/types.go b/common/types.go index 183d48fb3..d05c21eec 100644 --- a/common/types.go +++ b/common/types.go @@ -1,6 +1,7 @@ package common import ( + "fmt" "math/big" "math/rand" "reflect" @@ -95,3 +96,13 @@ func (a *Address) Set(other Address) { a[i] = v } } + +// PP Pretty Prints a byte slice in the following format: +// hex(value[:4])...(hex[len(value)-4:]) +func PP(value []byte) string { + if len(value) <= 8 { + return Bytes2Hex(value) + } + + return fmt.Sprintf("%x...%x", value[:4], value[len(value)-4]) +} diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 8f917e96a..ce6fed1a9 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -28,6 +28,10 @@ var ( ErrNegativeValue = errors.New("Negative value") ) +const ( + maxQueued = 200 // max limit of queued txs per address +) + type stateFn func() *state.StateDB // TxPool contains all currently known transactions. Transactions @@ -224,6 +228,21 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { self.queue[from] = make(map[common.Hash]*types.Transaction) } self.queue[from][hash] = tx + + if len(self.queue[from]) > maxQueued { + var ( + worstHash common.Hash + worstNonce uint64 + ) + for hash, tx := range self.queue[from] { + if tx.Nonce() > worstNonce { + worstNonce = tx.Nonce() + worstHash = hash + } + } + glog.V(logger.Debug).Infof("Queued tx limit exceeded for %x. Removed worst nonce tx: %x\n", common.PP(from[:]), common.PP(worstHash[:])) + delete(self.queue[from], worstHash) + } } // addTx will add a transaction to the pending (processable queue) list of transactions From e79cc42dfe36f6db61cebb37607f5bfe89e4cdcc Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Jun 2015 16:46:45 +0200 Subject: [PATCH 3/4] core: moved check for max queue to checkQueue Moved the queue to check to the checkQueue method so no undeeded loops need to be initiated or sorting needs to happen twice. --- core/chain_manager.go | 12 ++---------- core/transaction_pool.go | 26 ++++++++++---------------- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index e56d82cce..c3b7273c2 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "math/big" - "os" "runtime" "sync" "sync/atomic" @@ -235,15 +234,8 @@ func (bc *ChainManager) setLastState() { if block != nil { bc.currentBlock = block bc.lastBlockHash = block.Hash() - } else { // TODO CLEAN THIS UP TMP CODE - block = bc.GetBlockByNumber(400000) - if block == nil { - fmt.Println("Fatal. LastBlock not found. Report this issue") - os.Exit(1) - } - bc.currentBlock = block - bc.lastBlockHash = block.Hash() - bc.insert(block) + } else { + glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync") } } else { bc.Reset() diff --git a/core/transaction_pool.go b/core/transaction_pool.go index ce6fed1a9..e31f5c6b3 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -228,21 +228,6 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { self.queue[from] = make(map[common.Hash]*types.Transaction) } self.queue[from][hash] = tx - - if len(self.queue[from]) > maxQueued { - var ( - worstHash common.Hash - worstNonce uint64 - ) - for hash, tx := range self.queue[from] { - if tx.Nonce() > worstNonce { - worstNonce = tx.Nonce() - worstHash = hash - } - } - glog.V(logger.Debug).Infof("Queued tx limit exceeded for %x. Removed worst nonce tx: %x\n", common.PP(from[:]), common.PP(worstHash[:])) - delete(self.queue[from], worstHash) - } } // addTx will add a transaction to the pending (processable queue) list of transactions @@ -367,7 +352,16 @@ func (pool *TxPool) checkQueue() { // Find the next consecutive nonce range starting at the // current account nonce. sort.Sort(addq) - for _, e := range addq { + for i, e := range addq { + // start deleting the transactions from the queue if they exceed the limit + if i > maxQueued { + if glog.V(logger.Debug) { + glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:])) + } + delete(pool.queue[address], e.hash) + continue + } + if e.AccountNonce > guessedNonce { break } From 2628103f1df35ad6a130f2f41e73c7703bf61886 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Jun 2015 17:21:08 +0200 Subject: [PATCH 4/4] rpc/api: fixed default gas-(price) issue. --- rpc/api/eth.go | 9 ++++++++- rpc/api/eth_args.go | 8 ++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rpc/api/eth.go b/rpc/api/eth.go index a0b9dad86..d329dbf10 100644 --- a/rpc/api/eth.go +++ b/rpc/api/eth.go @@ -259,7 +259,14 @@ func (self *ethApi) SendTransaction(req *shared.Request) (interface{}, error) { nonce = args.Nonce.String() } - v, err := self.xeth.Transact(args.From, args.To, nonce, args.Value.String(), args.Gas.String(), args.GasPrice.String(), args.Data) + var gas, price string + if args.Gas != nil { + gas = args.Gas.String() + } + if args.GasPrice != nil { + price = args.GasPrice.String() + } + v, err := self.xeth.Transact(args.From, args.To, nonce, args.Value.String(), gas, price, args.Data) if err != nil { return nil, err } diff --git a/rpc/api/eth_args.go b/rpc/api/eth_args.go index ad9a35fa2..1c86bee51 100644 --- a/rpc/api/eth_args.go +++ b/rpc/api/eth_args.go @@ -333,9 +333,7 @@ func (args *NewTxArgs) UnmarshalJSON(b []byte) (err error) { args.Value = num num = nil - if ext.Gas == nil { - num = big.NewInt(0) - } else { + if ext.Gas != nil { if num, err = numString(ext.Gas); err != nil { return err } @@ -343,9 +341,7 @@ func (args *NewTxArgs) UnmarshalJSON(b []byte) (err error) { args.Gas = num num = nil - if ext.GasPrice == nil { - num = big.NewInt(0) - } else { + if ext.GasPrice != nil { if num, err = numString(ext.GasPrice); err != nil { return err }