package raft import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" lane "gopkg.in/oleiade/lane.v1" ) // The speculative chain represents blocks that we have minted which haven't been accepted into the chain yet, building // on each other in a chain. It has three basic operations: // * add new block to end // * accept / remove oldest block // * unwind / remove invalid blocks to the end // // Additionally: // * clear state when we stop minting // * set the parent when we're not minting (so it's always current) type speculativeChain struct { head *types.Block unappliedBlocks *lane.Deque expectedInvalidBlockHashes *set.Set // This is thread-safe. This set is referred to as our "guard" below. proposedTxes *set.Set // This is thread-safe. } func newSpeculativeChain() *speculativeChain { return &speculativeChain{ head: nil, unappliedBlocks: lane.NewDeque(), expectedInvalidBlockHashes: set.New(), proposedTxes: set.New(), } } func (chain *speculativeChain) clear(block *types.Block) { chain.head = block chain.unappliedBlocks = lane.NewDeque() chain.expectedInvalidBlockHashes.Clear() chain.proposedTxes.Clear() } // Append a new speculative block func (chain *speculativeChain) extend(block *types.Block) { chain.head = block chain.recordProposedTransactions(block.Transactions()) chain.unappliedBlocks.Append(block) } // Set the parent of the speculative chain // // Note: This is only called when not minter func (chain *speculativeChain) setHead(block *types.Block) { chain.head = block } // Accept this block, removing it from the head of the speculative chain func (chain *speculativeChain) accept(acceptedBlock *types.Block) { earliestProposedI := chain.unappliedBlocks.Shift() var earliestProposed *types.Block if nil != earliestProposedI { earliestProposed = earliestProposedI.(*types.Block) } if expectedBlock := earliestProposed == nil || earliestProposed.Hash() == acceptedBlock.Hash(); expectedBlock { // Remove the txes in this accepted block from our blacklist. chain.removeProposedTxes(acceptedBlock) } else { glog.V(logger.Warn).Infof("Another node minted %x; Clearing speculative state\n", acceptedBlock.Hash()) chain.clear(acceptedBlock) } } // Remove all blocks in the chain from the specified one until the end func (chain *speculativeChain) unwindFrom(invalidHash common.Hash, headBlock *types.Block) { // check our "guard" to see if this is a (descendant) block we // expected to be ruled invalid. if we find it, remove from the guard if chain.expectedInvalidBlockHashes.Has(invalidHash) { glog.V(logger.Warn).Infof("Removing expected-invalid block %x from guard.\n", invalidHash) chain.expectedInvalidBlockHashes.Remove(invalidHash) return } // pop from the RHS repeatedly, updating minter.parent each time. if not // our block, add to guard. in all cases, call removeProposedTxes for { currBlockI := chain.unappliedBlocks.Pop() if nil == currBlockI { glog.V(logger.Warn).Infof("(Popped all blocks from queue.)\n") break } currBlock := currBlockI.(*types.Block) glog.V(logger.Info).Infof("Popped block %x from queue RHS.\n", currBlock.Hash()) // Maintain invariant: the parent always points to the last speculative block or the head of the blockchain // if there are not speculative blocks. if speculativeParentI := chain.unappliedBlocks.Last(); nil != speculativeParentI { chain.head = speculativeParentI.(*types.Block) } else { chain.head = headBlock } chain.removeProposedTxes(currBlock) if currBlock.Hash() != invalidHash { glog.V(logger.Warn).Infof("Haven't yet found block %x; adding descendent %x to guard.\n", invalidHash, currBlock.Hash()) chain.expectedInvalidBlockHashes.Add(currBlock.Hash()) } else { break } } } // We keep track of txes we've put in all newly-mined blocks since the last // ChainHeadEvent, and filter them out so that we don't try to create blocks // with the same transactions. This is necessary because the TX pool will keep // supplying us these transactions until they are in the chain (after having // flown through raft). func (chain *speculativeChain) recordProposedTransactions(txes types.Transactions) { txHashIs := make([]interface{}, len(txes)) for i, tx := range txes { txHashIs[i] = tx.Hash() } chain.proposedTxes.Add(txHashIs...) } // Removes txes in block from our "blacklist" of "proposed tx" hashes. When we // create a new block and use txes from the tx pool, we ignore those that we // have already used ("proposed"), but that haven't yet officially made it into // the chain. // // It's important to remove hashes from this blacklist (once we know we don't // need them in there anymore) so that it doesn't grow endlessly. func (chain *speculativeChain) removeProposedTxes(block *types.Block) { minedTxes := block.Transactions() minedTxInterfaces := make([]interface{}, len(minedTxes)) for i, tx := range minedTxes { minedTxInterfaces[i] = tx.Hash() } // NOTE: we are using a thread-safe Set here, so it's fine if we access this // here and in mintNewBlock concurrently. using a finer-grained set-specific // lock here is preferable, because mintNewBlock holds its locks for a // nontrivial amount of time. chain.proposedTxes.Remove(minedTxInterfaces...) } func (chain *speculativeChain) withoutProposedTxes(addrTxes AddressTxes) AddressTxes { newMap := make(AddressTxes) for addr, txes := range addrTxes { filteredTxes := make(types.Transactions, 0) for _, tx := range txes { if !chain.proposedTxes.Has(tx.Hash()) { filteredTxes = append(filteredTxes, tx) } } if len(filteredTxes) > 0 { newMap[addr] = filteredTxes } } return newMap }