Fix premature abort during blocks processing for raft (#881)

* fix premature abort during blocks processing for raft
This commit is contained in:
Zhou Zhiyao 2019-11-30 03:13:37 +08:00 committed by Samer Falah
parent 1c6fdc0d06
commit b7edc0b6c9
3 changed files with 20 additions and 2 deletions

View File

@ -1149,6 +1149,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
// QUORUM
if bc.chainConfig.IsQuorum && bc.chainConfig.Istanbul == nil && bc.chainConfig.Clique == nil {
// Only returns an error for raft mode
return i, events, coalescedLogs, ErrAbortBlocksProcessing
}
// END QUORUM
break
}
// If the header is a banned one, straight out abort

View File

@ -32,4 +32,7 @@ var (
// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
// next one expected based on the local chain.
ErrNonceTooHigh = errors.New("nonce too high")
// ErrAbortBlocksProcessing is returned if bc.insertChain is interrupted under raft mode
ErrAbortBlocksProcessing = errors.New("abort during blocks processing")
)

View File

@ -782,7 +782,11 @@ func (pm *ProtocolManager) eventLoop() {
headBlockHash := pm.blockchain.CurrentBlock().Hash()
log.Warn("not applying already-applied block", "block hash", block.Hash(), "parent", block.ParentHash(), "head", headBlockHash)
} else {
pm.applyNewChainHead(&block)
if !pm.applyNewChainHead(&block) {
// return false only if insert chain is interrupted
// stop eventloop
return
}
}
case raftpb.EntryConfChange:
@ -902,7 +906,7 @@ func blockExtendsChain(block *types.Block, chain *core.BlockChain) bool {
return block.ParentHash() == chain.CurrentBlock().Hash()
}
func (pm *ProtocolManager) applyNewChainHead(block *types.Block) {
func (pm *ProtocolManager) applyNewChainHead(block *types.Block) bool {
if !blockExtendsChain(block, pm.blockchain) {
headBlock := pm.blockchain.CurrentBlock()
@ -923,11 +927,16 @@ func (pm *ProtocolManager) applyNewChainHead(block *types.Block) {
_, err := pm.blockchain.InsertChain([]*types.Block{block})
if err != nil {
if err == core.ErrAbortBlocksProcessing {
log.Error(fmt.Sprintf("failed to extend chain: %s", err.Error()))
return false
}
panic(fmt.Sprintf("failed to extend chain: %s", err.Error()))
}
log.EmitCheckpoint(log.BlockCreated, "block", fmt.Sprintf("%x", block.Hash()))
}
return true
}
// Sets new appliedIndex in-memory, *and* writes this appliedIndex to LevelDB.