diff --git a/consensus/reactor.go b/consensus/reactor.go index 5da5ce56..4fc9fcc0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -57,6 +57,7 @@ func (conR *ConsensusReactor) OnStart() error { } } go conR.broadcastNewRoundStepRoutine() + go conR.msgProcessor() return nil } @@ -136,8 +137,6 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte return } - // Get peer states - ps := peer.Data.Get(types.PeerStateKey).(*PeerState) _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) @@ -146,6 +145,9 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg) + // Get peer states + ps := peer.Data.Get(types.PeerStateKey).(*PeerState) + switch chID { case StateChannel: switch msg := msg.(type) { @@ -167,12 +169,12 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) - err = conR.conS.SetProposal(msg.Proposal) + conR.conS.msgQueue <- msgInfo{msg, peer.Key} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index) - _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part) + conR.conS.msgQueue <- msgInfo{msg, peer.Key} default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -184,30 +186,15 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } switch msg := msg.(type) { case *VoteMessage: - vote, valIndex := msg.Vote, msg.ValidatorIndex - - // attempt to add the vote and dupeout the validator if its a duplicate signature - added, err := conR.conS.TryAddVote(valIndex, vote, peer.Key) - if err == ErrAddingVote { - // TODO: punish peer - } else if err != nil { - return - } - cs := conR.conS cs.mtx.Lock() height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() cs.mtx.Unlock() ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) - ps.SetHasVote(vote, valIndex) + ps.SetHasVote(msg.Vote, msg.ValidatorIndex) - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). - conR.broadcastHasVoteMessage(vote, valIndex) - } + conR.conS.msgQueue <- msgInfo{msg, peer.Key} default: // don't punish (leave room for soft upgrades) @@ -261,6 +248,53 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { //-------------------------------------- +// a message coming in from the reactor +type msgInfo struct { + msg ConsensusMessage + peerKey string +} + +func (conR *ConsensusReactor) msgProcessor() { + for { + var mi msgInfo + var err error + select { + case mi = <-conR.conS.msgQueue: + case <-conR.Quit: + return + } + + msg, peerKey := mi.msg, mi.peerKey + switch msg := msg.(type) { + case *ProposalMessage: + err = conR.conS.SetProposal(msg.Proposal) + case *BlockPartMessage: + _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part) + case *VoteMessage: + // attempt to add the vote and dupeout the validator if its a duplicate signature + added, err := conR.conS.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + if err == ErrAddingVote { + // TODO: punish peer + } + + if added { + // If rs.Height == vote.Height && rs.Round < vote.Round, + // the peer is sending us CatchupCommit precommits. + // We could make note of this and help filter in broadcastHasVoteMessage(). + conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) + } + + } + if err != nil { + log.Warn("Error in msg processor", "error", err) + } + + // TODO: get Proposer into the Data + conR.evsw.FireEvent(types.EventStringConsensusMessage(), &types.EventDataConsensusMessage{msg}) + + } +} + func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { nrsMsg = &NewRoundStepMessage{ Height: rs.Height, diff --git a/consensus/state.go b/consensus/state.go index 867f019d..645fb876 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -150,11 +150,15 @@ func (rs *RoundState) StringShort() string { //----------------------------------------------------------------------------- +var ( + msgQueueSize = 1000 +) + // Tracks consensus state across block heights and rounds. type ConsensusState struct { BaseService - proxyAppCtx proxy.AppContext + proxyAppCtx proxy.AppContext blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator @@ -166,16 +170,20 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. + // for messages which affect the state (proposals, block parts, votes) + msgQueue chan msgInfo + evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state } func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ - proxyAppCtx: proxyAppCtx, - blockStore: blockStore, - mempool: mempool, - newStepCh: make(chan *RoundState, 10), + proxyAppCtx: proxyAppCtx, + blockStore: blockStore, + mempool: mempool, + newStepCh: make(chan *RoundState, 10), + msgQueue: make(chan msgInfo, msgQueueSize), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -447,6 +455,12 @@ func (cs *ConsensusState) decideProposal(height, round int) { cs.Proposal = proposal cs.ProposalBlock = block cs.ProposalBlockParts = blockParts + + cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + for i := 0; i < blockParts.Total(); i++ { + part := blockParts.GetPart(i) + cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""} + } } else { log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -1118,6 +1132,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address) _, _, err := cs.addVote(valIndex, vote, "") log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) + // so we fire events for ourself and can run replays + cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} return vote } else { log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)