From 714f885dac20dd7e58589c27aa11e9de25f8565a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Mar 2018 16:51:52 +0400 Subject: [PATCH] mark peer as good if it contributed enough votes or block parts Refs #1147 --- consensus/reactor.go | 49 ++++++++++++++++++++++++++++++++++++++++++++ node/node.go | 2 ++ p2p/switch.go | 15 ++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/consensus/reactor.go b/consensus/reactor.go index b57d8557..f19b0a9a 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -256,6 +256,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) + if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 { + conR.Switch.MarkPeerAsGood(src) + } conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} default: conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) @@ -275,6 +278,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) + if blocks := ps.RecordVote(msg.Vote); blocks > 10000 { + conR.Switch.MarkPeerAsGood(src) + } cs.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -869,6 +875,17 @@ type PeerState struct { mtx sync.Mutex cstypes.PeerRoundState + + stats *peerStateStats +} + +// peerStateStats holds internal statistics for a peer. +type peerStateStats struct { + lastVoteHeight int64 + votes int + + lastBlockPartHeight int64 + blockParts int } // NewPeerState returns a new PeerState for the given Peer @@ -882,6 +899,7 @@ func NewPeerState(peer p2p.Peer) *PeerState { LastCommitRound: -1, CatchupCommitRound: -1, }, + stats: &peerStateStats{}, } } @@ -1093,6 +1111,37 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { } } +// RecordVote updates internal statistics for this peer by recording the vote. +// It returns the total number of votes (1 per block). This essentially means +// the number of blocks for which peer has been sending us block parts. +func (ps *PeerState) RecordVote(vote *types.Vote) int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.stats.lastVoteHeight == vote.Height { + return ps.stats.votes + } + ps.stats.lastVoteHeight = vote.Height + ps.stats.votes += 1 + return ps.stats.votes +} + +// RecordVote updates internal statistics for this peer by recording the block part. +// It returns the total number of block parts (1 per block). This essentially means +// the number of blocks for which peer has been sending us block parts. +func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.stats.lastBlockPartHeight == bp.Height { + return ps.stats.blockParts + } + + ps.stats.lastBlockPartHeight = bp.Height + ps.stats.blockParts += 1 + return ps.stats.blockParts +} + // SetHasVote sets the given vote as known by the peer func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.mtx.Lock() diff --git a/node/node.go b/node/node.go index d40322fa..83ac50ec 100644 --- a/node/node.go +++ b/node/node.go @@ -287,6 +287,8 @@ func NewNode(config *cfg.Config, sw.AddReactor("PEX", pexReactor) } + sw.SetAddrBook(addrBook) + // Filter peers by addr or pubkey with an ABCI query. // If the query return code is OK, add peer. // XXX: Query format subject to change diff --git a/p2p/switch.go b/p2p/switch.go index cffadf3b..63deace2 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -35,6 +35,7 @@ const ( type AddrBook interface { AddAddress(addr *NetAddress, src *NetAddress) error + MarkGood(*NetAddress) Save() } @@ -57,6 +58,7 @@ type Switch struct { dialing *cmn.CMap nodeInfo NodeInfo // our node info nodeKey *NodeKey // our node privkey + addrBook AddrBook filterConnByAddr func(net.Addr) error filterConnByID func(ID) error @@ -317,6 +319,19 @@ func (sw *Switch) reconnectToPeer(peer Peer) { sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } +// SetAddrBook allows to set address book on Switch. +func (sw *Switch) SetAddrBook(addrBook AddrBook) { + sw.addrBook = addrBook +} + +// MarkPeerAsGood marks the given peer as good when it did something useful +// like contributed to consensus. +func (sw *Switch) MarkPeerAsGood(peer Peer) { + if sw.addrBook != nil { + sw.addrBook.MarkGood(peer.NodeInfo().NetAddress()) + } +} + //--------------------------------------------------------------------- // Dialing