msgQueue and msgProcessor
This commit is contained in:
parent
f37f578b1d
commit
2c595284d8
|
@ -57,6 +57,7 @@ func (conR *ConsensusReactor) OnStart() error {
|
|||
}
|
||||
}
|
||||
go conR.broadcastNewRoundStepRoutine()
|
||||
go conR.msgProcessor()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -136,8 +137,6 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
|
|||
return
|
||||
}
|
||||
|
||||
// Get peer states
|
||||
ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
|
||||
_, msg, err := DecodeMessage(msgBytes)
|
||||
if err != nil {
|
||||
log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
|
||||
|
@ -146,6 +145,9 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
|
|||
}
|
||||
log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg)
|
||||
|
||||
// Get peer states
|
||||
ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
|
||||
|
||||
switch chID {
|
||||
case StateChannel:
|
||||
switch msg := msg.(type) {
|
||||
|
@ -167,12 +169,12 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
|
|||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
ps.SetHasProposal(msg.Proposal)
|
||||
err = conR.conS.SetProposal(msg.Proposal)
|
||||
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
|
||||
case *ProposalPOLMessage:
|
||||
ps.ApplyProposalPOLMessage(msg)
|
||||
case *BlockPartMessage:
|
||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
|
||||
_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
|
||||
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
|
||||
default:
|
||||
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
|
@ -184,30 +186,15 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
|
|||
}
|
||||
switch msg := msg.(type) {
|
||||
case *VoteMessage:
|
||||
vote, valIndex := msg.Vote, msg.ValidatorIndex
|
||||
|
||||
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
||||
added, err := conR.conS.TryAddVote(valIndex, vote, peer.Key)
|
||||
if err == ErrAddingVote {
|
||||
// TODO: punish peer
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
cs := conR.conS
|
||||
cs.mtx.Lock()
|
||||
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
|
||||
cs.mtx.Unlock()
|
||||
ps.EnsureVoteBitArrays(height, valSize)
|
||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||
ps.SetHasVote(vote, valIndex)
|
||||
ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
|
||||
|
||||
if added {
|
||||
// If rs.Height == vote.Height && rs.Round < vote.Round,
|
||||
// the peer is sending us CatchupCommit precommits.
|
||||
// We could make note of this and help filter in broadcastHasVoteMessage().
|
||||
conR.broadcastHasVoteMessage(vote, valIndex)
|
||||
}
|
||||
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
|
||||
|
||||
default:
|
||||
// don't punish (leave room for soft upgrades)
|
||||
|
@ -261,6 +248,53 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
|
|||
|
||||
//--------------------------------------
|
||||
|
||||
// a message coming in from the reactor
|
||||
type msgInfo struct {
|
||||
msg ConsensusMessage
|
||||
peerKey string
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) msgProcessor() {
|
||||
for {
|
||||
var mi msgInfo
|
||||
var err error
|
||||
select {
|
||||
case mi = <-conR.conS.msgQueue:
|
||||
case <-conR.Quit:
|
||||
return
|
||||
}
|
||||
|
||||
msg, peerKey := mi.msg, mi.peerKey
|
||||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
err = conR.conS.SetProposal(msg.Proposal)
|
||||
case *BlockPartMessage:
|
||||
_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
|
||||
case *VoteMessage:
|
||||
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
||||
added, err := conR.conS.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
|
||||
if err == ErrAddingVote {
|
||||
// TODO: punish peer
|
||||
}
|
||||
|
||||
if added {
|
||||
// If rs.Height == vote.Height && rs.Round < vote.Round,
|
||||
// the peer is sending us CatchupCommit precommits.
|
||||
// We could make note of this and help filter in broadcastHasVoteMessage().
|
||||
conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
|
||||
}
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("Error in msg processor", "error", err)
|
||||
}
|
||||
|
||||
// TODO: get Proposer into the Data
|
||||
conR.evsw.FireEvent(types.EventStringConsensusMessage(), &types.EventDataConsensusMessage{msg})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
|
||||
nrsMsg = &NewRoundStepMessage{
|
||||
Height: rs.Height,
|
||||
|
|
|
@ -150,11 +150,15 @@ func (rs *RoundState) StringShort() string {
|
|||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
var (
|
||||
msgQueueSize = 1000
|
||||
)
|
||||
|
||||
// Tracks consensus state across block heights and rounds.
|
||||
type ConsensusState struct {
|
||||
BaseService
|
||||
|
||||
proxyAppCtx proxy.AppContext
|
||||
proxyAppCtx proxy.AppContext
|
||||
blockStore *bc.BlockStore
|
||||
mempool *mempl.Mempool
|
||||
privValidator *types.PrivValidator
|
||||
|
@ -166,16 +170,20 @@ type ConsensusState struct {
|
|||
stagedBlock *types.Block // Cache last staged block.
|
||||
stagedState *sm.State // Cache result of staged block.
|
||||
|
||||
// for messages which affect the state (proposals, block parts, votes)
|
||||
msgQueue chan msgInfo
|
||||
|
||||
evsw events.Fireable
|
||||
evc *events.EventCache // set in stageBlock and passed into state
|
||||
}
|
||||
|
||||
func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
|
||||
cs := &ConsensusState{
|
||||
proxyAppCtx: proxyAppCtx,
|
||||
blockStore: blockStore,
|
||||
mempool: mempool,
|
||||
newStepCh: make(chan *RoundState, 10),
|
||||
proxyAppCtx: proxyAppCtx,
|
||||
blockStore: blockStore,
|
||||
mempool: mempool,
|
||||
newStepCh: make(chan *RoundState, 10),
|
||||
msgQueue: make(chan msgInfo, msgQueueSize),
|
||||
}
|
||||
cs.updateToState(state)
|
||||
// Don't call scheduleRound0 yet.
|
||||
|
@ -447,6 +455,12 @@ func (cs *ConsensusState) decideProposal(height, round int) {
|
|||
cs.Proposal = proposal
|
||||
cs.ProposalBlock = block
|
||||
cs.ProposalBlockParts = blockParts
|
||||
|
||||
cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
|
||||
for i := 0; i < blockParts.Total(); i++ {
|
||||
part := blockParts.GetPart(i)
|
||||
cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}
|
||||
}
|
||||
} else {
|
||||
log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err)
|
||||
}
|
||||
|
@ -1118,6 +1132,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
|
|||
valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address)
|
||||
_, _, err := cs.addVote(valIndex, vote, "")
|
||||
log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
|
||||
// so we fire events for ourself and can run replays
|
||||
cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
|
||||
return vote
|
||||
} else {
|
||||
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
|
||||
|
|
Loading…
Reference in New Issue