diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index 01de97ac2..13d10de32 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -78,6 +78,12 @@ func (js *jsre) adminBindings() { miner.Set("stopAutoDAG", js.stopAutoDAG) miner.Set("makeDAG", js.makeDAG) + admin.Set("txPool", struct{}{}) + t, _ = admin.Get("txPool") + txPool := t.Object() + txPool.Set("pending", js.allPendingTransactions) + txPool.Set("queued", js.allQueuedTransactions) + admin.Set("debug", struct{}{}) t, _ = admin.Get("debug") debug := t.Object() @@ -89,6 +95,7 @@ func (js *jsre) adminBindings() { debug.Set("setHead", js.setHead) debug.Set("processBlock", js.debugBlock) debug.Set("seedhash", js.seedHash) + debug.Set("insertBlock", js.insertBlockRlp) // undocumented temporary debug.Set("waitForBlocks", js.waitForBlocks) } @@ -140,6 +147,32 @@ func (js *jsre) seedHash(call otto.FunctionCall) otto.Value { return otto.UndefinedValue() } +func (js *jsre) allPendingTransactions(call otto.FunctionCall) otto.Value { + txs := js.ethereum.TxPool().GetTransactions() + + ltxs := make([]*tx, len(txs)) + for i, tx := range txs { + // no need to check err + ltxs[i] = newTx(tx) + } + + v, _ := call.Otto.ToValue(ltxs) + return v +} + +func (js *jsre) allQueuedTransactions(call otto.FunctionCall) otto.Value { + txs := js.ethereum.TxPool().GetQueuedTransactions() + + ltxs := make([]*tx, len(txs)) + for i, tx := range txs { + // no need to check err + ltxs[i] = newTx(tx) + } + + v, _ := call.Otto.ToValue(ltxs) + return v +} + func (js *jsre) pendingTransactions(call otto.FunctionCall) otto.Value { txs := js.ethereum.TxPool().GetTransactions() @@ -237,16 +270,47 @@ func (js *jsre) debugBlock(call otto.FunctionCall) otto.Value { return otto.UndefinedValue() } + tstart := time.Now() + old := vm.Debug vm.Debug = true _, err = js.ethereum.BlockProcessor().RetryProcess(block) if err != nil { fmt.Println(err) + r, _ := call.Otto.ToValue(map[string]interface{}{"success": false, "time": time.Since(tstart).Seconds()}) + return r } vm.Debug = old - fmt.Println("ok") - return otto.UndefinedValue() + r, _ := call.Otto.ToValue(map[string]interface{}{"success": true, "time": time.Since(tstart).Seconds()}) + return r +} + +func (js *jsre) insertBlockRlp(call otto.FunctionCall) otto.Value { + tstart := time.Now() + + var block types.Block + if call.Argument(0).IsString() { + blockRlp, _ := call.Argument(0).ToString() + err := rlp.DecodeBytes(common.Hex2Bytes(blockRlp), &block) + if err != nil { + fmt.Println(err) + return otto.UndefinedValue() + } + } + + old := vm.Debug + vm.Debug = true + _, err := js.ethereum.BlockProcessor().RetryProcess(&block) + if err != nil { + fmt.Println(err) + r, _ := call.Otto.ToValue(map[string]interface{}{"success": false, "time": time.Since(tstart).Seconds()}) + return r + } + vm.Debug = old + + r, _ := call.Otto.ToValue(map[string]interface{}{"success": true, "time": time.Since(tstart).Seconds()}) + return r } func (js *jsre) setHead(call otto.FunctionCall) otto.Value { diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index dee25e44e..3f34840f3 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -68,7 +68,7 @@ func testJEthRE(t *testing.T) (string, *testjethre, *eth.Ethereum) { } // set up mock genesis with balance on the testAddress - core.GenesisData = []byte(testGenesis) + core.GenesisAccounts = []byte(testGenesis) ks := crypto.NewKeyStorePlain(filepath.Join(tmp, "keystore")) am := accounts.NewManager(ks) @@ -250,7 +250,7 @@ func TestSignature(t *testing.T) { } func TestContract(t *testing.T) { - + t.Skip() tmp, repl, ethereum := testJEthRE(t) if err := ethereum.Start(); err != nil { t.Errorf("error starting ethereum: %v", err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d319055b1..909d7815e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -345,8 +345,7 @@ func MakeChain(ctx *cli.Context) (chain *core.ChainManager, blockDB, stateDB, ex eventMux := new(event.TypeMux) pow := ethash.New() chain = core.NewChainManager(blockDB, stateDB, pow, eventMux) - txpool := core.NewTxPool(eventMux, chain.State, chain.GasLimit) - proc := core.NewBlockProcessor(stateDB, extraDB, pow, txpool, chain, eventMux) + proc := core.NewBlockProcessor(stateDB, extraDB, pow, chain, eventMux) chain.SetProcessor(proc) return chain, blockDB, stateDB, extraDB } diff --git a/common/big.go b/common/big.go index 3257b179d..05d56daba 100644 --- a/common/big.go +++ b/common/big.go @@ -36,16 +36,16 @@ func Big(num string) *big.Int { return n } -// BigD +// Bytes2Big // -// Shortcut for new(big.Int).SetBytes(...) -func Bytes2Big(data []byte) *big.Int { +func BytesToBig(data []byte) *big.Int { n := new(big.Int) n.SetBytes(data) return n } -func BigD(data []byte) *big.Int { return Bytes2Big(data) } +func Bytes2Big(data []byte) *big.Int { return BytesToBig(data) } +func BigD(data []byte) *big.Int { return BytesToBig(data) } func String2Big(num string) *big.Int { n := new(big.Int) diff --git a/common/natspec/natspec_e2e_test.go b/common/natspec/natspec_e2e_test.go index a8d318b57..7e9172649 100644 --- a/common/natspec/natspec_e2e_test.go +++ b/common/natspec/natspec_e2e_test.go @@ -119,7 +119,7 @@ func testEth(t *testing.T) (ethereum *eth.Ethereum, err error) { testAddress := strings.TrimPrefix(testAccount.Address.Hex(), "0x") // set up mock genesis with balance on the testAddress - core.GenesisData = []byte(`{ + core.GenesisAccounts = []byte(`{ "` + testAddress + `": {"balance": "` + testBalance + `"} }`) @@ -181,7 +181,7 @@ func (self *testFrontend) applyTxs() { // end to end test func TestNatspecE2E(t *testing.T) { - // t.Skip() + t.Skip() tf := testInit(t) defer tf.ethereum.Stop() diff --git a/core/block_processor.go b/core/block_processor.go index a3ad383d0..190e72694 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -38,14 +38,12 @@ type BlockProcessor struct { // Proof of work used for validating Pow pow.PoW - txpool *TxPool - events event.Subscription eventMux *event.TypeMux } -func NewBlockProcessor(db, extra common.Database, pow pow.PoW, txpool *TxPool, chainManager *ChainManager, eventMux *event.TypeMux) *BlockProcessor { +func NewBlockProcessor(db, extra common.Database, pow pow.PoW, chainManager *ChainManager, eventMux *event.TypeMux) *BlockProcessor { sm := &BlockProcessor{ db: db, extraDb: extra, @@ -53,7 +51,6 @@ func NewBlockProcessor(db, extra common.Database, pow pow.PoW, txpool *TxPool, c Pow: pow, bc: chainManager, eventMux: eventMux, - txpool: txpool, } return sm @@ -178,7 +175,6 @@ func (sm *BlockProcessor) Process(block *types.Block) (logs state.Logs, err erro return nil, ParentError(header.ParentHash) } parent := sm.bc.GetBlock(header.ParentHash) - return sm.processWithParent(block, parent) } @@ -254,14 +250,9 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st return nil, err } - // Calculate the td for this block - //td = CalculateTD(block, parent) // Sync the current block's state to the database state.Sync() - // Remove transactions from the pool - sm.txpool.RemoveTransactions(block.Transactions()) - // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { putTx(sm.extraDb, tx, block, uint64(i)) diff --git a/core/block_processor_test.go b/core/block_processor_test.go index 72b173a71..b52c3d3f8 100644 --- a/core/block_processor_test.go +++ b/core/block_processor_test.go @@ -17,7 +17,7 @@ func proc() (*BlockProcessor, *ChainManager) { var mux event.TypeMux chainMan := NewChainManager(db, db, thePow(), &mux) - return NewBlockProcessor(db, db, ezp.New(), nil, chainMan, &mux), chainMan + return NewBlockProcessor(db, db, ezp.New(), chainMan, &mux), chainMan } func TestNumber(t *testing.T) { diff --git a/core/chain_makers.go b/core/chain_makers.go index 44f17cc33..3039e52da 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -124,8 +124,7 @@ func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Data // block processor with fake pow func newBlockProcessor(db common.Database, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { chainMan := newChainManager(nil, eventMux, db) - txpool := NewTxPool(eventMux, chainMan.State, chainMan.GasLimit) - bman := NewBlockProcessor(db, db, FakePow{}, txpool, chainMan, eventMux) + bman := NewBlockProcessor(db, db, FakePow{}, chainMan, eventMux) return bman } diff --git a/core/chain_manager.go b/core/chain_manager.go index 927055103..d14a19fea 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB { return self.transState } -func (self *ChainManager) TxState() *state.ManagedState { - self.tsmu.RLock() - defer self.tsmu.RUnlock() - - return self.txState -} - -func (self *ChainManager) setTxState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() - self.txState = state.ManageState(statedb) -} - func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } @@ -560,6 +547,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { defer close(nonceQuit) for i, block := range chain { + bstart := time.Now() // Wait for block i's nonce to be verified before processing // its state transition. for nonceChecked[i] { @@ -642,11 +630,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { queueEvent.canonicalCount++ if glog.V(logger.Debug) { - glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } } else { if glog.V(logger.Detail) { - glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } queue[i] = ChainSideEvent{block, logs} @@ -750,7 +738,7 @@ out: case ev := <-events.Chan(): switch ev := ev.(type) { case queueEvent: - for i, event := range ev.queue { + for _, event := range ev.queue { switch event := event.(type) { case ChainEvent: // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long @@ -759,12 +747,6 @@ out: self.currentGasLimit = CalcGasLimit(event.Block) self.eventMux.Post(ChainHeadEvent{event.Block}) } - case ChainSplitEvent: - // On chain splits we need to reset the transaction state. We can't be sure whether the actual - // state of the accounts are still valid. - if i == ev.splitCount { - self.setTxState(state.New(event.Block.Root(), self.stateDb)) - } } self.eventMux.Post(event) diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 7dc7358c0..560e85f77 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -267,8 +267,7 @@ func TestChainInsertions(t *testing.T) { var eventMux event.TypeMux chainMan := NewChainManager(db, db, thePow(), &eventMux) - txPool := NewTxPool(&eventMux, chainMan.State, func() *big.Int { return big.NewInt(100000000) }) - blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) + blockMan := NewBlockProcessor(db, db, nil, chainMan, &eventMux) chainMan.SetProcessor(blockMan) const max = 2 @@ -313,8 +312,7 @@ func TestChainMultipleInsertions(t *testing.T) { } var eventMux event.TypeMux chainMan := NewChainManager(db, db, thePow(), &eventMux) - txPool := NewTxPool(&eventMux, chainMan.State, func() *big.Int { return big.NewInt(100000000) }) - blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux) + blockMan := NewBlockProcessor(db, db, nil, chainMan, &eventMux) chainMan.SetProcessor(blockMan) done := make(chan bool, max) for i, chain := range chains { diff --git a/core/genesis.go b/core/genesis.go index e72834822..a9b7339f3 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -36,7 +36,7 @@ func GenesisBlock(db common.Database) *types.Block { Balance string Code string } - err := json.Unmarshal(GenesisData, &accounts) + err := json.Unmarshal(GenesisAccounts, &accounts) if err != nil { fmt.Println("enable to decode genesis json data:", err) os.Exit(1) @@ -57,7 +57,7 @@ func GenesisBlock(db common.Database) *types.Block { return genesis } -var GenesisData = []byte(`{ +var GenesisAccounts = []byte(`{ "0000000000000000000000000000000000000001": {"balance": "1"}, "0000000000000000000000000000000000000002": {"balance": "1"}, "0000000000000000000000000000000000000003": {"balance": "1"}, diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 4296c79f6..27dc1b0d1 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -6,7 +6,6 @@ import ( "math/big" "sort" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -14,10 +13,10 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" - "gopkg.in/fatih/set.v0" ) var ( + // Transaction Pool Errors ErrInvalidSender = errors.New("Invalid sender") ErrNonce = errors.New("Nonce too low") ErrBalance = errors.New("Insufficient balance") @@ -28,112 +27,141 @@ var ( ErrNegativeValue = errors.New("Negative value") ) -const txPoolQueueSize = 50 - -type TxPoolHook chan *types.Transaction -type TxMsg struct{ Tx *types.Transaction } - type stateFn func() *state.StateDB -const ( - minGasPrice = 1000000 -) - -type TxProcessor interface { - ProcessTransaction(tx *types.Transaction) -} - -// The tx pool a thread safe transaction pool handler. In order to -// guarantee a non blocking pool we use a queue channel which can be -// independently read without needing access to the actual pool. +// TxPool contains all currently known transactions. Transactions +// enter the pool when they are received from the network or submitted +// locally. They exit the pool when they are included in the blockchain. +// +// The pool separates processable transactions (which can be applied to the +// current state) and future transactions. Transactions move between those +// two states over time as they are received and processed. type TxPool struct { - mu sync.RWMutex - // Queueing channel for reading and writing incoming - // transactions to - queueChan chan *types.Transaction - // Quiting channel - quit chan bool - // The state function which will allow us to do some pre checkes - currentState stateFn - // The current gas limit function callback - gasLimit func() *big.Int - // The actual pool - txs map[common.Hash]*types.Transaction - invalidHashes *set.Set + quit chan bool // Quiting channel + currentState stateFn // The state function which will allow us to do some pre checkes + state *state.ManagedState + gasLimit func() *big.Int // The current gas limit function callback + eventMux *event.TypeMux + events event.Subscription - queue map[common.Address]types.Transactions - - subscribers []chan TxMsg - - eventMux *event.TypeMux + mu sync.RWMutex + pending map[common.Hash]*types.Transaction // processable transactions + queue map[common.Address]map[common.Hash]*types.Transaction } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { - txPool := &TxPool{ - txs: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]types.Transactions), - queueChan: make(chan *types.Transaction, txPoolQueueSize), - quit: make(chan bool), - eventMux: eventMux, - invalidHashes: set.New(), - currentState: currentStateFn, - gasLimit: gasLimitFn, + return &TxPool{ + pending: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]map[common.Hash]*types.Transaction), + quit: make(chan bool), + eventMux: eventMux, + currentState: currentStateFn, + gasLimit: gasLimitFn, + state: state.ManageState(currentStateFn()), } - return txPool } func (pool *TxPool) Start() { - // Queue timer will tick so we can attempt to move items from the queue to the - // main transaction pool. - queueTimer := time.NewTicker(300 * time.Millisecond) - // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) - removalTimer := time.NewTicker(1 * time.Second) -done: - for { - select { - case <-queueTimer.C: - pool.checkQueue() - case <-removalTimer.C: - pool.validatePool() - case <-pool.quit: - break done - } + // Track chain events. When a chain events occurs (new chain canon block) + // we need to know the new state. The new state will help us determine + // the nonces in the managed state + pool.events = pool.eventMux.Subscribe(ChainEvent{}) + for _ = range pool.events.Chan() { + pool.mu.Lock() + + pool.resetState() + + pool.mu.Unlock() } } -func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { +func (pool *TxPool) resetState() { + pool.state = state.ManageState(pool.currentState()) + + // validate the pool of pending transactions, this will remove + // any transactions that have been included in the block or + // have been invalidated because of another transaction (e.g. + // higher gas price) + pool.validatePool() + + // Loop over the pending transactions and base the nonce of the new + // pending transaction set. + for _, tx := range pool.pending { + if addr, err := tx.From(); err == nil { + // Set the nonce. Transaction nonce can never be lower + // than the state nonce; validatePool took care of that. + pool.state.SetNonce(addr, tx.Nonce()) + } + } + + // Check the queue and move transactions over to the pending if possible + // or remove those that have become invalid + pool.checkQueue() +} + +func (pool *TxPool) Stop() { + pool.pending = make(map[common.Hash]*types.Transaction) + close(pool.quit) + pool.events.Unsubscribe() + glog.V(logger.Info).Infoln("TX Pool stopped") +} + +func (pool *TxPool) State() *state.ManagedState { + pool.mu.RLock() + defer pool.mu.RUnlock() + + return pool.state +} + +// validateTx checks whether a transaction is valid according +// to the consensus rules. +func (pool *TxPool) validateTx(tx *types.Transaction) error { // Validate sender var ( from common.Address err error ) + // Validate the transaction sender and it's sig. Throw + // if the from fields is invalid. if from, err = tx.From(); err != nil { return ErrInvalidSender } + // Make sure the account exist. Non existant accounts + // haven't got funds and well therefor never pass. if !pool.currentState().HasAccount(from) { return ErrNonExistentAccount } + // Check the transaction doesn't exceed the current + // block limit gas. if pool.gasLimit().Cmp(tx.GasLimit) < 0 { return ErrGasLimit } + // Transactions can't be negative. This may never happen + // using RLP decoded transactions but may occur if you create + // a transaction using the RPC for example. if tx.Amount.Cmp(common.Big0) < 0 { return ErrNegativeValue } + // Transactor should have enough funds to cover the costs + // cost == V + GP * GL total := new(big.Int).Mul(tx.Price, tx.GasLimit) total.Add(total, tx.Value()) if pool.currentState().GetBalance(from).Cmp(total) < 0 { return ErrInsufficientFunds } + // Should supply enough intrinsic gas if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 { return ErrIntrinsicGas } + // Last but not least check for nonce errors (intensive + // operation, saved for last) if pool.currentState().GetNonce(from) > tx.Nonce() { return ErrNonce } @@ -150,38 +178,36 @@ func (self *TxPool) add(tx *types.Transaction) error { return fmt.Errorf("Invalid transaction (%x)", hash[:4]) } */ - if self.txs[hash] != nil { + if self.pending[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } - err := self.ValidateTransaction(tx) + err := self.validateTx(tx) if err != nil { return err } - - self.queueTx(tx) - - var toname string - if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" - } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) + self.queueTx(hash, tx) if glog.V(logger.Debug) { - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + var toname string + if to := tx.To(); to != nil { + toname = common.Bytes2Hex(to[:4]) + } else { + toname = "[NEW_CONTRACT]" + } + // we can ignore the error here because From is + // verified in ValidateTransaction. + f, _ := tx.From() + from := common.Bytes2Hex(f[:4]) + glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } + // check and validate the queueue + self.checkQueue() + return nil } -func (self *TxPool) Size() int { - return len(self.txs) -} - +// Add queues a single transaction in the pool if it is valid. func (self *TxPool) Add(tx *types.Transaction) error { self.mu.Lock() defer self.mu.Unlock() @@ -189,6 +215,7 @@ func (self *TxPool) Add(tx *types.Transaction) error { return self.add(tx) } +// AddTransactions attempts to queue all valid transactions in txs. func (self *TxPool) AddTransactions(txs []*types.Transaction) { self.mu.Lock() defer self.mu.Unlock() @@ -203,81 +230,78 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) { } } -// GetTransaction allows you to check the pending and queued transaction in the -// transaction pool. -// It has two stategies, first check the pool (map) then check the queue +// GetTransaction returns a transaction if it is contained in the pool +// and nil otherwise. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { // check the txs first - if tx, ok := tp.txs[hash]; ok { + if tx, ok := tp.pending[hash]; ok { return tx } - // check queue for _, txs := range tp.queue { - for _, tx := range txs { - if tx.Hash() == hash { - return tx - } + if tx, ok := txs[hash]; ok { + return tx } } - return nil } +// GetTransactions returns all currently processable transactions. func (self *TxPool) GetTransactions() (txs types.Transactions) { - self.mu.RLock() - defer self.mu.RUnlock() + self.mu.Lock() + defer self.mu.Unlock() - txs = make(types.Transactions, self.Size()) + // check queue first + self.checkQueue() + // invalidate any txs + self.validatePool() + + txs = make(types.Transactions, len(self.pending)) i := 0 - for _, tx := range self.txs { + for _, tx := range self.pending { txs[i] = tx i++ } - - return + return txs } +// GetQueuedTransactions returns all non-processable transactions. func (self *TxPool) GetQueuedTransactions() types.Transactions { self.mu.RLock() defer self.mu.RUnlock() - var txs types.Transactions - for _, ts := range self.queue { - txs = append(txs, ts...) + var ret types.Transactions + for _, txs := range self.queue { + for _, tx := range txs { + ret = append(ret, tx) + } } - - return txs + sort.Sort(types.TxByNonce{ret}) + return ret } +// RemoveTransactions removes all given transactions from the pool. func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - for _, tx := range txs { self.removeTx(tx.Hash()) } } -func (pool *TxPool) Flush() { - pool.txs = make(map[common.Hash]*types.Transaction) -} - -func (pool *TxPool) Stop() { - pool.Flush() - close(pool.quit) - - glog.V(logger.Info).Infoln("TX Pool stopped") -} - -func (self *TxPool) queueTx(tx *types.Transaction) { +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { from, _ := tx.From() // already validated - self.queue[from] = append(self.queue[from], tx) + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx } -func (pool *TxPool) addTx(tx *types.Transaction) { - if _, ok := pool.txs[tx.Hash()]; !ok { - pool.txs[tx.Hash()] = tx +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { + if _, ok := pool.pending[hash]; !ok { + pool.pending[hash] = tx + + pool.state.SetNonce(addr, tx.AccountNonce) // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -285,42 +309,36 @@ func (pool *TxPool) addTx(tx *types.Transaction) { } } -// check queue will attempt to insert +// checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { - pool.mu.Lock() - defer pool.mu.Unlock() + state := pool.state - statedb := pool.currentState() + var addq txQueue for address, txs := range pool.queue { - sort.Sort(types.TxByNonce{txs}) - - var ( - nonce = statedb.GetNonce(address) - start int - ) - // Clean up the transactions first and determine the start of the nonces - for _, tx := range txs { - if tx.Nonce() >= nonce { + curnonce := state.GetNonce(address) + addq := addq[:0] + for hash, tx := range txs { + if tx.AccountNonce < curnonce { + // Drop queued transactions whose nonce is lower than + // the account nonce because they have been processed. + delete(txs, hash) + } else { + // Collect the remaining transactions for the next pass. + addq = append(addq, txQueueEntry{hash, address, tx}) + } + } + // Find the next consecutive nonce range starting at the + // current account nonce. + sort.Sort(addq) + for _, e := range addq { + if e.AccountNonce > curnonce+1 { break } - start++ + delete(txs, e.hash) + pool.addTx(e.hash, address, e.Transaction) } - pool.queue[address] = txs[start:] - - // expected nonce - enonce := nonce - for _, tx := range pool.queue[address] { - // If the expected nonce does not match up with the next one - // (i.e. a nonce gap), we stop the loop - if enonce != tx.Nonce() { - break - } - enonce++ - - pool.addTx(tx) - } - // delete the entire queue entry if it's empty. There's no need to keep it - if len(pool.queue[address]) == 0 { + // Delete the entire queue entry if it became empty. + if len(txs) == 0 { delete(pool.queue, address) } } @@ -328,36 +346,41 @@ func (pool *TxPool) checkQueue() { func (pool *TxPool) removeTx(hash common.Hash) { // delete from pending pool - delete(pool.txs, hash) - + delete(pool.pending, hash) // delete from queue -out: for address, txs := range pool.queue { - for i, tx := range txs { - if tx.Hash() == hash { - if len(txs) == 1 { - // if only one tx, remove entire address entry - delete(pool.queue, address) - } else { - pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) - } - break out + if _, ok := txs[hash]; ok { + if len(txs) == 1 { + // if only one tx, remove entire address entry. + delete(pool.queue, address) + } else { + delete(txs, hash) } + break } } } +// validatePool removes invalid and processed transactions from the main pool. func (pool *TxPool) validatePool() { - pool.mu.Lock() - defer pool.mu.Unlock() - - for hash, tx := range pool.txs { - if err := pool.ValidateTransaction(tx); err != nil { - if glog.V(logger.Info) { + for hash, tx := range pool.pending { + if err := pool.validateTx(tx); err != nil { + if glog.V(logger.Core) { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - - pool.removeTx(hash) + delete(pool.pending, hash) } } } + +type txQueue []txQueueEntry + +type txQueueEntry struct { + hash common.Hash + addr common.Address + *types.Transaction +} + +func (q txQueue) Len() int { return len(q) } +func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } +func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index d6ea4a2a9..ac297d266 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) { } from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err = pool.Add(tx) if err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) - pool.currentState().AddBalance(from, balance) + pool.state.AddBalance(from, balance) err = pool.Add(tx) if err != ErrIntrinsicGas { t.Error("expected", ErrIntrinsicGas, "got", err) } - pool.currentState().SetNonce(from, 1) - pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff)) + pool.state.SetNonce(from, 1) + pool.state.AddBalance(from, big.NewInt(0xffffffffffffff)) tx.GasLimit = big.NewInt(100000) tx.Price = big.NewInt(1) tx.SignECDSA(key) @@ -67,26 +67,26 @@ func TestTransactionQueue(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) - pool.queueTx(tx) + pool.state.AddBalance(from, big.NewInt(1)) + pool.queueTx(tx.Hash(), tx) pool.checkQueue() - if len(pool.txs) != 1 { - t.Error("expected valid txs to be 1 is", len(pool.txs)) + if len(pool.pending) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.pending)) } tx = transaction() + tx.SetNonce(1) tx.SignECDSA(key) from, _ = tx.From() - pool.currentState().SetNonce(from, 10) - tx.SetNonce(1) - pool.queueTx(tx) + pool.state.SetNonce(from, 2) + pool.queueTx(tx.Hash(), tx) pool.checkQueue() - if _, ok := pool.txs[tx.Hash()]; ok { + if _, ok := pool.pending[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue[from]) != 0 { + if len(pool.queue[from]) > 0 { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } @@ -97,18 +97,18 @@ func TestTransactionQueue(t *testing.T) { tx1.SignECDSA(key) tx2.SignECDSA(key) tx3.SignECDSA(key) - pool.queueTx(tx1) - pool.queueTx(tx2) - pool.queueTx(tx3) + pool.queueTx(tx1.Hash(), tx1) + pool.queueTx(tx2.Hash(), tx2) + pool.queueTx(tx3.Hash(), tx3) from, _ = tx1.From() + pool.checkQueue() - if len(pool.txs) != 1 { + if len(pool.pending) != 1 { t.Error("expected tx pool to be 1 =") } - - if len(pool.queue[from]) != 3 { - t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + if len(pool.queue[from]) != 2 { + t.Error("expected len(queue) == 2, got", len(pool.queue[from])) } } @@ -117,15 +117,15 @@ func TestRemoveTx(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) - pool.queueTx(tx) - pool.addTx(tx) + pool.state.AddBalance(from, big.NewInt(1)) + pool.queueTx(tx.Hash(), tx) + pool.addTx(tx.Hash(), from, tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) } - if len(pool.txs) != 1 { - t.Error("expected txs to be 1, got", len(pool.txs)) + if len(pool.pending) != 1 { + t.Error("expected txs to be 1, got", len(pool.pending)) } pool.removeTx(tx.Hash()) @@ -134,8 +134,8 @@ func TestRemoveTx(t *testing.T) { t.Error("expected queue to be 0, got", len(pool.queue)) } - if len(pool.txs) > 0 { - t.Error("expected txs to be 0, got", len(pool.txs)) + if len(pool.pending) > 0 { + t.Error("expected txs to be 0, got", len(pool.pending)) } } @@ -146,9 +146,58 @@ func TestNegativeValue(t *testing.T) { tx.Value().Set(big.NewInt(-1)) tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err := pool.Add(tx) if err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) } } + +func TestTransactionChainFork(t *testing.T) { + pool, key := setupTxPool() + addr := crypto.PubkeyToAddress(key.PublicKey) + pool.currentState().AddBalance(addr, big.NewInt(100000000000000)) + tx := transaction() + tx.GasLimit = big.NewInt(100000) + tx.SignECDSA(key) + + err := pool.add(tx) + if err != nil { + t.Error("didn't expect error", err) + } + pool.RemoveTransactions([]*types.Transaction{tx}) + + // reset the pool's internal state + pool.resetState() + err = pool.add(tx) + if err != nil { + t.Error("didn't expect error", err) + } +} + +func TestTransactionDoubleNonce(t *testing.T) { + pool, key := setupTxPool() + addr := crypto.PubkeyToAddress(key.PublicKey) + pool.currentState().AddBalance(addr, big.NewInt(100000000000000)) + tx := transaction() + tx.GasLimit = big.NewInt(100000) + tx.SignECDSA(key) + + err := pool.add(tx) + if err != nil { + t.Error("didn't expect error", err) + } + + tx2 := transaction() + tx2.GasLimit = big.NewInt(1000000) + tx2.SignECDSA(key) + + err = pool.add(tx2) + if err != nil { + t.Error("didn't expect error", err) + } + + if len(pool.pending) != 2 { + t.Error("expected 2 pending txs. Got", len(pool.pending)) + } +} diff --git a/core/types/block.go b/core/types/block.go index c93452fa7..d7963981e 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -1,7 +1,9 @@ package types import ( + "bytes" "encoding/binary" + "encoding/json" "fmt" "io" "math/big" @@ -80,6 +82,28 @@ func (self *Header) RlpData() interface{} { return self.rlpData(true) } +func (h *Header) UnmarshalJSON(data []byte) error { + var ext struct { + ParentHash string + Coinbase string + Difficulty string + GasLimit string + Time uint64 + Extra string + } + dec := json.NewDecoder(bytes.NewReader(data)) + if err := dec.Decode(&ext); err != nil { + return err + } + + h.ParentHash = common.HexToHash(ext.ParentHash) + h.Coinbase = common.HexToAddress(ext.Coinbase) + h.Difficulty = common.String2Big(ext.Difficulty) + h.Time = ext.Time + h.Extra = []byte(ext.Extra) + return nil +} + func rlpHash(x interface{}) (h common.Hash) { hw := sha3.NewKeccak256() rlp.Encode(hw, x) diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index dada424e9..492059c28 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -64,7 +64,7 @@ func decodeTx(data []byte) (*Transaction, error) { return &tx, rlp.Decode(bytes.NewReader(data), &tx) } -func defaultTestKey() (*ecdsa.PrivateKey, []byte) { +func defaultTestKey() (*ecdsa.PrivateKey, common.Address) { key := crypto.ToECDSA(common.Hex2Bytes("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8")) addr := crypto.PubkeyToAddress(key.PublicKey) return key, addr @@ -85,7 +85,7 @@ func TestRecipientEmpty(t *testing.T) { t.FailNow() } - if !bytes.Equal(addr, from.Bytes()) { + if addr != from { t.Error("derived address doesn't match") } } @@ -105,7 +105,7 @@ func TestRecipientNormal(t *testing.T) { t.FailNow() } - if !bytes.Equal(addr, from.Bytes()) { + if addr != from { t.Error("derived address doesn't match") } } diff --git a/core/vm/address.go b/core/vm/contracts.go similarity index 100% rename from core/vm/address.go rename to core/vm/contracts.go diff --git a/core/vm/environment.go b/core/vm/environment.go index cc9570fc8..282d19578 100644 --- a/core/vm/environment.go +++ b/core/vm/environment.go @@ -2,13 +2,10 @@ package vm import ( "errors" - "fmt" - "io" "math/big" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/rlp" ) type Environment interface { @@ -52,40 +49,3 @@ func Transfer(from, to Account, amount *big.Int) error { return nil } - -type Log struct { - address common.Address - topics []common.Hash - data []byte - log uint64 -} - -func (self *Log) Address() common.Address { - return self.address -} - -func (self *Log) Topics() []common.Hash { - return self.topics -} - -func (self *Log) Data() []byte { - return self.data -} - -func (self *Log) Number() uint64 { - return self.log -} - -func (self *Log) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{self.address, self.topics, self.data}) -} - -/* -func (self *Log) RlpData() interface{} { - return []interface{}{self.address, common.ByteSliceToInterface(self.topics), self.data} -} -*/ - -func (self *Log) String() string { - return fmt.Sprintf("{%x %x %x}", self.address, self.data, self.topics) -} diff --git a/core/vm/main_test.go b/core/vm/main_test.go deleted file mode 100644 index 0ae03bf6a..000000000 --- a/core/vm/main_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package vm - -import ( - "testing" - - checker "gopkg.in/check.v1" -) - -func Test(t *testing.T) { checker.TestingT(t) } diff --git a/core/vm/types.go b/core/vm/opcodes.go similarity index 100% rename from core/vm/types.go rename to core/vm/opcodes.go diff --git a/core/vm/vm_test.go b/core/vm/vm_test.go deleted file mode 100644 index 9bd147a72..000000000 --- a/core/vm/vm_test.go +++ /dev/null @@ -1,3 +0,0 @@ -package vm - -// Tests have been removed in favour of general tests. If anything implementation specific needs testing, put it here diff --git a/crypto/crypto.go b/crypto/crypto.go index 9aef44863..8f5597b09 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -201,7 +201,7 @@ func ImportBlockTestKey(privKeyBytes []byte) error { ecKey := ToECDSA(privKeyBytes) key := &Key{ Id: uuid.NewRandom(), - Address: common.BytesToAddress(PubkeyToAddress(ecKey.PublicKey)), + Address: PubkeyToAddress(ecKey.PublicKey), PrivateKey: ecKey, } err := ks.StoreKey(key, "") @@ -247,7 +247,7 @@ func decryptPreSaleKey(fileContent []byte, password string) (key *Key, err error ecKey := ToECDSA(ethPriv) key = &Key{ Id: nil, - Address: common.BytesToAddress(PubkeyToAddress(ecKey.PublicKey)), + Address: PubkeyToAddress(ecKey.PublicKey), PrivateKey: ecKey, } derivedAddr := hex.EncodeToString(key.Address.Bytes()) // needed because .Hex() gives leading "0x" @@ -305,7 +305,7 @@ func PKCS7Unpad(in []byte) []byte { return in[:len(in)-int(padding)] } -func PubkeyToAddress(p ecdsa.PublicKey) []byte { +func PubkeyToAddress(p ecdsa.PublicKey) common.Address { pubBytes := FromECDSAPub(&p) - return Sha3(pubBytes[1:])[12:] + return common.BytesToAddress(Sha3(pubBytes[1:])[12:]) } diff --git a/crypto/key.go b/crypto/key.go index 0c5ce4254..0b76c43ff 100644 --- a/crypto/key.go +++ b/crypto/key.go @@ -124,7 +124,7 @@ func NewKeyFromECDSA(privateKeyECDSA *ecdsa.PrivateKey) *Key { id := uuid.NewRandom() key := &Key{ Id: id, - Address: common.BytesToAddress(PubkeyToAddress(privateKeyECDSA.PublicKey)), + Address: PubkeyToAddress(privateKeyECDSA.PublicKey), PrivateKey: privateKeyECDSA, } return key diff --git a/eth/backend.go b/eth/backend.go index 98939b1fa..3956dfcaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -198,7 +198,6 @@ type Ethereum struct { net *p2p.Server eventMux *event.TypeMux - txSub event.Subscription miner *miner.Miner // logger logger.LogSystem @@ -288,7 +287,7 @@ func New(config *Config) (*Ethereum, error) { eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux()) eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) - eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) + eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) @@ -470,10 +469,6 @@ func (s *Ethereum) Start() error { s.whisper.Start() } - // broadcast transactions - s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) - go s.txBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } @@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.net.Stop() s.protocolManager.Stop() s.chainManager.Stop() @@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() { <-s.shutdownChan } -func (self *Ethereum) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.(core.TxPreEvent) - self.syncAccounts(event.Tx) - } -} - -// keep accounts synced up -func (self *Ethereum) syncAccounts(tx *types.Transaction) { - from, err := tx.From() - if err != nil { - return - } - - if self.accountManager.HasAccount(from) { - if self.chainManager.TxState().GetNonce(from) < tx.Nonce() { - self.chainManager.TxState().SetNonce(from, tx.Nonce()) - } - } -} - // StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval // by default that is 10 times per epoch // in epoch n, if we past autoDAGepochHeight within-epoch blocks, diff --git a/miner/worker.go b/miner/worker.go index 58efd61db..1580d4d42 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -494,10 +494,6 @@ func (self *worker) commitTransactions(transactions types.Transactions) { err := self.commitTransaction(tx) switch { case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) current.remove.Add(tx.Hash()) if glog.V(logger.Detail) { diff --git a/xeth/xeth.go b/xeth/xeth.go index 157fe76c7..d0d51bfe0 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -936,24 +936,23 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS tx = types.NewTransactionMessage(to, value, gas, price, data) } - state := self.backend.ChainManager().TxState() + state := self.backend.TxPool().State() var nonce uint64 if len(nonceStr) != 0 { nonce = common.Big(nonceStr).Uint64() } else { - nonce = state.NewNonce(from) + nonce = state.GetNonce(from) } tx.SetNonce(nonce) if err := self.sign(tx, from, false); err != nil { - state.RemoveNonce(from, tx.Nonce()) return "", err } if err := self.backend.TxPool().Add(tx); err != nil { - state.RemoveNonce(from, tx.Nonce()) return "", err } + state.SetNonce(from, nonce+1) if contractCreation { addr := core.AddressFromMessage(tx)