diff --git a/block/store.go b/block/store.go index a453895c..7c7df09b 100644 --- a/block/store.go +++ b/block/store.go @@ -99,12 +99,15 @@ func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta { return meta } +// NOTE: the Commit-vote heights are for the block at `height-1` +// Since these are included in the subsequent block, the height +// is off by 1. func (bs *BlockStore) LoadBlockValidation(height uint) *Validation { var n int64 var err error r := bs.GetReader(calcBlockValidationKey(height)) if r == nil { - panic(Fmt("Validation does not exist for height %v", height)) + panic(Fmt("BlockValidation does not exist for height %v", height)) } validation := ReadBinary(&Validation{}, r, &n, &err).(*Validation) if err != nil { @@ -113,7 +116,29 @@ func (bs *BlockStore) LoadBlockValidation(height uint) *Validation { return validation } -func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { +// NOTE: the Commit-vote heights are for the block at `height` +func (bs *BlockStore) LoadSeenValidation(height uint) *Validation { + var n int64 + var err error + r := bs.GetReader(calcSeenValidationKey(height)) + if r == nil { + panic(Fmt("SeenValidation does not exist for height %v", height)) + } + validation := ReadBinary(&Validation{}, r, &n, &err).(*Validation) + if err != nil { + panic(Fmt("Error reading validation: %v", err)) + } + return validation +} + +// blockParts: Must be parts of the block +// seenValidation: The +2/3 commits that were seen which finalized the height. +// If all the nodes restart after committing a block, +// we need this to reload the commits to catch-up nodes to the +// most recent height. Otherwise they'd stall at H-1. +// Also good to have to debug consensus issues & punish wrong-signers +// whose commits weren't included in the block. +func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet, seenValidation *Validation) { height := block.Height if height != bs.height+1 { panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)) @@ -132,9 +157,13 @@ func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { bs.saveBlockPart(height, i, blockParts.GetPart(i)) } - // Save block validation (duplicate and separate) - validationBytes := BinaryBytes(block.Validation) - bs.db.Set(calcBlockValidationKey(height), validationBytes) + // Save block validation (duplicate and separate from the Block) + blockValidationBytes := BinaryBytes(block.Validation) + bs.db.Set(calcBlockValidationKey(height), blockValidationBytes) + + // Save seen validation (seen +2/3 commits) + seenValidationBytes := BinaryBytes(seenValidation) + bs.db.Set(calcSeenValidationKey(height), seenValidationBytes) // Save new BlockStoreStateJSON descriptor BlockStoreStateJSON{Height: height}.Save(bs.db) @@ -181,6 +210,10 @@ func calcBlockValidationKey(height uint) []byte { return []byte(fmt.Sprintf("V:%v", height)) } +func calcSeenValidationKey(height uint) []byte { + return []byte(fmt.Sprintf("SV:%v", height)) +} + //----------------------------------------------------------------------------- var blockStoreKey = []byte("blockStore") diff --git a/cmd/daemon.go b/cmd/daemon.go index 10e9b523..fca8b46a 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -44,6 +44,9 @@ func NewNode() *Node { var privValidator *state_.PrivValidator if _, err := os.Stat(config.App.GetString("PrivValidatorFile")); err == nil { privValidator = state_.LoadPrivValidator(config.App.GetString("PrivValidatorFile")) + log.Info("Loaded PrivValidator", "file", config.App.GetString("PrivValidatorFile"), "privValidator", privValidator) + } else { + log.Info("No PrivValidator found", "file", config.App.GetString("PrivValidatorFile")) } // Get PEXReactor diff --git a/config/config.go b/config/config.go index f0dfdc11..d7d7b194 100644 --- a/config/config.go +++ b/config/config.go @@ -7,18 +7,19 @@ import ( "path/filepath" "strings" - "github.com/tendermint/confer" flag "github.com/spf13/pflag" + "github.com/tendermint/confer" ) var rootDir string var App *confer.Config -var defaultConfig = ` -# This is a TOML config file. + +// NOTE: If you change this, maybe also change initDefaults() +var defaultConfig = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml Network = "tendermint_testnet0" -ListenAddr = "0.0.0.0:0" +ListenAddr = "0.0.0.0:8080" # First node to connect to. Command-line overridable. # SeedNode = "a.b.c.d:pppp" @@ -28,9 +29,9 @@ Backend = "leveldb" # The leveldb data directory. # Dir = "/.tendermint/data" -[RPC] +[RPC.HTTP] # For the RPC API HTTP server. Port required. -HTTP.ListenAddr = "0.0.0.0:8080" +ListenAddr = "0.0.0.0:8081" [Alert] # TODO: Document options @@ -39,6 +40,21 @@ HTTP.ListenAddr = "0.0.0.0:8080" # TODO: Document options ` +// NOTE: If you change this, maybe also change defaultConfig +func initDefaults() { + App.SetDefault("Network", "tendermint_testnet0") + App.SetDefault("ListenAddr", "0.0.0.0:8080") + App.SetDefault("DB.Backend", "leveldb") + App.SetDefault("DB.Dir", rootDir+"/data") + App.SetDefault("Log.Level", "debug") + App.SetDefault("Log.Dir", rootDir+"/log") + App.SetDefault("RPC.HTTP.ListenAddr", "0.0.0.0:8081") + + App.SetDefault("GenesisFile", rootDir+"/genesis.json") + App.SetDefault("AddrbookFile", rootDir+"/addrbook.json") + App.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json") +} + func init() { // Get RootDir @@ -73,20 +89,9 @@ func init() { if err := App.ReadPaths(paths...); err != nil { log.Warn("Error reading configuration", "paths", paths, "error", err) } -} -func initDefaults() { - App.SetDefault("Network", "tendermint_testnet0") - App.SetDefault("ListenAddr", "0.0.0.0:0") - App.SetDefault("DB.Backend", "leveldb") - App.SetDefault("DB.Dir", rootDir+"/data") - App.SetDefault("Log.Level", "debug") - App.SetDefault("Log.Dir", rootDir+"/log") - App.SetDefault("RPC.HTTP.ListenAddr", "0.0.0.0:8080") - - App.SetDefault("GenesisFile", rootDir+"/genesis.json") - App.SetDefault("AddrbookFile", rootDir+"/addrbook.json") - App.SetDefault("PrivValidatorfile", rootDir+"/priv_valdiator.json") + // Confused? + // App.Debug() } func ParseFlags(args []string) { diff --git a/consensus/reactor.go b/consensus/reactor.go index bd555f8a..0e29481e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -24,7 +24,7 @@ const ( peerStateKey = "ConsensusReactor.peerState" - peerGossipSleepDuration = 1000 * time.Millisecond // Time to sleep if there's nothing to send. + peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. ) //----------------------------------------------------------------------------- @@ -264,7 +264,7 @@ OUTER_LOOP: // NOTE: if we or peer is at RoundStepCommit*, the round // won't necessarily match, but that's OK. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) { - log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) + //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) msg := &PartMessage{ @@ -281,7 +281,7 @@ OUTER_LOOP: // If the peer is on a previous height, help catch up. if 0 < prs.Height && prs.Height < rs.Height { - log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height) + //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray) if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { // Ensure that the peer's PartSetHeaeder is correct blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) @@ -310,15 +310,15 @@ OUTER_LOOP: ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) continue OUTER_LOOP } else { - log.Debug("No parts to send in catch-up, sleeping") + //log.Debug("No parts to send in catch-up, sleeping") time.Sleep(peerGossipSleepDuration) continue OUTER_LOOP } } - // If height and round doesn't match, sleep. + // If height and round don't match, sleep. if rs.Height != prs.Height || rs.Round != prs.Round { - log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) + //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) time.Sleep(peerGossipSleepDuration) continue OUTER_LOOP } @@ -364,6 +364,9 @@ OUTER_LOOP: prs := ps.GetRoundState() trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) { + // Initialize Prevotes/Precommits/Commits if needed + ps.EnsureVoteBitArrays(prs.Height, voteSet.Size()) + // TODO: give priority to our vote. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok { vote := voteSet.GetByIndex(index) @@ -376,6 +379,30 @@ OUTER_LOOP: return false } + trySendCommitFromValidation := func(blockMeta *BlockMeta, validation *Validation, peerVoteSet BitArray) (sent bool) { + // Initialize Commits if needed + ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits))) + + if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok { + commit := validation.Commits[index] + log.Debug("Picked commit to send", "index", index, "commit", commit) + // Reconstruct vote. + vote := &Vote{ + Height: prs.Height, + Round: commit.Round, + Type: VoteTypeCommit, + BlockHash: blockMeta.Hash, + BlockParts: blockMeta.Parts, + Signature: commit.Signature, + } + msg := &VoteMessage{index, vote} + peer.Send(VoteCh, msg) + ps.SetHasVote(vote, index) + return true + } + return false + } + // If height matches, then send LastCommits, Prevotes, Precommits, or Commits. if rs.Height == prs.Height { @@ -388,9 +415,6 @@ OUTER_LOOP: } } - // Initialize Prevotes/Precommits/Commits if needed - ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) - // If there are prevotes to send... if rs.Round == prs.Round && prs.Step <= RoundStepPrevote { if trySendVote(rs.Prevotes, prs.Prevotes) { @@ -411,50 +435,48 @@ OUTER_LOOP: } } - // If peer is lagging by height 1, match our LastCommits to peer's Commits. - if rs.Height == prs.Height+1 { + // Catchup logic + if prs.Height != 0 && !prs.HasAllCatchupCommits { - // Initialize Commits if needed - ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommits.Size()) - - // If there are lastcommits to send... - if trySendVote(rs.LastCommits, prs.Commits) { - continue OUTER_LOOP + // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits. + if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 { + // If there are lastcommits to send... + if trySendVote(rs.LastCommits, prs.Commits) { + continue OUTER_LOOP + } else { + ps.SetHasAllCatchupCommits(prs.Height) + } } - } + // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation. + if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 { + // Load the blockMeta for block at prs.Height + blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) + // Load the seen validation for prs.Height + validation := conR.blockStore.LoadSeenValidation(prs.Height) + log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation) - // If peer is lagging by more than 1, load and send Validation and send Commits. - if prs.Height != 0 && !prs.HasAllCatchupCommits && rs.Height >= prs.Height+2 { - - // Load the block header and validation for prs.Height+1, - // which contains commit signatures for prs.Height. - header, validation := conR.conS.LoadHeaderValidation(prs.Height + 1) - size := uint(len(validation.Commits)) - log.Debug("Loaded HeaderValidation for catch-up", "height", prs.Height+1, "header", header, "validation", validation, "size", size) - - // Initialize Commits if needed - ps.EnsureVoteBitArrays(prs.Height, size) - - if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok { - commit := validation.Commits[index] - log.Debug("Picked commit to send", "index", index, "commit", commit) - // Reconstruct vote. - vote := &Vote{ - Height: prs.Height, - Round: commit.Round, - Type: VoteTypeCommit, - BlockHash: header.LastBlockHash, - BlockParts: header.LastBlockParts, - Signature: commit.Signature, + if trySendCommitFromValidation(blockMeta, validation, prs.Commits) { + continue OUTER_LOOP + } else { + ps.SetHasAllCatchupCommits(prs.Height) + } + } + + // If peer is lagging by more than 1, send Validation. + if rs.Height >= prs.Height+2 { + // Load the blockMeta for block at prs.Height + blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) + // Load the block validation for prs.Height+1, + // which contains commit signatures for prs.Height. + validation := conR.blockStore.LoadBlockValidation(prs.Height + 1) + log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation) + + if trySendCommitFromValidation(blockMeta, validation, prs.Commits) { + continue OUTER_LOOP + } else { + ps.SetHasAllCatchupCommits(prs.Height) } - msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) - ps.SetHasVote(vote, index) - continue OUTER_LOOP - } else { - log.Debug("No commits to send", "ours", validation.BitArray(), "theirs", prs.Commits) - ps.SetHasAllCatchupCommits(prs.Height) } } diff --git a/consensus/state.go b/consensus/state.go index f0f14bb2..c26eca06 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -129,8 +129,8 @@ const ( RoundActionTryFinalize = RoundActionType(0xC2) // Maybe goto RoundStepPropose for next round. ) -func (ra RoundActionType) String() string { - switch ra { +func (rat RoundActionType) String() string { + switch rat { case RoundActionPropose: return "RoundActionPropose" case RoundActionPrevote: @@ -144,7 +144,7 @@ func (ra RoundActionType) String() string { case RoundActionTryFinalize: return "RoundActionTryFinalize" default: - panic(Fmt("Unknown RoundAction %X", ra)) + panic(Fmt("Unknown RoundAction %X", rat)) } } @@ -156,6 +156,10 @@ type RoundAction struct { Action RoundActionType // Action to perform. } +func (ra RoundAction) String() string { + return Fmt("RoundAction{H:%v R:%v A:%v}", ra.Height, ra.Round, ra.Action) +} + //----------------------------------------------------------------------------- // Immutable when returned from ConsensusState.GetRoundState() @@ -237,10 +241,10 @@ type ConsensusState struct { mtx sync.Mutex RoundState - state *state.State // State until height-1. - stagedBlock *Block // Cache last staged block. - stagedState *state.State // Cache result of staged block. - lastCommittedHeight uint // Last called saveCommitVoteBlock() on. + state *state.State // State until height-1. + stagedBlock *Block // Cache last staged block. + stagedState *state.State // Cache result of staged block. + lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. } func NewConsensusState(state *state.State, blockStore *BlockStore, mempoolReactor *mempool.MempoolReactor) *ConsensusState { @@ -358,17 +362,20 @@ ACTION_LOOP: height, round, action := roundAction.Height, roundAction.Round, roundAction.Action rs := cs.GetRoundState() - log.Info("Running round action", "height", rs.Height, "round", rs.Round, "step", rs.Step, "action", action, "startTime", rs.StartTime) // Continue if action is not relevant if height != rs.Height { + log.Debug("Discarding round action: Height mismatch", "height", rs.Height, "roundAction", roundAction) continue } // If action <= RoundActionPrecommit, the round must match too. if action <= RoundActionPrecommit && round != rs.Round { + log.Debug("Discarding round action: Round mismatch", "round", rs.Round, "roundAction", roundAction) continue } + log.Info("Running round action", "height", rs.Height, "round", rs.Round, "step", rs.Step, "roundAction", roundAction, "startTime", rs.StartTime) + // Run action switch action { case RoundActionPropose: @@ -563,9 +570,15 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { }() // Nothing to do if it's not our turn. - if cs.PrivValidator == nil || !bytes.Equal(cs.Validators.Proposer().Address, cs.PrivValidator.Address) { + if cs.PrivValidator == nil { return } + if !bytes.Equal(cs.Validators.Proposer().Address, cs.PrivValidator.Address) { + log.Debug("Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator) + return + } else { + log.Debug("Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator) + } var block *Block var blockParts *PartSet @@ -585,14 +598,16 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { // We're creating a proposal for the first block. // The validation is empty. validation = &Validation{} + } else if cs.LastCommits.HasTwoThirdsMajority() { + // Make the validation from LastCommits + validation = cs.LastCommits.MakeValidation() } else { - // We need to create a proposal. - // If we don't have enough commits from the last height, - // we can't do anything. - if !cs.LastCommits.HasTwoThirdsMajority() { + // Upon reboot, we may have to use SeenValidation + validation = cs.blockStore.LoadSeenValidation(height - 1) + if validation == nil { + // We just don't have any validation for the previous block + log.Debug("Cannot propose anything: No validation for the previous block.") return - } else { - validation = cs.LastCommits.MakeValidation() } } txs := cs.mempoolReactor.Mempool.GetProposalTxs() @@ -788,8 +803,8 @@ func (cs *ConsensusState) RunActionCommit(height uint) { // We just need to keep waiting. } } else { - // We have the block, so save/stage/sign-commit-vote. - cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts) + // We have the block, so sign a Commit-vote. + cs.commitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts) } // If we have the block AND +2/3 commits, queue RoundActionTryFinalize. @@ -830,7 +845,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool { if err == nil { log.Debug(Fmt("Finalizing commit of block: %v", cs.ProposalBlock)) // We have the block, so save/stage/sign-commit-vote. - cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts) + cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits) // Increment height. cs.updateToState(cs.stagedState) // cs.Step is now RoundStepNewHeight or RoundStepNewRound @@ -945,16 +960,6 @@ func (cs *ConsensusState) AddVote(address []byte, vote *Vote) (added bool, index return cs.addVote(address, vote) } -// TODO: Maybe move this out of here? -func (cs *ConsensusState) LoadHeaderValidation(height uint) (*Header, *Validation) { - meta := cs.blockStore.LoadBlockMeta(height) - if meta == nil { - return nil, nil - } - validation := cs.blockStore.LoadBlockValidation(height) - return meta.Header, validation -} - //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(address []byte, vote *Vote) (added bool, index uint, err error) { @@ -1039,32 +1044,53 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header PartSetHea } } -func (cs *ConsensusState) saveCommitVoteBlock(block *Block, blockParts *PartSet) { - - // Only run once per height. - if cs.lastCommittedHeight >= block.Height { - return - } else { - cs.lastCommittedHeight = block.Height - } +// sign a Commit-Vote +func (cs *ConsensusState) commitVoteBlock(block *Block, blockParts *PartSet) { // The proposal must be valid. if err := cs.stageBlock(block, blockParts); err != nil { // Prevent zombies. - log.Warn("+2/3 precommitted an invalid block", "error", err) + log.Warn("commitVoteBlock() an invalid block", "error", err) return } - // Save to blockStore - cs.blockStore.SaveBlock(block, blockParts) + // Commit-vote. + if cs.lastCommitVoteHeight < block.Height { + cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header()) + cs.lastCommitVoteHeight = block.Height + } else { + log.Error("Duplicate commitVoteBlock() attempt", "lastCommitVoteHeight", cs.lastCommitVoteHeight, "block.Height", block.Height) + } +} - // Save the state +// Save Block, save the +2/3 Commits we've seen, +// and sign a Commit-Vote if we haven't already +func (cs *ConsensusState) saveCommitVoteBlock(block *Block, blockParts *PartSet, commits *VoteSet) { + + // The proposal must be valid. + if err := cs.stageBlock(block, blockParts); err != nil { + // Prevent zombies. + log.Warn("saveCommitVoteBlock() an invalid block", "error", err) + return + } + + // Save to blockStore. + if cs.blockStore.Height() < block.Height { + seenValidation := commits.MakeValidation() + cs.blockStore.SaveBlock(block, blockParts, seenValidation) + } + + // Save the state. cs.stagedState.Save() // Update mempool. cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState) - cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header()) + // Commit-vote if we haven't already. + if cs.lastCommitVoteHeight < block.Height { + cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header()) + cs.lastCommitVoteHeight = block.Height + } } //-----------------------------------------------------------------------------