From f99e4010f2bfdd7d0624e0d9be8428a9b435235e Mon Sep 17 00:00:00 2001 From: Zarko Milosevic Date: Fri, 21 Sep 2018 20:36:48 +0200 Subject: [PATCH] Add stats related channel between consensus state and reactor (#2388) --- CHANGELOG_PENDING.md | 3 +- cmd/tendermint/commands/init.go | 4 +- cmd/tendermint/commands/testnet.go | 6 +- consensus/metrics.go | 1 - consensus/reactor.go | 86 +++++++++++-------- consensus/reactor_test.go | 127 +++++------------------------ consensus/state.go | 35 +++++--- consensus/state_test.go | 78 ++++++++++++++++++ 8 files changed, 182 insertions(+), 158 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index d471b2fd..52d90ec7 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -33,4 +33,5 @@ IMPROVEMENTS: BUG FIXES: - [node] \#2294 Delay starting node until Genesis time -- [rpc] \#2460 StartHTTPAndTLSServer() now passes StartTLS() errors back to the caller rather than hanging forever. +- [consensus] \#2048 Correct peer statistics for marking peer as good +- [rpc] \#2460 StartHTTPAndTLSServer() now passes StartTLS() errors back to the caller rather than hanging forever. \ No newline at end of file diff --git a/cmd/tendermint/commands/init.go b/cmd/tendermint/commands/init.go index 5136ded3..85ee4491 100644 --- a/cmd/tendermint/commands/init.go +++ b/cmd/tendermint/commands/init.go @@ -59,8 +59,8 @@ func initFilesWithConfig(config *cfg.Config) error { } genDoc.Validators = []types.GenesisValidator{{ Address: pv.GetPubKey().Address(), - PubKey: pv.GetPubKey(), - Power: 10, + PubKey: pv.GetPubKey(), + Power: 10, }} if err := genDoc.SaveAs(genFile); err != nil { diff --git a/cmd/tendermint/commands/testnet.go b/cmd/tendermint/commands/testnet.go index 19f137e4..0f7dd79a 100644 --- a/cmd/tendermint/commands/testnet.go +++ b/cmd/tendermint/commands/testnet.go @@ -92,9 +92,9 @@ func testnetFiles(cmd *cobra.Command, args []string) error { pv := privval.LoadFilePV(pvFile) genVals[i] = types.GenesisValidator{ Address: pv.GetPubKey().Address(), - PubKey: pv.GetPubKey(), - Power: 1, - Name: nodeDirName, + PubKey: pv.GetPubKey(), + Power: 1, + Name: nodeDirName, } } diff --git a/consensus/metrics.go b/consensus/metrics.go index 91ae738d..68d065ec 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -85,7 +85,6 @@ func PrometheusMetrics() *Metrics { Help: "Total power of the byzantine validators.", }, []string{}), - BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Subsystem: "consensus", Name: "block_interval_seconds", diff --git a/consensus/reactor.go b/consensus/reactor.go index 6ba81726..4a915ace 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -29,6 +29,7 @@ const ( maxMsgSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes. blocksToContributeToBecomeGoodPeer = 10000 + votesToContributeToBecomeGoodPeer = 10000 ) //----------------------------------------------------------------------------- @@ -60,6 +61,9 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) + // start routine that computes peer statistics for evaluating peer quality + go conR.peerStatsRoutine() + conR.subscribeToBroadcastEvents() if !conR.FastSync() { @@ -258,9 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - if numBlocks := ps.RecordBlockPart(msg); numBlocks%blocksToContributeToBecomeGoodPeer == 0 { - conR.Switch.MarkPeerAsGood(src) - } + conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -280,9 +282,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - if blocks := ps.RecordVote(msg.Vote); blocks%blocksToContributeToBecomeGoodPeer == 0 { - conR.Switch.MarkPeerAsGood(src) - } cs.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -794,6 +793,43 @@ OUTER_LOOP: } } +func (conR *ConsensusReactor) peerStatsRoutine() { + for { + if !conR.IsRunning() { + conR.Logger.Info("Stopping peerStatsRoutine") + return + } + + select { + case msg := <-conR.conS.statsMsgQueue: + // Get peer + peer := conR.Switch.Peers().Get(msg.PeerID) + if peer == nil { + conR.Logger.Debug("Attempt to update stats for non-existent peer", + "peer", msg.PeerID) + continue + } + // Get peer state + ps := peer.Get(types.PeerStateKey).(*PeerState) + switch msg.Msg.(type) { + case *VoteMessage: + if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { + conR.Switch.MarkPeerAsGood(peer) + } + case *BlockPartMessage: + if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { + conR.Switch.MarkPeerAsGood(peer) + } + } + case <-conR.conS.Quit(): + return + + case <-conR.Quit(): + return + } + } +} + // String returns a string representation of the ConsensusReactor. // NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables. // TODO: improve! @@ -836,15 +872,13 @@ type PeerState struct { // peerStateStats holds internal statistics for a peer. type peerStateStats struct { - LastVoteHeight int64 `json:"last_vote_height"` - Votes int `json:"votes"` - LastBlockPartHeight int64 `json:"last_block_part_height"` - BlockParts int `json:"block_parts"` + Votes int `json:"votes"` + BlockParts int `json:"block_parts"` } func (pss peerStateStats) String() string { - return fmt.Sprintf("peerStateStats{lvh: %d, votes: %d, lbph: %d, blockParts: %d}", - pss.LastVoteHeight, pss.Votes, pss.LastBlockPartHeight, pss.BlockParts) + return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", + pss.Votes, pss.BlockParts) } // NewPeerState returns a new PeerState for the given Peer @@ -1080,18 +1114,14 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { } } -// RecordVote updates internal statistics for this peer by recording the vote. -// It returns the total number of votes (1 per block). This essentially means -// the number of blocks for which peer has been sending us votes. -func (ps *PeerState) RecordVote(vote *types.Vote) int { +// RecordVote increments internal votes related statistics for this peer. +// It returns the total number of added votes. +func (ps *PeerState) RecordVote() int { ps.mtx.Lock() defer ps.mtx.Unlock() - if ps.Stats.LastVoteHeight >= vote.Height { - return ps.Stats.Votes - } - ps.Stats.LastVoteHeight = vote.Height ps.Stats.Votes++ + return ps.Stats.Votes } @@ -1104,25 +1134,17 @@ func (ps *PeerState) VotesSent() int { return ps.Stats.Votes } -// RecordBlockPart updates internal statistics for this peer by recording the -// block part. It returns the total number of block parts (1 per block). This -// essentially means the number of blocks for which peer has been sending us -// block parts. -func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int { +// RecordBlockPart increments internal block part related statistics for this peer. +// It returns the total number of added block parts. +func (ps *PeerState) RecordBlockPart() int { ps.mtx.Lock() defer ps.mtx.Unlock() - if ps.Stats.LastBlockPartHeight >= bp.Height { - return ps.Stats.BlockParts - } - - ps.Stats.LastBlockPartHeight = bp.Height ps.Stats.BlockParts++ return ps.Stats.BlockParts } -// BlockPartsSent returns the number of blocks for which peer has been sending -// us block parts. +// BlockPartsSent returns the number of useful block parts the peer has sent us. func (ps *PeerState) BlockPartsSent() int { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 2c4c4452..41bddbd6 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -11,20 +11,16 @@ import ( "testing" "time" - abcicli "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blockchain" - cmn "github.com/tendermint/tendermint/libs/common" + cfg "github.com/tendermint/tendermint/config" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" - sm "github.com/tendermint/tendermint/state" - tmtime "github.com/tendermint/tendermint/types/time" - - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" - p2pdummy "github.com/tendermint/tendermint/p2p/dummy" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/stretchr/testify/assert" @@ -246,110 +242,25 @@ func TestReactorProposalHeartbeats(t *testing.T) { }, css) } -// Test we record block parts from other peers -func TestReactorRecordsBlockParts(t *testing.T) { - // create dummy peer - peer := p2pdummy.NewPeer() - ps := NewPeerState(peer).SetLogger(log.TestingLogger()) - peer.Set(types.PeerStateKey, ps) +// Test we record stats about votes and block parts from other peers. +func TestReactorRecordsVotesAndBlockParts(t *testing.T) { + N := 4 + css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) + reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) - // create reactor - css := randConsensusNet(1, "consensus_reactor_records_block_parts_test", newMockTickerFunc(true), newPersistentKVStore) - reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states - reactor.SetEventBus(css[0].eventBus) - reactor.SetLogger(log.TestingLogger()) - sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - reactor.SetSwitch(sw) - err := reactor.Start() - require.NoError(t, err) - defer reactor.Stop() + // wait till everyone makes the first new block + timeoutWaitGroup(t, N, func(j int) { + <-eventChans[j] + }, css) - // 1) new block part - parts := types.NewPartSetFromData(cmn.RandBytes(100), 10) - msg := &BlockPartMessage{ - Height: 2, - Round: 0, - Part: parts.GetPart(0), - } - bz, err := cdc.MarshalBinaryBare(msg) - require.NoError(t, err) + // Get peer + peer := reactors[1].Switch.Peers().List()[0] + // Get peer state + ps := peer.Get(types.PeerStateKey).(*PeerState) - reactor.Receive(DataChannel, peer, bz) - require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should have increased by 1") - - // 2) block part with the same height, but different round - msg.Round = 1 - - bz, err = cdc.MarshalBinaryBare(msg) - require.NoError(t, err) - - reactor.Receive(DataChannel, peer, bz) - require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same") - - // 3) block part from earlier height - msg.Height = 1 - msg.Round = 0 - - bz, err = cdc.MarshalBinaryBare(msg) - require.NoError(t, err) - - reactor.Receive(DataChannel, peer, bz) - require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same") -} - -// Test we record votes from other peers. -func TestReactorRecordsVotes(t *testing.T) { - // Create dummy peer. - peer := p2pdummy.NewPeer() - ps := NewPeerState(peer).SetLogger(log.TestingLogger()) - peer.Set(types.PeerStateKey, ps) - - // Create reactor. - css := randConsensusNet(1, "consensus_reactor_records_votes_test", newMockTickerFunc(true), newPersistentKVStore) - reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states - reactor.SetEventBus(css[0].eventBus) - reactor.SetLogger(log.TestingLogger()) - sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - reactor.SetSwitch(sw) - err := reactor.Start() - require.NoError(t, err) - defer reactor.Stop() - _, val := css[0].state.Validators.GetByIndex(0) - - // 1) new vote - vote := &types.Vote{ - ValidatorIndex: 0, - ValidatorAddress: val.Address, - Height: 2, - Round: 0, - Timestamp: tmtime.Now(), - Type: types.VoteTypePrevote, - BlockID: types.BlockID{}, - } - bz, err := cdc.MarshalBinaryBare(&VoteMessage{vote}) - require.NoError(t, err) - - reactor.Receive(VoteChannel, peer, bz) - assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should have increased by 1") - - // 2) vote with the same height, but different round - vote.Round = 1 - - bz, err = cdc.MarshalBinaryBare(&VoteMessage{vote}) - require.NoError(t, err) - - reactor.Receive(VoteChannel, peer, bz) - assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same") - - // 3) vote from earlier height - vote.Height = 1 - vote.Round = 0 - - bz, err = cdc.MarshalBinaryBare(&VoteMessage{vote}) - require.NoError(t, err) - - reactor.Receive(VoteChannel, peer, bz) - assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same") + assert.Equal(t, true, ps.VotesSent() > 0, "number of votes sent should have increased") + assert.Equal(t, true, ps.BlockPartsSent() > 0, "number of votes sent should have increased") } //------------------------------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 3cc29b2b..3ee1cfbf 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -91,6 +91,10 @@ type ConsensusState struct { internalMsgQueue chan msgInfo timeoutTicker TimeoutTicker + // information about about added votes and block parts are written on this channel + // so statistics can be computed by reactor + statsMsgQueue chan msgInfo + // we use eventBus to trigger msg broadcasts in the reactor, // and to notify external subscribers, eg. through a websocket eventBus *types.EventBus @@ -141,6 +145,7 @@ func NewConsensusState( peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), + statsMsgQueue: make(chan msgInfo, msgQueueSize), done: make(chan struct{}), doWALCatchup: true, wal: nilWAL{}, @@ -639,7 +644,11 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { err = cs.setProposal(msg.Proposal) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit - _, err = cs.addProposalBlockPart(msg, peerID) + added, err := cs.addProposalBlockPart(msg, peerID) + if added { + cs.statsMsgQueue <- mi + } + if err != nil && msg.Round != cs.Round { cs.Logger.Debug("Received block part from wrong round", "height", cs.Height, "csRound", cs.Round, "blockRound", msg.Round) err = nil @@ -647,7 +656,11 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition - err := cs.tryAddVote(msg.Vote, peerID) + added, err := cs.tryAddVote(msg.Vote, peerID) + if added { + cs.statsMsgQueue <- mi + } + if err == ErrAddingVote { // TODO: punish peer // We probably don't want to stop the peer here. The vote does not @@ -1454,7 +1467,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p int64(cs.state.ConsensusParams.BlockSize.MaxBytes), ) if err != nil { - return true, err + return added, err } // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) @@ -1484,35 +1497,35 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) } - return true, nil + return added, nil } return added, nil } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) error { - _, err := cs.addVote(vote, peerID) +func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, error) { + added, err := cs.addVote(vote, peerID) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, add it to the cs.evpool. // If it's otherwise invalid, punish peer. if err == ErrVoteHeightMismatch { - return err + return added, err } else if voteErr, ok := err.(*types.ErrVoteConflictingVotes); ok { if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) { cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) - return err + return added, err } cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence) - return err + return added, err } else { // Probably an invalid signature / Bad peer. // Seems this can also err sometimes with "Unexpected step" - perhaps not from a bad peer ? cs.Logger.Error("Error attempting to add vote", "err", err) - return ErrAddingVote + return added, ErrAddingVote } } - return nil + return added, nil } //----------------------------------------------------------------------------- diff --git a/consensus/state_test.go b/consensus/state_test.go index 9c363150..32fc5fd6 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,9 +7,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + cstypes "github.com/tendermint/tendermint/consensus/types" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + p2pdummy "github.com/tendermint/tendermint/p2p/dummy" "github.com/tendermint/tendermint/types" ) @@ -1081,6 +1085,80 @@ func TestStateHalt1(t *testing.T) { } } +func TestStateOutputsBlockPartsStats(t *testing.T) { + // create dummy peer + cs, _ := randConsensusState(1) + peer := p2pdummy.NewPeer() + + // 1) new block part + parts := types.NewPartSetFromData(cmn.RandBytes(100), 10) + msg := &BlockPartMessage{ + Height: 1, + Round: 0, + Part: parts.GetPart(0), + } + + cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header()) + cs.handleMsg(msgInfo{msg, peer.ID()}) + + statsMessage := <-cs.statsMsgQueue + require.Equal(t, msg, statsMessage.Msg, "") + require.Equal(t, peer.ID(), statsMessage.PeerID, "") + + // sending the same part from different peer + cs.handleMsg(msgInfo{msg, "peer2"}) + + // sending the part with the same height, but different round + msg.Round = 1 + cs.handleMsg(msgInfo{msg, peer.ID()}) + + // sending the part from the smaller height + msg.Height = 0 + cs.handleMsg(msgInfo{msg, peer.ID()}) + + // sending the part from the bigger height + msg.Height = 3 + cs.handleMsg(msgInfo{msg, peer.ID()}) + + select { + case <-cs.statsMsgQueue: + t.Errorf("Should not output stats message after receiving the known block part!") + case <-time.After(50 * time.Millisecond): + } + +} + +func TestStateOutputVoteStats(t *testing.T) { + cs, vss := randConsensusState(2) + // create dummy peer + peer := p2pdummy.NewPeer() + + vote := signVote(vss[1], types.VoteTypePrecommit, []byte("test"), types.PartSetHeader{}) + + voteMessage := &VoteMessage{vote} + cs.handleMsg(msgInfo{voteMessage, peer.ID()}) + + statsMessage := <-cs.statsMsgQueue + require.Equal(t, voteMessage, statsMessage.Msg, "") + require.Equal(t, peer.ID(), statsMessage.PeerID, "") + + // sending the same part from different peer + cs.handleMsg(msgInfo{&VoteMessage{vote}, "peer2"}) + + // sending the vote for the bigger height + incrementHeight(vss[1]) + vote = signVote(vss[1], types.VoteTypePrecommit, []byte("test"), types.PartSetHeader{}) + + cs.handleMsg(msgInfo{&VoteMessage{vote}, peer.ID()}) + + select { + case <-cs.statsMsgQueue: + t.Errorf("Should not output stats message after receiving the known vote or vote from bigger height") + case <-time.After(50 * time.Millisecond): + } + +} + // subscribe subscribes test client to the given query and returns a channel with cap = 1. func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} { out := make(chan interface{}, 1)