consensus refactor: reconstruct LastCommits upon restart

This commit is contained in:
Jae Kwon 2015-06-04 13:36:47 -07:00
parent 6a0223641f
commit 5790ea9f43
4 changed files with 95 additions and 65 deletions

View File

@ -524,28 +524,15 @@ OUTER_LOOP:
// Catchup logic
if prs.Height != 0 && !prs.HasAllCatchupCommits {
// If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
// If there are lastcommits to send...
if trySendVote(prs.Height, rs.LastCommits, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
}
// Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
// Load the blockMeta for block at prs.Height
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
// Load the seen validation for prs.Height
validation := conR.blockStore.LoadSeenValidation(prs.Height)
log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
// If peer is lagging by height 1
if rs.Height == prs.Height+1 {
if rs.LastCommits.Size() > 0 {
// Sync peer to rs.LastCommits
if trySendVote(prs.Height, rs.LastCommits, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
}
}

View File

@ -268,9 +268,38 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto
newStepCh: make(chan *RoundState, 1),
}
cs.updateToState(state, true)
cs.reconstructLastCommits(state)
return cs
}
// Reconstruct LastCommits from SeenValidation, which we saved along with the block,
// (which happens even before saving the state)
func (cs *ConsensusState) reconstructLastCommits(state *sm.State) {
if state.LastBlockHeight == 0 {
return
}
lastCommits := NewVoteSet(state.LastBlockHeight, 0, types.VoteTypeCommit, state.LastBondedValidators)
seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight)
for idx, commit := range seenValidation.Commits {
commitVote := &types.Vote{
Height: state.LastBlockHeight,
Round: commit.Round,
Type: types.VoteTypeCommit,
BlockHash: state.LastBlockHash,
BlockParts: state.LastBlockParts,
Signature: commit.Signature,
}
added, _, err := lastCommits.AddByIndex(uint(idx), commitVote)
if !added || err != nil {
panic(Fmt("Failed to reconstruct LastCommits: %v", err))
}
}
if !lastCommits.HasTwoThirdsMajority() {
panic("Failed to reconstruct LastCommits: Does not have +2/3 maj")
}
cs.LastCommits = lastCommits
}
func (cs *ConsensusState) GetState() *sm.State {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -645,13 +674,9 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) {
// Make the validation from LastCommits
validation = cs.LastCommits.MakeValidation()
} else {
// Upon reboot, we may have to use SeenValidation
validation = cs.blockStore.LoadSeenValidation(height - 1)
if validation == nil {
// We just don't have any validation for the previous block
log.Debug("Cannot propose anything: No validation for the previous block.")
return
}
// We just don't have any validation for the previous block
log.Debug("Cannot propose anything: No validation for the previous block.")
return
}
txs := cs.mempoolReactor.Mempool.GetProposalTxs()
block = &types.Block{
@ -1024,14 +1049,14 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool,
switch vote.Type {
case types.VoteTypePrevote:
// Prevotes checks for height+round match.
added, index, err = cs.Prevotes.Add(address, vote)
added, index, err = cs.Prevotes.AddByAddress(address, vote)
if added {
log.Debug(Fmt("Added prevote: %v", cs.Prevotes.StringShort()))
}
return
case types.VoteTypePrecommit:
// Precommits checks for height+round match.
added, index, err = cs.Precommits.Add(address, vote)
added, index, err = cs.Precommits.AddByAddress(address, vote)
if added {
log.Debug(Fmt("Added precommit: %v", cs.Precommits.StringShort()))
}
@ -1040,9 +1065,9 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool,
if vote.Height == cs.Height {
// No need to check if vote.Round < cs.Round ...
// Prevotes && Precommits already checks that.
cs.Prevotes.Add(address, vote)
cs.Precommits.Add(address, vote)
added, index, err = cs.Commits.Add(address, vote)
cs.Prevotes.AddByAddress(address, vote)
cs.Precommits.AddByAddress(address, vote)
added, index, err = cs.Commits.AddByAddress(address, vote)
if added && cs.Commits.HasTwoThirdsMajority() && cs.CommitTime.IsZero() {
cs.CommitTime = time.Now()
log.Debug(Fmt("Set CommitTime to %v", cs.CommitTime))
@ -1061,7 +1086,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool,
return
}
if vote.Height+1 == cs.Height {
added, index, err = cs.LastCommits.Add(address, vote)
added, index, err = cs.LastCommits.AddByAddress(address, vote)
log.Debug(Fmt("Added lastCommits: %v", cs.LastCommits.StringShort()))
return
}

View File

@ -70,14 +70,46 @@ func (voteSet *VoteSet) Size() uint {
}
}
// True if added, false if not.
// Returns ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature]
// Returns added=true, index if vote was added
// Otherwise returns err=ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature]
// CONTRACT: if err == nil, added == true
// NOTE: vote should not be mutated after adding.
// Returns the validator index of the vote unless error is set.
func (voteSet *VoteSet) Add(address []byte, vote *types.Vote) (bool, uint, error) {
func (voteSet *VoteSet) AddByIndex(valIndex uint, vote *types.Vote) (added bool, index uint, err error) {
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
return voteSet.addByIndex(valIndex, vote)
}
// Returns added=true, index if vote was added
// Otherwise returns err=ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature]
// CONTRACT: if err == nil, added == true
// NOTE: vote should not be mutated after adding.
func (voteSet *VoteSet) AddByAddress(address []byte, vote *types.Vote) (added bool, index uint, err error) {
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
// Ensure that signer is a validator.
valIndex, val := voteSet.valSet.GetByAddress(address)
if val == nil {
return false, 0, types.ErrVoteInvalidAccount
}
return voteSet.addVote(val, valIndex, vote)
}
func (voteSet *VoteSet) addByIndex(valIndex uint, vote *types.Vote) (bool, uint, error) {
// Ensure that signer is a validator.
_, val := voteSet.valSet.GetByIndex(valIndex)
if val == nil {
return false, 0, types.ErrVoteInvalidAccount
}
return voteSet.addVote(val, valIndex, vote)
}
func (voteSet *VoteSet) addVote(val *sm.Validator, valIndex uint, vote *types.Vote) (bool, uint, error) {
// Make sure the step matches. (or that vote is commit && round < voteSet.round)
if vote.Height != voteSet.height ||
(vote.Type != types.VoteTypeCommit && vote.Round != voteSet.round) ||
@ -86,22 +118,12 @@ func (voteSet *VoteSet) Add(address []byte, vote *types.Vote) (bool, uint, error
return false, 0, types.ErrVoteUnexpectedStep
}
// Ensure that signer is a validator.
valIndex, val := voteSet.valSet.GetByAddress(address)
if val == nil {
return false, 0, types.ErrVoteInvalidAccount
}
// Check signature.
if !val.PubKey.VerifyBytes(account.SignBytes(config.GetString("chain_id"), vote), vote.Signature) {
// Bad signature.
return false, 0, types.ErrVoteInvalidSignature
}
return voteSet.addVote(valIndex, vote)
}
func (voteSet *VoteSet) addVote(valIndex uint, vote *types.Vote) (bool, uint, error) {
// If vote already exists, return false.
if existingVote := voteSet.votes[valIndex]; existingVote != nil {
if bytes.Equal(existingVote.BlockHash, vote.BlockHash) {
@ -115,10 +137,6 @@ func (voteSet *VoteSet) addVote(valIndex uint, vote *types.Vote) (bool, uint, er
}
// Add vote.
_, val := voteSet.valSet.GetByIndex(valIndex)
if val == nil {
panic(fmt.Sprintf("Missing validator for index %v", valIndex))
}
voteSet.votes[valIndex] = vote
voteSet.votesBitArray.SetIndex(valIndex, true)
blockKey := string(vote.BlockHash) + string(binary.BinaryBytes(vote.BlockParts))
@ -144,7 +162,7 @@ func (voteSet *VoteSet) AddFromCommits(commits *VoteSet) {
continue
}
if commit.Round < voteSet.round {
voteSet.addVote(uint(valIndex), commit)
voteSet.addByIndex(uint(valIndex), commit)
}
}
}

View File

@ -51,7 +51,7 @@ func withBlockParts(vote *types.Vote, blockParts types.PartSetHeader) *types.Vot
func signAddVote(privVal *sm.PrivValidator, vote *types.Vote, voteSet *VoteSet) (bool, error) {
privVal.SignVoteUnsafe(config.GetString("chain_id"), vote)
added, _, err := voteSet.Add(privVal.Address, vote)
added, _, err := voteSet.AddByAddress(privVal.Address, vote)
return added, err
}
@ -197,31 +197,31 @@ func TestBadVotes(t *testing.T) {
vote := &types.Vote{Height: height, Round: round, Type: types.VoteTypePrevote, BlockHash: nil}
added, err := signAddVote(privValidators[0], vote, voteSet)
if !added || err != nil {
t.Errorf("Expected Add() to succeed")
t.Errorf("Expected VoteSet.Add to succeed")
}
// val0 votes again for some block.
added, err = signAddVote(privValidators[0], withBlockHash(vote, RandBytes(32)), voteSet)
if added || err == nil {
t.Errorf("Expected Add() to fail, dupeout.")
t.Errorf("Expected VoteSet.Add to fail, dupeout.")
}
// val1 votes on another height
added, err = signAddVote(privValidators[1], withHeight(vote, height+1), voteSet)
if added {
t.Errorf("Expected Add() to fail, wrong height")
t.Errorf("Expected VoteSet.Add to fail, wrong height")
}
// val2 votes on another round
added, err = signAddVote(privValidators[2], withRound(vote, round+1), voteSet)
if added {
t.Errorf("Expected Add() to fail, wrong round")
t.Errorf("Expected VoteSet.Add to fail, wrong round")
}
// val3 votes of another type.
added, err = signAddVote(privValidators[3], withType(vote, types.VoteTypePrecommit), voteSet)
if added {
t.Errorf("Expected Add() to fail, wrong type")
t.Errorf("Expected VoteSet.Add to fail, wrong type")
}
}
@ -243,35 +243,35 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) {
vote = &types.Vote{Height: height - 1, Round: round, Type: types.VoteTypeCommit, BlockHash: nil}
added, _ := signAddVote(privValidators[6], vote, voteSet)
if added {
t.Errorf("Expected Add() to fail, wrong height.")
t.Errorf("Expected VoteSet.Add to fail, wrong height.")
}
// Attempt to add a commit from val6 at a later round
vote = &types.Vote{Height: height, Round: round + 1, Type: types.VoteTypeCommit, BlockHash: nil}
added, _ = signAddVote(privValidators[6], vote, voteSet)
if added {
t.Errorf("Expected Add() to fail, cannot add future round vote.")
t.Errorf("Expected VoteSet.Add to fail, cannot add future round vote.")
}
// Attempt to add a commit from val6 for currrent height/round.
vote = &types.Vote{Height: height, Round: round, Type: types.VoteTypeCommit, BlockHash: nil}
added, err := signAddVote(privValidators[6], vote, voteSet)
if added || err == nil {
t.Errorf("Expected Add() to fail, only prior round commits can be added.")
t.Errorf("Expected VoteSet.Add to fail, only prior round commits can be added.")
}
// Add commit from val6 at a previous round
vote = &types.Vote{Height: height, Round: round - 1, Type: types.VoteTypeCommit, BlockHash: nil}
added, err = signAddVote(privValidators[6], vote, voteSet)
if !added || err != nil {
t.Errorf("Expected Add() to succeed, commit for prior rounds are relevant.")
t.Errorf("Expected VoteSet.Add to succeed, commit for prior rounds are relevant.")
}
// Also add commit from val7 for previous round.
vote = &types.Vote{Height: height, Round: round - 2, Type: types.VoteTypeCommit, BlockHash: nil}
added, err = signAddVote(privValidators[7], vote, voteSet)
if !added || err != nil {
t.Errorf("Expected Add() to succeed. err: %v", err)
t.Errorf("Expected VoteSet.Add to succeed. err: %v", err)
}
// We should have 2/3 majority