tendermint/consensus/reactor.go


package consensus
import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/pkg/errors"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
const (
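	// Channel IDs for the reactor's p2p channels; each gets its own
	// ChannelDescriptor in GetChannels below.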
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
VoteSetBitsChannel = byte(0x23)
maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
)
//-----------------------------------------------------------------------------
// ConsensusReactor defines a reactor for the consensus service.
type ConsensusReactor struct {
p2p.BaseReactor // BaseService + p2p.Switch
conS *ConsensusState
mtx sync.RWMutex
fastSync bool
eventBus *types.EventBus
}
// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{
conS: consensusState,
fastSync: fastSync,
}
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
return conR
}
// OnStart implements BaseService.
func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
conR.BaseReactor.OnStart()
err := conR.startBroadcastRoutine()
if err != nil {
return err
}
if !conR.FastSync() {
_, err := conR.conS.Start()
if err != nil {
return err
}
}
return nil
}
// OnStop implements BaseService
func (conR *ConsensusReactor) OnStop() {
conR.BaseReactor.OnStop()
conR.conS.Stop()
}
// SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state-machine.
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state)
// NOTE: The line below causes the broadcast routine (started in
// startBroadcastRoutine) to broadcast a NewRoundStepMessage.
conR.conS.updateToState(state)
conR.mtx.Lock()
conR.fastSync = false
conR.mtx.Unlock()
if blocksSynced > 0 {
// don't bother with the WAL if we fast-synced
conR.conS.doWALCatchup = false
}
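	// Start the consensus state machine; it was deliberately not started in
	// OnStart while fast-syncing.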
conR.conS.Start()
}
// GetChannels implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// TODO optimize
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
ID: StateChannel,
Priority: 5,
SendQueueCapacity: 100,
},
&p2p.ChannelDescriptor{
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
},
&p2p.ChannelDescriptor{
ID: VoteChannel,
Priority: 5,
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
},
&p2p.ChannelDescriptor{
ID: VoteSetBitsChannel,
Priority: 1,
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
},
}
}
// AddPeer implements Reactor
func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
if !conR.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer).SetLogger(conR.Logger)
peer.Set(types.PeerStateKey, peerState)
// Begin routines for this peer.
go conR.gossipDataRoutine(peer, peerState)
go conR.gossipVotesRoutine(peer, peerState)
go conR.queryMaj23Routine(peer, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.FastSync() {
conR.sendNewRoundStepMessages(peer)
}
}
// RemovePeer implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
if !conR.IsRunning() {
return
}
// TODO
//peer.Get(PeerStateKey).(*PeerState).Disconnect()
}
// Receive implements Reactor
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes is ordered by the receiveRoutine.
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
return
}
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
// TODO punish peer?
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps := src.Get(types.PeerStateKey).(*PeerState)
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg)
case *CommitStepMessage:
ps.ApplyCommitStepMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
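			// Snapshot the consensus height and vote sets under the state lock so
			// the response below is built from a consistent view.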
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height != msg.Height {
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *cmn.BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
conR.Logger.Error("Bad VoteSetBitsMessage field Type")
return
}
src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}})
case *ProposalHeartbeatMessage:
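			// We only log a peer's heartbeat here; our own heartbeats are
			// rebroadcast to peers via broadcastProposalHeartbeatMessage.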
hb := msg.Heartbeat
conR.Logger.Debug("Received proposal heartbeat message",
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence,
"valIdx", hb.ValidatorIndex, "valAddr", hb.ValidatorAddress)
default:
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
case DataChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
default:
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteMessage:
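			// Record that the peer has this vote (growing its vote bit arrays if
			// needed), then hand the vote to the consensus state via peerMsgQueue.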
cs := conR.conS
cs.mtx.Lock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.Unlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.Key()}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteSetBitsChannel:
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
var ourVotes *cmn.BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
conR.Logger.Error("Bad VoteSetBitsMessage field Type")
return
}
ps.ApplyVoteSetBitsMessage(msg, ourVotes)
} else {
ps.ApplyVoteSetBitsMessage(msg, nil)
}
default:
// don't punish (leave room for soft upgrades)
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
default:
conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
}
if err != nil {
conR.Logger.Error("Error in Receive()", "err", err)
}
}
// SetEventBus sets event bus.
func (conR *ConsensusReactor) SetEventBus(b *types.EventBus) {
conR.eventBus = b
conR.conS.SetEventBus(b)
}
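
// A minimal wiring sketch (illustrative only; the event bus is normally created
// and started by the node setup code, not in this file):
//
//	eventBus := types.NewEventBus()
//	if _, err := eventBus.Start(); err != nil {
//		return err
//	}
//	conR.SetEventBus(eventBus)
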
// FastSync returns whether the consensus reactor is in fast-sync mode.
func (conR *ConsensusReactor) FastSync() bool {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.fastSync
}
//--------------------------------------
// startBroadcastRoutine subscribes to new round steps, votes, and proposal
// heartbeats via the event bus, and starts a goroutine that broadcasts them
// to peers as they are received.
func (conR *ConsensusReactor) startBroadcastRoutine() error {
const subscriber = "consensus-reactor"
ctx := context.Background()
// new round steps
stepsCh := make(chan interface{})
err := conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, stepsCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryNewRoundStep)
}
// votes
votesCh := make(chan interface{})
err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryVote, votesCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote)
}
// proposal heartbeats
heartbeatsCh := make(chan interface{})
err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat)
}
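
	// Forward events to peers until the reactor quits, at which point all of
	// this subscriber's subscriptions are dropped via UnsubscribeAll.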
go func() {
for {
select {
case data, ok := <-stepsCh:
if ok { // a receive from a closed channel returns the zero value immediately
edrs := data.(types.TMEventData).Unwrap().(types.EventDataRoundState)
conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState))
}
case data, ok := <-votesCh:
if ok {
edv := data.(types.TMEventData).Unwrap().(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote)
}
case data, ok := <-heartbeatsCh:
if ok {
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(edph)
}
case <-conR.Quit:
conR.eventBus.UnsubscribeAll(ctx, subscriber)
return
}
}
}()
return nil
}
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
hb := heartbeat.Heartbeat
conR.Logger.Debug("Broadcasting proposal heartbeat message",
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
msg := &ProposalHeartbeatMessage{hb}
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
}
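
// broadcastNewRoundStep sends the current round step to all peers on the
// StateChannel; during the commit step it also sends a CommitStepMessage.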
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
}
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Get(PeerStateKey).(*PeerState)
prs := ps.GetRoundState()
if prs.Height == vote.Height {
// TODO: Also filter on round?
peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
// TODO: But that requires changing the struct field comment.
}
}
*/
}
func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.Round(),
}
if rs.Step == cstypes.RoundStepCommit {
csMsg = &CommitStepMessage{
Height: rs.Height,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
}
}
return
}
func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
}
if csMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
}
}
func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
logger.Info("Stopping gossipDataRoutine for peer")
return
}
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
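		// In priority order: send proposal block parts the peer is missing, help
		// a lagging peer catch up from the block store, then (once height and
		// round match) send the Proposal and ProposalPOL; otherwise sleep.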
// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
}
}
// If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) {
heightLogger := logger.With("height", prs.Height)
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
continue OUTER_LOOP
}
// If height and round don't match, sleep.
if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
//logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
// By here, height and round match.
// Proposal block parts were already matched and sent if any were wanted.
// (These can match on hash so the round doesn't matter)
// Now consider sending other things, like the Proposal itself.
// Send Proposal && ProposalPOL BitArray?
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposal(rs.Proposal)
}
}
// ProposalPOL: lets peer know which POL votes we have so far.
// Peer must receive ProposalMessage first.
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
}
continue OUTER_LOOP
}
// Nothing to do. Sleep.
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
}
func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {
// this might happen if we didn't receive the commit message from the peer
// NOTE: wouldn't it be better if the peer resubmitted his CommitStepMessage periodically if not progressing?
if prs.ProposalBlockParts == nil {
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta",
"ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
prs.ProposalBlockPartsHeader = blockMeta.BlockID.PartsHeader
prs.ProposalBlockParts = cmn.NewBitArray(blockMeta.BlockID.PartsHeader.Total)
}
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta",
"ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
time.Sleep(conR.conS.config.PeerGossipSleep())
return
} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil {
logger.Error("Could not load part", "index", index,
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our round, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
return
} else {
//logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
}
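// gossipVotesRoutine gossips votes to the peer: prevotes and precommits for the
// current height, and commits (LastCommit or stored block commits) when the
// peer is behind.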
func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
// Simple hack to throttle logs upon sleep.
var sleeping = 0
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
logger.Info("Stopping gossipVotesRoutine for peer")
return
}
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
switch sleeping {
case 1: // First sleep
sleeping = 2
case 2: // No more sleep
sleeping = 0
}
//logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
continue OUTER_LOOP
}
}
// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
}
// Catchup logic
// If peer is lagging by more than 1, send Commit.
if prs.Height != 0 && rs.Height >= prs.Height+2 {
// Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height.
commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
if ps.PickSendVote(commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
}
}
if sleeping == 0 {
// We sent nothing. Sleep...
sleeping = 1
logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 {
// Continued sleep...
sleeping = 1
}
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
}
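// gossipVotesForHeight sends the peer a single vote for the shared height,
// trying LastCommit, then prevotes, precommits, and POL prevotes in turn.
// It returns true if a vote was sent.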
func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool {
// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
}
}
}
return false
}
// NOTE: `queryMaj23Routine` has a simple, crude design, since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (conR *ConsensusReactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
logger.Info("Stopping queryMaj23Routine for peer")
return
}
// Maybe send Height/Round/Prevotes
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: types.VoteTypePrevote,
BlockID: maj23,
}})
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
}
}
}
// Maybe send Height/Round/Precommits
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: types.VoteTypePrecommit,
BlockID: maj23,
}})
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
}
}
}
// Maybe send Height/Round/ProposalPOL
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: types.VoteTypePrevote,
BlockID: maj23,
}})
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
}
}
}
// Little point in sending LastCommitRound/LastCommit;
// these are fleeting and non-blocking.
// Maybe send Height/CatchupCommitRound/CatchupCommit.
{
prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
commit := conR.conS.LoadCommit(prs.Height)
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round(),
Type: types.VoteTypePrecommit,
BlockID: commit.BlockID,
}})
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
}
}
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
continue OUTER_LOOP
}
}
// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// TODO: improve!
func (conR *ConsensusReactor) String() string {
// better not to access shared variables
return "ConsensusReactor" // conR.StringIndented("")
}
// StringIndented returns an indented string representation of the ConsensusReactor
func (conR *ConsensusReactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Get(types.PeerStateKey).(*PeerState)
s += indent + " " + ps.StringIndented(indent+" ") + "\n"
}
s += indent + "}"
return s
}
//-----------------------------------------------------------------------------
var (
ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct {
Peer p2p.Peer
logger log.Logger
mtx sync.Mutex
cstypes.PeerRoundState
}
// NewPeerState returns a new PeerState for the given Peer
func NewPeerState(peer p2p.Peer) *PeerState {
return &PeerState{
Peer: peer,
logger: log.NewNopLogger(),
PeerRoundState: cstypes.PeerRoundState{
Round: -1,
ProposalPOLRound: -1,
LastCommitRound: -1,
CatchupCommitRound: -1,
},
}
}
func (ps *PeerState) SetLogger(logger log.Logger) *PeerState {
ps.logger = logger
return ps
}
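// A minimal wiring sketch (illustrative only; assumes the p2p.Peer exposes a
// Set counterpart to the Get used in StringIndented above):
//
//	ps := NewPeerState(peer).SetLogger(logger.With("peer", peer))
//	peer.Set(types.PeerStateKey, ps)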
// GetRoundState returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
ps.mtx.Lock()
defer ps.mtx.Unlock()
prs := ps.PeerRoundState // copy
return &prs
}
// GetHeight returns an atomic snapshot of the PeerRoundState's height
// used by the mempool to ensure peers are caught up before broadcasting new txs
func (ps *PeerState) GetHeight() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.PeerRoundState.Height
}
// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != proposal.Height || ps.Round != proposal.Round {
return
}
if ps.Proposal {
return
}
ps.Proposal = true
ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
ps.ProposalPOLRound = proposal.POLRound
ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}
// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != height || ps.Round != round {
return
}
ps.ProposalBlockParts.SetIndex(index, true)
}
// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
}
return false
}
// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if votes.Size() == 0 {
return nil, false
}
height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
// Lazily set data using 'votes'.
if votes.IsCommit() {
ps.ensureCatchupCommitRound(height, round, size)
}
ps.ensureVoteBitArrays(height, size)
psVotes := ps.getVoteBitArray(height, round, type_)
if psVotes == nil {
return nil, false // Not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
ps.setHasVote(height, round, type_, index)
return votes.GetByIndex(index), true
}
return nil, false
}
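// getVoteBitArray returns the bit-array of votes the peer is known to have for
// the given height/round/type, or nil if none is tracked.
// Caller must hold ps.mtx.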
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
if !types.IsVoteTypeValid(type_) {
return nil
}
if ps.Height == height {
if ps.Round == round {
switch type_ {
case types.VoteTypePrevote:
return ps.Prevotes
case types.VoteTypePrecommit:
return ps.Precommits
}
}
if ps.CatchupCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
return nil
case types.VoteTypePrecommit:
return ps.CatchupCommit
}
}
if ps.ProposalPOLRound == round {
switch type_ {
case types.VoteTypePrevote:
return ps.ProposalPOL
case types.VoteTypePrecommit:
return nil
}
}
return nil
}
if ps.Height == height+1 {
if ps.LastCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
return nil
case types.VoteTypePrecommit:
return ps.LastCommit
}
}
return nil
}
return nil
}
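// ensureCatchupCommitRound records the round of a +2/3 commit that the peer
// should catch up to, allocating the CatchupCommit bit-array as needed.
// Caller must hold ps.mtx.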
// 'round': A round for which we have a +2/3 commit.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
if ps.Height != height {
return
}
/*
NOTE: This is wrong, 'round' could change.
e.g. if orig round is not the same as block LastCommit round.
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
}
*/
if ps.CatchupCommitRound == round {
return // Nothing to do!
}
ps.CatchupCommitRound = round
if round == ps.Round {
ps.CatchupCommit = ps.Precommits
} else {
ps.CatchupCommit = cmn.NewBitArray(numValidators)
}
}
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.ensureVoteBitArrays(height, numValidators)
}
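// ensureVoteBitArrays does the actual allocation for EnsureVoteBitArrays.
// Caller must hold ps.mtx.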
func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
if ps.Height == height {
if ps.Prevotes == nil {
ps.Prevotes = cmn.NewBitArray(numValidators)
}
if ps.Precommits == nil {
ps.Precommits = cmn.NewBitArray(numValidators)
}
if ps.CatchupCommit == nil {
ps.CatchupCommit = cmn.NewBitArray(numValidators)
}
if ps.ProposalPOL == nil {
ps.ProposalPOL = cmn.NewBitArray(numValidators)
}
} else if ps.Height == height+1 {
if ps.LastCommit == nil {
ps.LastCommit = cmn.NewBitArray(numValidators)
}
}
}
// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}
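// setHasVote marks the vote as seen by the peer. Caller must hold ps.mtx.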
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
logger := ps.logger.With("peerRound", ps.Round, "height", height, "round", round)
logger.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
// NOTE: some may be nil BitArrays -> no side effects.
psVotes := ps.getVoteBitArray(height, round, type_)
if psVotes != nil {
psVotes.SetIndex(index, true)
}
}
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
// Ignore duplicates or decreases
if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
return
}
// Just remember these values.
psHeight := ps.Height
psRound := ps.Round
//psStep := ps.Step
psCatchupCommitRound := ps.CatchupCommitRound
psCatchupCommit := ps.CatchupCommit
startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
ps.Height = msg.Height
ps.Round = msg.Round
ps.Step = msg.Step
ps.StartTime = startTime
if psHeight != msg.Height || psRound != msg.Round {
ps.Proposal = false
ps.ProposalBlockPartsHeader = types.PartSetHeader{}
ps.ProposalBlockParts = nil
ps.ProposalPOLRound = -1
ps.ProposalPOL = nil
// We'll update the BitArray capacity later.
ps.Prevotes = nil
ps.Precommits = nil
}
if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
// Peer caught up to CatchupCommitRound.
// Preserve psCatchupCommit!
// NOTE: We prefer to use prs.Precommits if
// prs.Round matches prs.CatchupCommitRound.
ps.Precommits = psCatchupCommit
}
if psHeight != msg.Height {
// Shift Precommits to LastCommit.
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
ps.LastCommitRound = msg.LastCommitRound
ps.LastCommit = ps.Precommits
} else {
ps.LastCommitRound = msg.LastCommitRound
ps.LastCommit = nil
}
// We'll update the BitArray capacity later.
ps.CatchupCommitRound = -1
ps.CatchupCommit = nil
}
}
// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
ps.ProposalBlockParts = msg.BlockParts
}
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
if ps.ProposalPOLRound != msg.ProposalPOLRound {
return
}
// TODO: Merge onto existing ps.ProposalPOL?
// We might have sent some prevotes in the meantime.
ps.ProposalPOL = msg.ProposalPOL
}
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}
// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
if votes != nil {
if ourVotes == nil {
votes.Update(msg.Votes)
} else {
otherVotes := votes.Sub(ourVotes)
hasVotes := otherVotes.Or(msg.Votes)
votes.Update(hasVotes)
}
}
}
// String returns a string representation of the PeerState
func (ps *PeerState) String() string {
return ps.StringIndented("")
}
// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerState{
%s Key %v
%s PRS %v
%s}`,
indent, ps.Peer.Key(),
indent, ps.PeerRoundState.StringIndented(indent+" "),
indent)
}
//-----------------------------------------------------------------------------
// Messages
const (
msgTypeNewRoundStep = byte(0x01)
msgTypeCommitStep = byte(0x02)
msgTypeProposal = byte(0x11)
msgTypeProposalPOL = byte(0x12)
msgTypeBlockPart = byte(0x13) // both block & POL
msgTypeVote = byte(0x14)
msgTypeHasVote = byte(0x15)
msgTypeVoteSetMaj23 = byte(0x16)
msgTypeVoteSetBits = byte(0x17)
msgTypeProposalHeartbeat = byte(0x20)
)
// ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusMessage }{},
wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
wire.ConcreteType{&VoteMessage{}, msgTypeVote},
wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat},
)
// DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
msgType = bz[0]
n := new(int)
r := bytes.NewReader(bz)
msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
return
}
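// Usage sketch (illustrative; msgBytes would come off one of the reactor's
// channels, and the handling here is only a minimal example):
//
//	_, msg, err := DecodeMessage(msgBytes)
//	if err != nil {
//		logger.Error("Error decoding message", "err", err)
//		return
//	}
//	switch msg := msg.(type) {
//	case *ProposalMessage:
//		_ = msg.Proposal // e.g. hand off to the consensus state
//	case *VoteMessage:
//		_ = msg.Vote
//	}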
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState,
// i.e. for every height/round/step transition.
type NewRoundStepMessage struct {
Height int
Round int
Step cstypes.RoundStepType
SecondsSinceStartTime int
LastCommitRound int
}
// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
}
//-------------------------------------
// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct {
Height int
BlockPartsHeader types.PartSetHeader
BlockParts *cmn.BitArray
}
// String returns a string representation.
func (m *CommitStepMessage) String() string {
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}
//-------------------------------------
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
}
// String returns a string representation.
func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal)
}
//-------------------------------------
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height int
ProposalPOLRound int
ProposalPOL *cmn.BitArray
}
// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}
//-------------------------------------
// BlockPartMessage is sent when gossiping a piece of the proposed block.
type BlockPartMessage struct {
Height int
Round int
Part *types.Part
}
// String returns a string representation.
func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}
//-------------------------------------
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
}
// String returns a string representation.
func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote)
}
//-------------------------------------
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height int
Round int
Type byte
Index int
}
// String returns a string representation.
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
}
//-------------------------------------
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height int
Round int
Type byte
BlockID types.BlockID
}
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}
//-------------------------------------
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct {
Height int
Round int
Type byte
BlockID types.BlockID
Votes *cmn.BitArray
}
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}
//-------------------------------------
// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
type ProposalHeartbeatMessage struct {
Heartbeat *types.Heartbeat
}
// String returns a string representation.
func (m *ProposalHeartbeatMessage) String() string {
return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
}