Allow raft to recover state after non-graceful shutdown in non-archive mode (#860)

raft: re-enable gcmode full for raft and ensure the node is able to sync up after a non-graceful shutdown
This commit is contained in:
Sai V 2019-12-02 22:46:41 +08:00 committed by Samer Falah
parent ef99f6d82c
commit 882a303255
3 changed files with 35 additions and 16 deletions

View File

@ -1225,11 +1225,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
}
cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive"
//Quorum - set gcmode=archive for Raft
if ctx.GlobalBool(RaftModeFlag.Name) {
log.Info("set gcmode=archive for Raft")
cfg.NoPruning = true
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
cfg.TrieCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
@ -1499,15 +1494,8 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
trieWriteCacheDisabled := ctx.GlobalString(GCModeFlag.Name) == "archive"
//Quorum - set gcmode=archive for Raft
if !trieWriteCacheDisabled && ctx.GlobalBool(RaftModeFlag.Name) {
log.Info("set gcmode=archive for Raft")
trieWriteCacheDisabled = true
}
cache := &core.CacheConfig{
Disabled: trieWriteCacheDisabled,
Disabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
TrieNodeLimit: eth.DefaultConfig.TrieCache,
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
}

View File

@ -431,9 +431,40 @@ func (pm *ProtocolManager) startRaft() {
maybeRaftSnapshot = pm.loadSnapshot() // re-establishes peer connections
}
pm.wal = pm.replayWAL(maybeRaftSnapshot)
loadedWal, entries := pm.replayWAL(maybeRaftSnapshot)
pm.wal = loadedWal
if walExisted {
// If we shut down but didn't manage to flush the state to disk, then it will be the case that we will only sync
// up to the snapshot. In this case, we can replay the raft entries that we have saved in the WAL to insert the blocks
// back into our chain. We output errors but cannot do much if one occurs, since we can't fork to a different
// chain and all other nodes in the network have confirmed these blocks.
if maybeRaftSnapshot != nil {
currentChainHead := pm.blockchain.CurrentBlock().Number()
for _, entry := range entries {
if entry.Type == raftpb.EntryNormal {
var block types.Block
if err := rlp.DecodeBytes(entry.Data, &block); err != nil {
log.Error("error decoding block: ", "err", err)
continue
}
if thisBlockHead := pm.blockchain.GetBlockByHash(block.Hash()); thisBlockHead != nil {
// check that the block already exists in the local chain
// and that its number is greater than the current chain head
if thisBlockHeadNum := thisBlockHead.Number(); thisBlockHeadNum.Cmp(currentChainHead) > 0 {
// insert the block only if it's already been seen
blocks := []*types.Block{&block}
if _, err := pm.blockchain.InsertChain(blocks); err != nil {
log.Error("error inserting the block into the chain", "number", block.NumberU64(), "hash", block.Hash(), "err", err)
}
}
}
}
}
}
if hardState, _, err := pm.raftStorage.InitialState(); err != nil {
panic(fmt.Sprintf("failed to read initial state from raft while restarting: %v", err))
} else {

View File

@ -38,7 +38,7 @@ func (pm *ProtocolManager) openWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL
return wal
}
func (pm *ProtocolManager) replayWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
func (pm *ProtocolManager) replayWAL(maybeRaftSnapshot *raftpb.Snapshot) (*wal.WAL, []raftpb.Entry) {
log.Info("replaying WAL")
wal := pm.openWAL(maybeRaftSnapshot)
@ -50,5 +50,5 @@ func (pm *ProtocolManager) replayWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WA
pm.raftStorage.SetHardState(hardState)
pm.raftStorage.Append(entries)
return wal
return wal, entries
}