From ee674f919fa165c3c263f03d4f30747e3828fe91 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 21 Jan 2018 13:32:04 -0500 Subject: [PATCH 1/4] StopPeerForError in blockchain and consensus --- blockchain/pool.go | 5 +++-- blockchain/reactor.go | 14 +++++++++----- consensus/reactor.go | 6 +++++- consensus/state.go | 18 +++++++++--------- consensus/types/height_vote_set.go | 9 +++++---- mempool/reactor.go | 4 ++-- state/execution.go | 3 +-- types/validator_set.go | 1 - types/vote_set.go | 10 ++++++---- 9 files changed, 40 insertions(+), 30 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 164d3b3b..bb589684 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -195,7 +195,8 @@ func (pool *BlockPool) PopRequest() { // Invalidates the block at pool.height, // Remove the peer and redo request from others. -func (pool *BlockPool) RedoRequest(height int64) { +// Returns the ID of the removed peer. +func (pool *BlockPool) RedoRequest(height int64) p2p.ID { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -205,8 +206,8 @@ func (pool *BlockPool) RedoRequest(height int64) { cmn.PanicSanity("Expected block to be non-nil") } // RemovePeer will redo all requesters associated with this peer. - // TODO: record this malfeasance pool.removePeer(request.peerID) + return request.peerID } // TODO: ensure that blocks come in order for each peer. 
diff --git a/blockchain/reactor.go b/blockchain/reactor.go index ee84a794..1bb82c23 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -3,6 +3,7 @@ package blockchain import ( "bytes" "errors" + "fmt" "reflect" "sync" "time" @@ -171,7 +172,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) - // TODO: improve logic to satisfy megacheck switch msg := msg.(type) { case *bcBlockRequestMessage: if queued := bcR.respondToPeer(msg, src); !queued { @@ -287,16 +287,20 @@ FOR_LOOP: chainID, firstID, first.Height, second.LastCommit) if err != nil { bcR.Logger.Error("Error in validation", "err", err) - bcR.pool.RedoRequest(first.Height) + peerID := bcR.pool.RedoRequest(first.Height) + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) + } break SYNC_LOOP } else { bcR.pool.PopRequest() + // TODO: batch saves so we dont persist to disk every block bcR.store.SaveBlock(first, firstParts, second.LastCommit) - // NOTE: we could improve performance if we - // didn't make the app commit to disk every block - // ... 
but we would need a way to get the hash without it persisting + // TODO: same thing for app - but we would need a way to + // get the hash without persisting the state var err error state, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { diff --git a/consensus/reactor.go b/consensus/reactor.go index 3f6ab506..44ff745c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -205,7 +205,11 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) return } // Peer claims to have a maj23 for some BlockID at H,R,S, - votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.ID(), msg.BlockID) + err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.ID(), msg.BlockID) + if err != nil { + conR.Switch.StopPeerForError(src, err) + return + } // Respond with a VoteSetBitsMessage showing which votes we have. // (and consequently shows which we don't have) var ourVotes *cmn.BitArray diff --git a/consensus/state.go b/consensus/state.go index 56070b03..7b8c8e08 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -771,17 +771,17 @@ func (cs *ConsensusState) enterPropose(height int64, round int) { return } - if !cs.isProposer() { - cs.Logger.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) - if cs.Validators.HasAddress(cs.privValidator.GetAddress()) { - cs.Logger.Debug("This node is a validator") - } else { - cs.Logger.Debug("This node is not a validator") - } - } else { - cs.Logger.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) + if cs.Validators.HasAddress(cs.privValidator.GetAddress()) { cs.Logger.Debug("This node is a validator") + } else { + cs.Logger.Debug("This node is not a validator") + } + + if cs.isProposer() { + cs.Logger.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) 
cs.decideProposal(height, round) + } else { + cs.Logger.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) } } diff --git a/consensus/types/height_vote_set.go b/consensus/types/height_vote_set.go index 1435cf42..17ef334d 100644 --- a/consensus/types/height_vote_set.go +++ b/consensus/types/height_vote_set.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "strings" "sync" @@ -207,15 +208,15 @@ func (hvs *HeightVoteSet) StringIndented(indent string) string { // NOTE: if there are too many peers, or too much peer churn, // this can cause memory issues. // TODO: implement ability to remove peers too -func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID p2p.ID, blockID types.BlockID) { +func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID p2p.ID, blockID types.BlockID) error { hvs.mtx.Lock() defer hvs.mtx.Unlock() if !types.IsVoteTypeValid(type_) { - return + return fmt.Errorf("SetPeerMaj23: Invalid vote type %v", type_) } voteSet := hvs.getVoteSet(round, type_) if voteSet == nil { - return + return nil // something we don't know about yet } - voteSet.SetPeerMaj23(peerID, blockID) + return voteSet.SetPeerMaj23(peerID, blockID) } diff --git a/mempool/reactor.go b/mempool/reactor.go index 4523f824..4e43bb0c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -101,8 +101,8 @@ type PeerState interface { } // Send new mempool txs to peer. -// TODO: Handle mempool or reactor shutdown? -// As is this routine may block forever if no new txs come in. +// TODO: Handle mempool or reactor shutdown - as is this routine +// may block forever if no new txs come in. 
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return diff --git a/state/execution.go b/state/execution.go index 921799b8..56635da1 100644 --- a/state/execution.go +++ b/state/execution.go @@ -190,11 +190,10 @@ func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, } } - // TODO: determine which validators were byzantine byzantineVals := make([]*abci.Evidence, len(block.Evidence.Evidence)) for i, ev := range block.Evidence.Evidence { byzantineVals[i] = &abci.Evidence{ - PubKey: ev.Address(), // XXX + PubKey: ev.Address(), // XXX/TODO Height: ev.Height(), } } diff --git a/types/validator_set.go b/types/validator_set.go index 3876c19d..7e895aba 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -21,7 +21,6 @@ import ( // upon calling .IncrementAccum(). // NOTE: Not goroutine-safe. // NOTE: All get/set to validators should copy the value for safety. -// TODO: consider validator Accum overflow type ValidatorSet struct { // NOTE: persisted via reflect, must be exported. Validators []*Validator `json:"validators"` diff --git a/types/vote_set.go b/types/vote_set.go index a97676f6..584a45e6 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -291,7 +291,7 @@ func (voteSet *VoteSet) addVerifiedVote(vote *Vote, blockKey string, votingPower // this can cause memory issues. // TODO: implement ability to remove peers too // NOTE: VoteSet must not be nil -func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) { +func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) error { if voteSet == nil { cmn.PanicSanity("SetPeerMaj23() on nil VoteSet") } @@ -303,9 +303,10 @@ func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) { // Make sure peer hasn't already told us something. if existing, ok := voteSet.peerMaj23s[peerID]; ok { if existing.Equals(blockID) { - return // Nothing to do + return nil // Nothing to do } else { - return // TODO bad peer! 
+ return fmt.Errorf("SetPeerMaj23: Received conflicting blockID from peer %v. Got %v, expected %v", + peerID, blockID, existing) } } voteSet.peerMaj23s[peerID] = blockID @@ -314,7 +315,7 @@ func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) { votesByBlock, ok := voteSet.votesByBlock[blockKey] if ok { if votesByBlock.peerMaj23 { - return // Nothing to do + return nil // Nothing to do } else { votesByBlock.peerMaj23 = true // No need to copy votes, already there. @@ -324,6 +325,7 @@ func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) { voteSet.votesByBlock[blockKey] = votesByBlock // No need to copy votes, no votes to copy over. } + return nil } func (voteSet *VoteSet) BitArray() *cmn.BitArray { From 3090b05eb43a31e3c653dc114c0b4d2185da30d5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 21 Jan 2018 16:26:59 -0500 Subject: [PATCH 2/4] p2p: use conn.Close when peer is nil --- p2p/switch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/switch.go b/p2p/switch.go index 3f026556..ccf2c5eb 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -426,7 +426,7 @@ func (sw *Switch) listenerRoutine(l Listener) { func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) if err != nil { - peer.CloseConn() + conn.Close() // peer is nil return err } peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) From 87087b8acd523889bde0714c3a8033535fbf47c0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 23 Jan 2018 21:41:13 -0500 Subject: [PATCH 3/4] consensus: minor cosmetic --- consensus/state.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 7b8c8e08..adf85d08 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -86,7 +86,7 @@ type ConsensusState struct { cstypes.RoundState state sm.State // State 
until height-1. - // state changes may be triggered by msgs from peers, + // state changes may be triggered by: msgs from peers, // msgs from ourself, or by timeouts peerMsgQueue chan msgInfo internalMsgQueue chan msgInfo @@ -771,11 +771,12 @@ func (cs *ConsensusState) enterPropose(height int64, round int) { return } - if cs.Validators.HasAddress(cs.privValidator.GetAddress()) { - cs.Logger.Debug("This node is a validator") - } else { + // if not a validator, we're done + if !cs.Validators.HasAddress(cs.privValidator.GetAddress()) { cs.Logger.Debug("This node is not a validator") + return } + cs.Logger.Debug("This node is a validator") if cs.isProposer() { cs.Logger.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) From 4051391039735ec919b42c90e7718448ab38bb61 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 23 Jan 2018 22:06:40 -0500 Subject: [PATCH 4/4] blockchain: test wip for hard to test functionality [ci skip] --- blockchain/reactor_test.go | 47 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 6f5b14ff..26747ea6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -49,7 +49,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe return bcReactor } -func TestNoBlockMessageResponse(t *testing.T) { +func TestNoBlockResponse(t *testing.T) { maxBlockHeight := int64(20) bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) @@ -73,7 +73,7 @@ func TestNoBlockMessageResponse(t *testing.T) { } // receive a request message from peer, - // wait to hear response + // wait for our response to be received on the peer for _, tt := range tests { reqBlockMsg := &bcBlockRequestMessage{tt.height} reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg}) @@ -97,6 +97,49 @@ func TestNoBlockMessageResponse(t 
*testing.T) { } } +/* +// NOTE: This is too hard to test without +// an easy way to add a test peer to the switch +// or without significant refactoring of the module. +// Alternatively we could actually dial a TCP conn but +// that seems extreme. +func TestBadBlockStopsPeer(t *testing.T) { + maxBlockHeight := int64(20) + + bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) + bcr.Start() + defer bcr.Stop() + + // Add some peers in + peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12))) + + // XXX: This doesn't add the peer to anything, + // so it's hard to check that it's later removed + bcr.AddPeer(peer) + assert.True(t, bcr.Switch.Peers().Size() > 0) + + // send a bad block from the peer + // default blocks already don't have commits, so should fail + block := bcr.store.LoadBlock(3) + msg := &bcBlockResponseMessage{Block: block} + peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}) + + ticker := time.NewTicker(time.Millisecond * 10) + timer := time.NewTimer(time.Second * 2) +LOOP: + for { + select { + case <-ticker.C: + if bcr.Switch.Peers().Size() == 0 { + break LOOP + } + case <-timer.C: + t.Fatal("Timed out waiting to disconnect peer") + } + } +} +*/ + //---------------------------------------------- // utility funcs