Send SeenValidation to peer & use for proposals when we don't have LastCommits due to reboot

This commit is contained in:
Jae Kwon 2015-01-08 22:07:23 -08:00
parent 51c70dd8ac
commit 4a81b06b6e
5 changed files with 202 additions and 113 deletions

View File

@ -99,12 +99,15 @@ func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta {
return meta
}
// NOTE: the Commit-vote heights are for the block at `height-1`
// Since these are included in the subsequent block, the height
// is off by 1.
func (bs *BlockStore) LoadBlockValidation(height uint) *Validation {
var n int64
var err error
r := bs.GetReader(calcBlockValidationKey(height))
if r == nil {
panic(Fmt("Validation does not exist for height %v", height))
panic(Fmt("BlockValidation does not exist for height %v", height))
}
validation := ReadBinary(&Validation{}, r, &n, &err).(*Validation)
if err != nil {
@ -113,7 +116,29 @@ func (bs *BlockStore) LoadBlockValidation(height uint) *Validation {
return validation
}
func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) {
// NOTE: the Commit-vote heights are for the block at `height`
func (bs *BlockStore) LoadSeenValidation(height uint) *Validation {
var n int64
var err error
r := bs.GetReader(calcSeenValidationKey(height))
if r == nil {
panic(Fmt("SeenValidation does not exist for height %v", height))
}
validation := ReadBinary(&Validation{}, r, &n, &err).(*Validation)
if err != nil {
panic(Fmt("Error reading validation: %v", err))
}
return validation
}
// blockParts: Must be parts of the block
// seenValidation: The +2/3 commits that were seen which finalized the height.
// If all the nodes restart after committing a block,
// we need this to reload the commits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
// Also good to have to debug consensus issues & punish wrong-signers
// whose commits weren't included in the block.
func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet, seenValidation *Validation) {
height := block.Height
if height != bs.height+1 {
panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height))
@ -132,9 +157,13 @@ func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) {
bs.saveBlockPart(height, i, blockParts.GetPart(i))
}
// Save block validation (duplicate and separate)
validationBytes := BinaryBytes(block.Validation)
bs.db.Set(calcBlockValidationKey(height), validationBytes)
// Save block validation (duplicate and separate from the Block)
blockValidationBytes := BinaryBytes(block.Validation)
bs.db.Set(calcBlockValidationKey(height), blockValidationBytes)
// Save seen validation (seen +2/3 commits)
seenValidationBytes := BinaryBytes(seenValidation)
bs.db.Set(calcSeenValidationKey(height), seenValidationBytes)
// Save new BlockStoreStateJSON descriptor
BlockStoreStateJSON{Height: height}.Save(bs.db)
@ -181,6 +210,10 @@ func calcBlockValidationKey(height uint) []byte {
return []byte(fmt.Sprintf("V:%v", height))
}
func calcSeenValidationKey(height uint) []byte {
return []byte(fmt.Sprintf("SV:%v", height))
}
//-----------------------------------------------------------------------------
var blockStoreKey = []byte("blockStore")

View File

@ -44,6 +44,9 @@ func NewNode() *Node {
var privValidator *state_.PrivValidator
if _, err := os.Stat(config.App.GetString("PrivValidatorFile")); err == nil {
privValidator = state_.LoadPrivValidator(config.App.GetString("PrivValidatorFile"))
log.Info("Loaded PrivValidator", "file", config.App.GetString("PrivValidatorFile"), "privValidator", privValidator)
} else {
log.Info("No PrivValidator found", "file", config.App.GetString("PrivValidatorFile"))
}
// Get PEXReactor

View File

@ -7,18 +7,19 @@ import (
"path/filepath"
"strings"
"github.com/tendermint/confer"
flag "github.com/spf13/pflag"
"github.com/tendermint/confer"
)
var rootDir string
var App *confer.Config
var defaultConfig = `
# This is a TOML config file.
// NOTE: If you change this, maybe also change initDefaults()
var defaultConfig = `# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
Network = "tendermint_testnet0"
ListenAddr = "0.0.0.0:0"
ListenAddr = "0.0.0.0:8080"
# First node to connect to. Command-line overridable.
# SeedNode = "a.b.c.d:pppp"
@ -28,9 +29,9 @@ Backend = "leveldb"
# The leveldb data directory.
# Dir = "<YOUR_HOME_DIRECTORY>/.tendermint/data"
[RPC]
[RPC.HTTP]
# For the RPC API HTTP server. Port required.
HTTP.ListenAddr = "0.0.0.0:8080"
ListenAddr = "0.0.0.0:8081"
[Alert]
# TODO: Document options
@ -39,6 +40,21 @@ HTTP.ListenAddr = "0.0.0.0:8080"
# TODO: Document options
`
// NOTE: If you change this, maybe also change defaultConfig
func initDefaults() {
App.SetDefault("Network", "tendermint_testnet0")
App.SetDefault("ListenAddr", "0.0.0.0:8080")
App.SetDefault("DB.Backend", "leveldb")
App.SetDefault("DB.Dir", rootDir+"/data")
App.SetDefault("Log.Level", "debug")
App.SetDefault("Log.Dir", rootDir+"/log")
App.SetDefault("RPC.HTTP.ListenAddr", "0.0.0.0:8081")
App.SetDefault("GenesisFile", rootDir+"/genesis.json")
App.SetDefault("AddrbookFile", rootDir+"/addrbook.json")
App.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json")
}
func init() {
// Get RootDir
@ -73,20 +89,9 @@ func init() {
if err := App.ReadPaths(paths...); err != nil {
log.Warn("Error reading configuration", "paths", paths, "error", err)
}
}
func initDefaults() {
App.SetDefault("Network", "tendermint_testnet0")
App.SetDefault("ListenAddr", "0.0.0.0:0")
App.SetDefault("DB.Backend", "leveldb")
App.SetDefault("DB.Dir", rootDir+"/data")
App.SetDefault("Log.Level", "debug")
App.SetDefault("Log.Dir", rootDir+"/log")
App.SetDefault("RPC.HTTP.ListenAddr", "0.0.0.0:8080")
App.SetDefault("GenesisFile", rootDir+"/genesis.json")
App.SetDefault("AddrbookFile", rootDir+"/addrbook.json")
App.SetDefault("PrivValidatorfile", rootDir+"/priv_valdiator.json")
// Confused?
// App.Debug()
}
func ParseFlags(args []string) {

View File

@ -24,7 +24,7 @@ const (
peerStateKey = "ConsensusReactor.peerState"
peerGossipSleepDuration = 1000 * time.Millisecond // Time to sleep if there's nothing to send.
peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
)
//-----------------------------------------------------------------------------
@ -264,7 +264,7 @@ OUTER_LOOP:
// NOTE: if we or peer is at RoundStepCommit*, the round
// won't necessarily match, but that's OK.
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &PartMessage{
@ -281,7 +281,7 @@ OUTER_LOOP:
// If the peer is on a previous height, help catch up.
if 0 < prs.Height && prs.Height < rs.Height {
log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height)
//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeaeder is correct
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
@ -310,15 +310,15 @@ OUTER_LOOP:
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
continue OUTER_LOOP
} else {
log.Debug("No parts to send in catch-up, sleeping")
//log.Debug("No parts to send in catch-up, sleeping")
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
}
}
// If height and round doesn't match, sleep.
// If height and round don't match, sleep.
if rs.Height != prs.Height || rs.Round != prs.Round {
log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
}
@ -364,6 +364,9 @@ OUTER_LOOP:
prs := ps.GetRoundState()
trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
// Initialize Prevotes/Precommits/Commits if needed
ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
// TODO: give priority to our vote.
if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
vote := voteSet.GetByIndex(index)
@ -376,6 +379,30 @@ OUTER_LOOP:
return false
}
trySendCommitFromValidation := func(blockMeta *BlockMeta, validation *Validation, peerVoteSet BitArray) (sent bool) {
// Initialize Commits if needed
ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
commit := validation.Commits[index]
log.Debug("Picked commit to send", "index", index, "commit", commit)
// Reconstruct vote.
vote := &Vote{
Height: prs.Height,
Round: commit.Round,
Type: VoteTypeCommit,
BlockHash: blockMeta.Hash,
BlockParts: blockMeta.Parts,
Signature: commit.Signature,
}
msg := &VoteMessage{index, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(vote, index)
return true
}
return false
}
// If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
if rs.Height == prs.Height {
@ -388,9 +415,6 @@ OUTER_LOOP:
}
}
// Initialize Prevotes/Precommits/Commits if needed
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
// If there are prevotes to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
if trySendVote(rs.Prevotes, prs.Prevotes) {
@ -411,50 +435,48 @@ OUTER_LOOP:
}
}
// If peer is lagging by height 1, match our LastCommits to peer's Commits.
if rs.Height == prs.Height+1 {
// Catchup logic
if prs.Height != 0 && !prs.HasAllCatchupCommits {
// Initialize Commits if needed
ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommits.Size())
// If there are lastcommits to send...
if trySendVote(rs.LastCommits, prs.Commits) {
continue OUTER_LOOP
// If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
// If there are lastcommits to send...
if trySendVote(rs.LastCommits, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
}
}
// Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
// Load the blockMeta for block at prs.Height
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
// Load the seen validation for prs.Height
validation := conR.blockStore.LoadSeenValidation(prs.Height)
log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
// If peer is lagging by more than 1, load and send Validation and send Commits.
if prs.Height != 0 && !prs.HasAllCatchupCommits && rs.Height >= prs.Height+2 {
// Load the block header and validation for prs.Height+1,
// which contains commit signatures for prs.Height.
header, validation := conR.conS.LoadHeaderValidation(prs.Height + 1)
size := uint(len(validation.Commits))
log.Debug("Loaded HeaderValidation for catch-up", "height", prs.Height+1, "header", header, "validation", validation, "size", size)
// Initialize Commits if needed
ps.EnsureVoteBitArrays(prs.Height, size)
if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
commit := validation.Commits[index]
log.Debug("Picked commit to send", "index", index, "commit", commit)
// Reconstruct vote.
vote := &Vote{
Height: prs.Height,
Round: commit.Round,
Type: VoteTypeCommit,
BlockHash: header.LastBlockHash,
BlockParts: header.LastBlockParts,
Signature: commit.Signature,
if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
}
// If peer is lagging by more than 1, send Validation.
if rs.Height >= prs.Height+2 {
// Load the blockMeta for block at prs.Height
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
// Load the block validation for prs.Height+1,
// which contains commit signatures for prs.Height.
validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
msg := &VoteMessage{index, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(vote, index)
continue OUTER_LOOP
} else {
log.Debug("No commits to send", "ours", validation.BitArray(), "theirs", prs.Commits)
ps.SetHasAllCatchupCommits(prs.Height)
}
}

View File

@ -129,8 +129,8 @@ const (
RoundActionTryFinalize = RoundActionType(0xC2) // Maybe goto RoundStepPropose for next round.
)
func (ra RoundActionType) String() string {
switch ra {
func (rat RoundActionType) String() string {
switch rat {
case RoundActionPropose:
return "RoundActionPropose"
case RoundActionPrevote:
@ -144,7 +144,7 @@ func (ra RoundActionType) String() string {
case RoundActionTryFinalize:
return "RoundActionTryFinalize"
default:
panic(Fmt("Unknown RoundAction %X", ra))
panic(Fmt("Unknown RoundAction %X", rat))
}
}
@ -156,6 +156,10 @@ type RoundAction struct {
Action RoundActionType // Action to perform.
}
func (ra RoundAction) String() string {
return Fmt("RoundAction{H:%v R:%v A:%v}", ra.Height, ra.Round, ra.Action)
}
//-----------------------------------------------------------------------------
// Immutable when returned from ConsensusState.GetRoundState()
@ -237,10 +241,10 @@ type ConsensusState struct {
mtx sync.Mutex
RoundState
state *state.State // State until height-1.
stagedBlock *Block // Cache last staged block.
stagedState *state.State // Cache result of staged block.
lastCommittedHeight uint // Last called saveCommitVoteBlock() on.
state *state.State // State until height-1.
stagedBlock *Block // Cache last staged block.
stagedState *state.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.
}
func NewConsensusState(state *state.State, blockStore *BlockStore, mempoolReactor *mempool.MempoolReactor) *ConsensusState {
@ -358,17 +362,20 @@ ACTION_LOOP:
height, round, action := roundAction.Height, roundAction.Round, roundAction.Action
rs := cs.GetRoundState()
log.Info("Running round action", "height", rs.Height, "round", rs.Round, "step", rs.Step, "action", action, "startTime", rs.StartTime)
// Continue if action is not relevant
if height != rs.Height {
log.Debug("Discarding round action: Height mismatch", "height", rs.Height, "roundAction", roundAction)
continue
}
// If action <= RoundActionPrecommit, the round must match too.
if action <= RoundActionPrecommit && round != rs.Round {
log.Debug("Discarding round action: Round mismatch", "round", rs.Round, "roundAction", roundAction)
continue
}
log.Info("Running round action", "height", rs.Height, "round", rs.Round, "step", rs.Step, "roundAction", roundAction, "startTime", rs.StartTime)
// Run action
switch action {
case RoundActionPropose:
@ -563,9 +570,15 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) {
}()
// Nothing to do if it's not our turn.
if cs.PrivValidator == nil || !bytes.Equal(cs.Validators.Proposer().Address, cs.PrivValidator.Address) {
if cs.PrivValidator == nil {
return
}
if !bytes.Equal(cs.Validators.Proposer().Address, cs.PrivValidator.Address) {
log.Debug("Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator)
return
} else {
log.Debug("Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator)
}
var block *Block
var blockParts *PartSet
@ -585,14 +598,16 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) {
// We're creating a proposal for the first block.
// The validation is empty.
validation = &Validation{}
} else if cs.LastCommits.HasTwoThirdsMajority() {
// Make the validation from LastCommits
validation = cs.LastCommits.MakeValidation()
} else {
// We need to create a proposal.
// If we don't have enough commits from the last height,
// we can't do anything.
if !cs.LastCommits.HasTwoThirdsMajority() {
// Upon reboot, we may have to use SeenValidation
validation = cs.blockStore.LoadSeenValidation(height - 1)
if validation == nil {
// We just don't have any validation for the previous block
log.Debug("Cannot propose anything: No validation for the previous block.")
return
} else {
validation = cs.LastCommits.MakeValidation()
}
}
txs := cs.mempoolReactor.Mempool.GetProposalTxs()
@ -788,8 +803,8 @@ func (cs *ConsensusState) RunActionCommit(height uint) {
// We just need to keep waiting.
}
} else {
// We have the block, so save/stage/sign-commit-vote.
cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts)
// We have the block, so sign a Commit-vote.
cs.commitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts)
}
// If we have the block AND +2/3 commits, queue RoundActionTryFinalize.
@ -830,7 +845,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool {
if err == nil {
log.Debug(Fmt("Finalizing commit of block: %v", cs.ProposalBlock))
// We have the block, so save/stage/sign-commit-vote.
cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts)
cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits)
// Increment height.
cs.updateToState(cs.stagedState)
// cs.Step is now RoundStepNewHeight or RoundStepNewRound
@ -945,16 +960,6 @@ func (cs *ConsensusState) AddVote(address []byte, vote *Vote) (added bool, index
return cs.addVote(address, vote)
}
// TODO: Maybe move this out of here?
func (cs *ConsensusState) LoadHeaderValidation(height uint) (*Header, *Validation) {
meta := cs.blockStore.LoadBlockMeta(height)
if meta == nil {
return nil, nil
}
validation := cs.blockStore.LoadBlockValidation(height)
return meta.Header, validation
}
//-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(address []byte, vote *Vote) (added bool, index uint, err error) {
@ -1039,32 +1044,53 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header PartSetHea
}
}
func (cs *ConsensusState) saveCommitVoteBlock(block *Block, blockParts *PartSet) {
// Only run once per height.
if cs.lastCommittedHeight >= block.Height {
return
} else {
cs.lastCommittedHeight = block.Height
}
// sign a Commit-Vote
func (cs *ConsensusState) commitVoteBlock(block *Block, blockParts *PartSet) {
// The proposal must be valid.
if err := cs.stageBlock(block, blockParts); err != nil {
// Prevent zombies.
log.Warn("+2/3 precommitted an invalid block", "error", err)
log.Warn("commitVoteBlock() an invalid block", "error", err)
return
}
// Save to blockStore
cs.blockStore.SaveBlock(block, blockParts)
// Commit-vote.
if cs.lastCommitVoteHeight < block.Height {
cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header())
cs.lastCommitVoteHeight = block.Height
} else {
log.Error("Duplicate commitVoteBlock() attempt", "lastCommitVoteHeight", cs.lastCommitVoteHeight, "block.Height", block.Height)
}
}
// Save the state
// Save Block, save the +2/3 Commits we've seen,
// and sign a Commit-Vote if we haven't already
func (cs *ConsensusState) saveCommitVoteBlock(block *Block, blockParts *PartSet, commits *VoteSet) {
// The proposal must be valid.
if err := cs.stageBlock(block, blockParts); err != nil {
// Prevent zombies.
log.Warn("saveCommitVoteBlock() an invalid block", "error", err)
return
}
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
seenValidation := commits.MakeValidation()
cs.blockStore.SaveBlock(block, blockParts, seenValidation)
}
// Save the state.
cs.stagedState.Save()
// Update mempool.
cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState)
cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header())
// Commit-vote if we haven't already.
if cs.lastCommitVoteHeight < block.Height {
cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header())
cs.lastCommitVoteHeight = block.Height
}
}
//-----------------------------------------------------------------------------