log warning if peer send failed (Refs #174)

make lint happy

remove dead code

remove not needed go-common dependency

check peer.Send failures (Refs #174)
This commit is contained in:
Anton Kaliaev 2017-01-17 20:58:27 +04:00
parent 62adbe69ff
commit 6dbe9febce
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
4 changed files with 61 additions and 44 deletions

View File

@ -3,11 +3,10 @@ package blockchain
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"reflect" "reflect"
"time" "time"
. "github.com/tendermint/go-common" cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
@ -17,7 +16,9 @@ import (
) )
const ( const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40) BlockchainChannel = byte(0x40)
defaultChannelCapacity = 100 defaultChannelCapacity = 100
defaultSleepIntervalMS = 500 defaultSleepIntervalMS = 500
trySyncIntervalMS = 100 trySyncIntervalMS = 100
@ -55,12 +56,13 @@ type BlockchainReactor struct {
evsw types.EventSwitch evsw types.EventSwitch
} }
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 { if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better store.height-- // XXX HACK, make this better
} }
if state.LastBlockHeight != store.Height() { if state.LastBlockHeight != store.Height() {
PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
} }
requestsCh := make(chan BlockRequest, defaultChannelCapacity) requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity)
@ -83,6 +85,7 @@ func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy
return bcR return bcR
} }
// OnStart implements BaseService
func (bcR *BlockchainReactor) OnStart() error { func (bcR *BlockchainReactor) OnStart() error {
bcR.BaseReactor.OnStart() bcR.BaseReactor.OnStart()
if bcR.fastSync { if bcR.fastSync {
@ -95,12 +98,13 @@ func (bcR *BlockchainReactor) OnStart() error {
return nil return nil
} }
// OnStop implements BaseService
func (bcR *BlockchainReactor) OnStop() { func (bcR *BlockchainReactor) OnStop() {
bcR.BaseReactor.OnStop() bcR.BaseReactor.OnStop()
bcR.pool.Stop() bcR.pool.Stop()
} }
// Implements Reactor // GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
@ -111,19 +115,20 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
} }
} }
// Implements Reactor // AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
// Send peer our state. if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) log.Warn("Failed to send our state to peer", "peer", peer)
// doing nothing, will try later in `poolRoutine`
}
} }
// Implements Reactor // RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Remove peer from the pool.
bcR.pool.RemovePeer(peer.Key) bcR.pool.RemovePeer(peer.Key)
} }
// Implements Reactor // Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
@ -159,7 +164,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
// Got a peer status. Unverified. // Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.Key, msg.Height) bcR.pool.SetPeerHeight(src.Key, msg.Height)
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
} }
@ -245,7 +250,7 @@ FOR_LOOP:
err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{}) err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
if err != nil { if err != nil {
// TODO This is bad, are we zombie? // TODO This is bad, are we zombie?
PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
} }
bcR.state.Save() bcR.state.Save()
} }
@ -257,17 +262,13 @@ FOR_LOOP:
} }
} }
func (bcR *BlockchainReactor) BroadcastStatusResponse() error { // BroadcastStatusRequest broadcasts `BlockStore` height.
bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
return nil
}
func (bcR *BlockchainReactor) BroadcastStatusRequest() error { func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}}) bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
return nil return nil
} }
// implements events.Eventable // SetEventSwitch implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
bcR.evsw = evsw bcR.evsw = evsw
} }
@ -282,6 +283,7 @@ const (
msgTypeStatusRequest = byte(0x21) msgTypeStatusRequest = byte(0x21)
) )
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface{} type BlockchainMessage interface{}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
@ -292,6 +294,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
) )
// DecodeMessage decodes BlockchainMessage.
// TODO: ensure that bz is completely read. // TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
msgType = bz[0] msgType = bz[0]
@ -299,7 +302,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
if err != nil && n != len(bz) { if err != nil && n != len(bz) {
err = errors.New("DecodeMessage() had bytes left over.") err = errors.New("DecodeMessage() had bytes left over")
} }
return return
} }
@ -311,7 +314,7 @@ type bcBlockRequestMessage struct {
} }
func (m *bcBlockRequestMessage) String() string { func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
} }
//------------------------------------- //-------------------------------------
@ -322,7 +325,7 @@ type bcBlockResponseMessage struct {
} }
func (m *bcBlockResponseMessage) String() string { func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
} }
//------------------------------------- //-------------------------------------
@ -332,7 +335,7 @@ type bcStatusRequestMessage struct {
} }
func (m *bcStatusRequestMessage) String() string { func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
} }
//------------------------------------- //-------------------------------------
@ -342,5 +345,5 @@ type bcStatusResponseMessage struct {
} }
func (m *bcStatusResponseMessage) String() string { func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
} }

View File

@ -242,7 +242,7 @@ func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
// Send our state to peer. // Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync { if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessage(peer) br.reactor.sendNewRoundStepMessages(peer)
} }
} }
func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {

View File

@ -127,7 +127,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
// Send our state to peer. // Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.fastSync { if !conR.fastSync {
conR.sendNewRoundStepMessage(peer) conR.sendNewRoundStepMessages(peer)
} }
} }
@ -201,7 +201,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
BlockID: msg.BlockID, BlockID: msg.BlockID,
Votes: ourVotes, Votes: ourVotes,
}}) }})
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
@ -365,14 +364,20 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *
return return
} }
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) {
log := log.New("peer", peer)
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg}) if !peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg}) {
log.Warn("Failed to send NewRoundStepMessage to peer")
}
} }
if csMsg != nil { if csMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg}) if !peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg}) {
log.Warn("Failed to send RoundStepCommit to peer")
}
} }
} }
@ -399,8 +404,11 @@ OUTER_LOOP:
Round: rs.Round, // This tells peer that this part applies to us. Round: rs.Round, // This tells peer that this part applies to us.
Part: part, Part: part,
} }
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
log.Warn("Failed to send BlockPartMessage to peer")
}
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -435,8 +443,11 @@ OUTER_LOOP:
Round: prs.Round, // Not our height, so it doesn't matter. Round: prs.Round, // Not our height, so it doesn't matter.
Part: part, Part: part,
} }
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
log.Warn("Failed to send BlockPartMessage to peer")
}
continue OUTER_LOOP continue OUTER_LOOP
} else { } else {
//log.Info("No parts to send in catch-up, sleeping") //log.Info("No parts to send in catch-up, sleeping")
@ -462,8 +473,11 @@ OUTER_LOOP:
// Proposal: share the proposal metadata with peer. // Proposal: share the proposal metadata with peer.
{ {
msg := &ProposalMessage{Proposal: rs.Proposal} msg := &ProposalMessage{Proposal: rs.Proposal}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposal(rs.Proposal) ps.SetHasProposal(rs.Proposal)
} else {
log.Warn("Failed to send ProposalMessage to peer")
}
} }
// ProposalPOL: lets peer know which POL votes we have so far. // ProposalPOL: lets peer know which POL votes we have so far.
// Peer must receive ProposalMessage first. // Peer must receive ProposalMessage first.
@ -475,7 +489,9 @@ OUTER_LOOP:
ProposalPOLRound: rs.Proposal.POLRound, ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
} }
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) if !peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
log.Warn("Failed to send ProposalPOLMessage to peer")
}
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -806,13 +822,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.ProposalBlockParts.SetIndex(index, true) ps.ProposalBlockParts.SetIndex(index, true)
} }
// Convenience function to send vote to peer. // PickVoteToSend sends vote to peer.
// Returns true if vote was sent. // Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
if vote, ok := ps.PickVoteToSend(votes); ok { if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote} msg := &VoteMessage{vote}
ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
return true
} }
return false return false
} }

View File

@ -6,13 +6,12 @@ import (
"reflect" "reflect"
"time" "time"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-clist" "github.com/tendermint/go-clist"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
) )
const ( const (
@ -80,7 +79,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
} }
// broadcasting happens from go routines per peer // broadcasting happens from go routines per peer
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
} }
} }