Fix termination issues and improve tests

This commit is contained in:
Zarko Milosevic 2018-09-26 12:03:01 +02:00
parent cb2e58411f
commit e2cd6b3be9
11 changed files with 513 additions and 304 deletions

View File

@ -40,6 +40,8 @@ IMPROVEMENTS:
BUG FIXES:
- [autofile] \#2428 Group.RotateFile need call Flush() before rename (@goolAdapter)
- [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time
- [consensus] [\#1690](https://github.com/tendermint/tendermint/issues/1690) wait for
timeoutPrecommit before starting next round
- [evidence] \#2515 fix db iter leak (@goolAdapter)
- [common/bit_array] Fixed a bug in the `Or` function
- [common/bit_array] Fixed a bug in the `Sub` function (@bradyjoestar)

View File

@ -475,9 +475,9 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Recheck: true,
Broadcast: true,
WalPath: "",
Recheck: true,
Broadcast: true,
WalPath: "",
// Each signature verification takes .5ms, size reduced until we implement
// ABCI Recheck
Size: 5000,

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path"
"reflect"
"sort"
"sync"
"testing"
@ -306,22 +307,173 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
//-------------------------------------------------------------------------------
func ensureNoNewStep(stepCh <-chan interface{}) {
timer := time.NewTimer(ensureTimeout)
func ensureNoNewEvent(t *testing.T, ch <-chan interface{}, timeout time.Duration,
errorMessage string) {
select {
case <-timer.C:
case <-time.After(timeout):
break
case <-stepCh:
panic("We should be stuck waiting, not moving to the next step")
case <-ch:
panic(errorMessage)
}
}
func ensureNewStep(stepCh <-chan interface{}) {
timer := time.NewTimer(ensureTimeout)
// ensureNoNewEventOnChannel asserts that ch stays silent for ensureTimeout.
// It delegates to ensureNoNewEvent, which panics with the message below if
// anything is received on ch before the timeout elapses.
func ensureNoNewEventOnChannel(t *testing.T, ch <-chan interface{}) {
	const failure = "We should be stuck waiting, not receiving new event on the channel"
	ensureNoNewEvent(t, ch, ensureTimeout, failure)
}
// ensureNoNewRoundStep asserts that nothing arrives on stepCh within
// ensureTimeout. It delegates to ensureNoNewEvent, which panics with the
// message below if an event is received.
func ensureNoNewRoundStep(t *testing.T, stepCh <-chan interface{}) {
	const failure = "We should be stuck waiting, not receiving NewRoundStep event"
	ensureNoNewEvent(t, stepCh, ensureTimeout, failure)
}
// ensureNoNewUnlock asserts that nothing arrives on unlockCh within
// ensureTimeout. It delegates to ensureNoNewEvent, which panics with the
// message below if an event is received.
func ensureNoNewUnlock(t *testing.T, unlockCh <-chan interface{}) {
	const failure = "We should be stuck waiting, not receiving Unlock event"
	ensureNoNewEvent(t, unlockCh, ensureTimeout, failure)
}
// ensureNoNewTimeout asserts that nothing arrives on stepCh while waiting
// for five times the given timeout. The timeout argument is an integer
// count of nanoseconds. Delegates to ensureNoNewEvent, which panics with
// the message below if an event is received during the wait.
func ensureNoNewTimeout(t *testing.T, stepCh <-chan interface{}, timeout int64) {
	// Wait well past the configured timeout (5x) before declaring silence.
	wait := 5 * time.Duration(timeout)
	ensureNoNewEvent(t, stepCh, wait,
		"We should be stuck waiting, not receiving NewTimeout event")
}
func ensureNewEvent(t *testing.T, ch <-chan interface{}, height int64, round int,
timeout time.Duration, errorMessage string) {
select {
case <-timer.C:
panic("We shouldnt be stuck waiting")
case <-stepCh:
case <-time.After(timeout):
panic(errorMessage)
case ev := <-ch:
rs, ok := ev.(types.EventDataRoundState)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataRoundState, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
// TODO: We could check also for a step at this point!
}
}
// ensureNewRoundStep waits up to ensureTimeout for a round-state event on
// stepCh. It delegates to ensureNewEvent, which panics with the message
// below on timeout and also panics if the received event does not match
// the expected height and round.
func ensureNewRoundStep(t *testing.T, stepCh <-chan interface{}, height int64, round int) {
	const onTimeout = "Timeout expired while waiting for NewStep event"
	ensureNewEvent(t, stepCh, height, round, ensureTimeout, onTimeout)
}
// ensureNewVote waits up to ensureTimeout for a vote event on voteCh and
// panics unless a vote for the expected height and round arrives.
//
// A timeout is a failure: previously the timeout branch silently returned,
// so a test calling this helper passed even when no vote was ever
// published. All sibling ensure* helpers panic on timeout; this one now
// does the same.
func ensureNewVote(t *testing.T, voteCh <-chan interface{}, height int64, round int) {
	select {
	case <-time.After(ensureTimeout):
		panic("Timeout expired while waiting for NewVote event")
	case v := <-voteCh:
		edv, ok := v.(types.EventDataVote)
		if !ok {
			// Report the type that was actually asserted, and the dynamic
			// type that was actually received.
			panic(fmt.Sprintf("expected a types.EventDataVote, "+
				"got %v. wrong subscription channel?",
				reflect.TypeOf(v)))
		}
		vote := edv.Vote
		if vote.Height != height {
			panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
		}
		if vote.Round != round {
			panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
		}
	}
}
// ensureNewRound waits up to ensureTimeout for a round-state event on
// roundCh. It delegates to ensureNewEvent, which panics with the message
// below on timeout and also panics if the received event does not match
// the expected height and round.
func ensureNewRound(t *testing.T, roundCh <-chan interface{}, height int64, round int) {
	const onTimeout = "Timeout expired while waiting for NewRound event"
	ensureNewEvent(t, roundCh, height, round, ensureTimeout, onTimeout)
}
// ensureNewTimeout waits for a round-state event on timeoutCh, allowing
// five times the given timeout before giving up. The timeout argument is
// an integer count of nanoseconds. Delegates to ensureNewEvent, which
// panics with the message below on timeout and also panics if the received
// event does not match the expected height and round.
func ensureNewTimeout(t *testing.T, timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
	// Give the timeout event generous slack (5x) to show up.
	wait := 5 * time.Duration(timeout)
	ensureNewEvent(t, timeoutCh, height, round, wait,
		"Timeout expired while waiting for NewTimeout event")
}
// ensureNewProposal waits up to ensureTimeout for a round-state event on
// proposalCh. It delegates to ensureNewEvent, which panics with the
// message below on timeout and also panics if the received event does not
// match the expected height and round.
func ensureNewProposal(t *testing.T, proposalCh <-chan interface{}, height int64, round int) {
	const onTimeout = "Timeout expired while waiting for NewProposal event"
	ensureNewEvent(t, proposalCh, height, round, ensureTimeout, onTimeout)
}
// ensureNewBlock waits up to ensureTimeout for a new-block event on blockCh
// and panics if none arrives, if the event has an unexpected type, or if
// the block's height differs from the expected one.
func ensureNewBlock(t *testing.T, blockCh <-chan interface{}, height int64) {
	select {
	case <-time.After(ensureTimeout):
		panic("Timeout expired while waiting for NewBlock event")
	case ev := <-blockCh:
		block, ok := ev.(types.EventDataNewBlock)
		if !ok {
			// Print the dynamic type of the received event. The failed
			// assertion leaves block as a zero value, whose type is always
			// the expected one and so tells the reader nothing.
			panic(fmt.Sprintf("expected a types.EventDataNewBlock, "+
				"got %v. wrong subscription channel?",
				reflect.TypeOf(ev)))
		}
		if block.Block.Height != height {
			panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height))
		}
	}
}
// ensureNewBlockHeader waits up to ensureTimeout for a new-block-header
// event on blockCh and panics if none arrives, if the event has an
// unexpected type, or if the header's height or hash differ from the
// expected values.
func ensureNewBlockHeader(t *testing.T, blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
	select {
	case <-time.After(ensureTimeout):
		panic("Timeout expired while waiting for NewBlockHeader event")
	case ev := <-blockCh:
		blockHeader, ok := ev.(types.EventDataNewBlockHeader)
		if !ok {
			// Print the dynamic type of the received event. The failed
			// assertion leaves blockHeader as a zero value, whose type is
			// always the expected one and so tells the reader nothing.
			panic(fmt.Sprintf("expected a types.EventDataNewBlockHeader, "+
				"got %v. wrong subscription channel?",
				reflect.TypeOf(ev)))
		}
		if blockHeader.Header.Height != height {
			panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height))
		}
		if !bytes.Equal(blockHeader.Header.Hash(), blockHash) {
			panic(fmt.Sprintf("expected header %v, got %v", blockHash, blockHeader.Header.Hash()))
		}
	}
}
// ensureNewUnlock waits up to ensureTimeout for a round-state event on
// unlockCh. It delegates to ensureNewEvent, which panics with the message
// below on timeout and also panics if the received event does not match
// the expected height and round.
func ensureNewUnlock(t *testing.T, unlockCh <-chan interface{}, height int64, round int) {
	const onTimeout = "Timeout expired while waiting for NewUnlock event"
	ensureNewEvent(t, unlockCh, height, round, ensureTimeout, onTimeout)
}
// ensureVote waits up to ensureTimeout for a vote event on voteCh and
// panics if none arrives, if the event has an unexpected type, or if the
// vote's height, round or type differ from the expected values.
func ensureVote(t *testing.T, voteCh <-chan interface{}, height int64, round int,
	voteType byte) {
	select {
	case <-time.After(ensureTimeout):
		panic("Timeout expired while waiting for NewVote event")
	case v := <-voteCh:
		edv, ok := v.(types.EventDataVote)
		if !ok {
			// Name the type that is actually asserted above; the old
			// message claimed a *types.Vote was expected.
			panic(fmt.Sprintf("expected a types.EventDataVote, "+
				"got %v. wrong subscription channel?",
				reflect.TypeOf(v)))
		}
		vote := edv.Vote
		if vote.Height != height {
			panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
		}
		if vote.Round != round {
			panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
		}
		if vote.Type != voteType {
			panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
		}
	}
}
// ensureNewEventOnChannel blocks until something is received on ch and
// panics if ensureTimeout elapses first. The received value itself is
// discarded; only its arrival matters.
func ensureNewEventOnChannel(t *testing.T, ch <-chan interface{}) {
	deadline := time.After(ensureTimeout)
	select {
	case <-ch:
		return
	case <-deadline:
		panic("Timeout expired while waiting for new activity on the channel")
	}
}

View File

@ -28,12 +28,12 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
startTestRound(cs, height, round)
ensureNewStep(newBlockCh) // first block gets committed
ensureNoNewStep(newBlockCh)
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
ensureNoNewEventOnChannel(t, newBlockCh)
deliverTxsRange(cs, 0, 1)
ensureNewStep(newBlockCh) // commit txs
ensureNewStep(newBlockCh) // commit updated app hash
ensureNoNewStep(newBlockCh)
ensureNewEventOnChannel(t, newBlockCh) // commit txs
ensureNewEventOnChannel(t, newBlockCh) // commit updated app hash
ensureNoNewEventOnChannel(t, newBlockCh)
}
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
@ -46,9 +46,9 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
startTestRound(cs, height, round)
ensureNewStep(newBlockCh) // first block gets committed
ensureNoNewStep(newBlockCh) // then we dont make a block ...
ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
ensureNoNewEventOnChannel(t, newBlockCh) // then we dont make a block ...
ensureNewEventOnChannel(t, newBlockCh) // until the CreateEmptyBlocksInterval has passed
}
func TestMempoolProgressInHigherRound(t *testing.T) {
@ -72,13 +72,19 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
}
startTestRound(cs, height, round)
ensureNewStep(newRoundCh) // first round at first height
ensureNewStep(newBlockCh) // first block gets committed
ensureNewStep(newRoundCh) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
<-timeoutCh
ensureNewStep(newRoundCh) // wait for the next round
ensureNewStep(newBlockCh) // now we can commit the block
ensureNewRoundStep(t, newRoundCh, height, round) // first round at first height
ensureNewEventOnChannel(t, newBlockCh) // first block gets committed
height = height + 1 // moving to the next height
round = 0
ensureNewRoundStep(t, newRoundCh, height, round) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
ensureNewTimeout(t, timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())
round = round + 1 // moving to the next round
ensureNewRoundStep(t, newRoundCh, height, round) // wait for the next round
ensureNewEventOnChannel(t, newBlockCh) // now we can commit the block
}
func deliverTxsRange(cs *ConsensusState, start, end int) {

View File

@ -55,7 +55,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
conR := &ConsensusReactor{
conS: consensusState,
fastSync: fastSync,
metrics: NopMetrics(),
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)

View File

@ -83,7 +83,8 @@ type ConsensusState struct {
// internal state
mtx sync.RWMutex
cstypes.RoundState
state sm.State // State until height-1.
triggeredTimeoutPrecommit bool
state sm.State // State until height-1.
// state changes may be triggered by: msgs from peers,
// msgs from ourself, or by timeouts
@ -711,6 +712,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
cs.enterPrecommit(ti.Height, ti.Round)
case cstypes.RoundStepPrecommitWait:
cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
cs.enterPrecommit(ti.Height, ti.Round)
cs.enterNewRound(ti.Height, ti.Round+1)
default:
panic(fmt.Sprintf("Invalid timeout step: %v", ti.Step))
@ -772,6 +774,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
cs.ProposalBlockParts = nil
}
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
cs.triggeredTimeoutPrecommit = false
cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
cs.metrics.Rounds.Set(float64(round))
@ -782,7 +785,8 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
if waitForTxs {
if cs.config.CreateEmptyBlocksInterval > 0 {
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, cstypes.RoundStepNewRound)
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)
}
go cs.proposalHeartbeat(height, round)
} else {
@ -1013,6 +1017,8 @@ func (cs *ConsensusState) enterPrevote(height int64, round int) {
func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
logger := cs.Logger.With("height", height, "round", round)
//TODO: Remove this so it is aligned with spec!
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
logger.Info("enterPrevote: Block was locked")
@ -1171,7 +1177,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommitWait <= cs.Step) {
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.triggeredTimeoutPrecommit) {
logger.Debug(fmt.Sprintf("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
}
@ -1182,7 +1188,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
defer func() {
// Done enterPrecommitWait:
cs.updateRoundStep(round, cstypes.RoundStepPrecommitWait)
cs.triggeredTimeoutPrecommit = true
cs.newStep()
}()
@ -1621,14 +1627,18 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
}
}
// If +2/3 prevotes for *anything* for this or future round:
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
// If +2/3 prevotes for *anything* for future round:
if cs.Round < vote.Round && prevotes.HasTwoThirdsAny() {
// Round-skip over to PrevoteWait or goto Precommit.
cs.enterNewRound(height, vote.Round) // if the vote is ahead of us
if prevotes.HasTwoThirdsMajority() {
}
// If +2/3 prevotes for *anything* for this round:
if cs.Round == vote.Round && prevotes.HasTwoThirdsAny() {
if prevotes.HasTwoThirdsMajority() &&
(cstypes.RoundStepPrevote == cs.Step || cstypes.RoundStepPrevoteWait == cs.Step) {
cs.enterPrecommit(height, vote.Round)
} else {
cs.enterPrevote(height, vote.Round) // if the vote is ahead of us
} else if cstypes.RoundStepPrevote == cs.Step {
cs.enterPrevoteWait(height, vote.Round)
}
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
@ -1641,28 +1651,25 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
case types.VoteTypePrecommit:
precommits := cs.Votes.Precommits(vote.Round)
cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
blockID, ok := precommits.TwoThirdsMajority()
if ok {
if len(blockID.Hash) == 0 {
cs.enterNewRound(height, vote.Round+1)
} else {
if len(blockID.Hash) != 0 {
cs.enterNewRound(height, vote.Round)
cs.enterPrecommit(height, vote.Round)
cs.enterCommit(height, vote.Round)
if cs.config.SkipTimeoutCommit && precommits.HasAll() {
// if we have all the votes now,
// go straight to new round (skip timeout commit)
// cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight)
cs.enterNewRound(cs.Height, 0)
}
} else {
cs.enterPrecommitWait(height, vote.Round)
}
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
// Executed as TwoThirdsMajority could be from a higher round
cs.enterNewRound(height, vote.Round)
cs.enterPrecommit(height, vote.Round)
cs.enterPrecommitWait(height, vote.Round)
}
default:
panic(fmt.Sprintf("Unexpected vote type %X", vote.Type)) // go-wire should prevent this.
}

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ import (
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

View File

@ -756,7 +756,7 @@ func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
func (ch *Channel) updateStats() {
// Exponential decay of stats.
// TODO: optimize.
atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent)) * 0.8))
atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
}
//----------------------------------------

View File

@ -102,7 +102,7 @@ type peer struct {
// User data
Data *cmn.CMap
metrics *Metrics
metrics *Metrics
metricsTicker *time.Ticker
}

View File

@ -28,7 +28,7 @@ func CreateRandomPeer(outbound bool) *peer {
ID: netAddr.ID,
ListenAddr: netAddr.DialString(),
},
mconn: &conn.MConnection{},
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))