Merge pull request #584 from tendermint/no-empty-blocks
No empty blocks
This commit is contained in:
commit
7108c66e3b
|
@ -1,5 +1,11 @@
|
|||
# Changelog
|
||||
|
||||
## 0.10.3 (TBD)
|
||||
|
||||
FEATURES:
|
||||
- New `--consensus.create_empty_blocks` flag; when set to false, only creates blocks when there are txs or when the AppHash changes
|
||||
- New `consensus.create_empty_blocks_interval` config option; when greater than 0, will create an empty block after waiting that many seconds
|
||||
|
||||
## 0.10.2 (July 10, 2017)
|
||||
|
||||
FEATURES:
|
||||
|
|
|
@ -45,6 +45,9 @@ func AddNodeFlags(cmd *cobra.Command) {
|
|||
cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "Comma delimited host:port seed nodes")
|
||||
cmd.Flags().Bool("p2p.skip_upnp", config.P2P.SkipUPNP, "Skip UPNP configuration")
|
||||
cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "Enable Peer-Exchange (dev feature)")
|
||||
|
||||
// consensus flags
|
||||
cmd.Flags().Bool("consensus.create_empty_blocks", config.Consensus.CreateEmptyBlocks, "Set this to false to only produce blocks when there are txs or when the AppHash changes")
|
||||
}
|
||||
|
||||
// Users wishing to:
|
||||
|
|
|
@ -304,6 +304,10 @@ type ConsensusConfig struct {
|
|||
MaxBlockSizeTxs int `mapstructure:"max_block_size_txs"`
|
||||
MaxBlockSizeBytes int `mapstructure:"max_block_size_bytes"`
|
||||
|
||||
// EmptyBlocks mode and possible interval between empty blocks in seconds
|
||||
CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"`
|
||||
CreateEmptyBlocksInterval int `mapstructure:"create_empty_blocks_interval"`
|
||||
|
||||
// TODO: This probably shouldn't be exposed but it makes it
|
||||
// easy to write tests for the wal/replay
|
||||
BlockPartSize int `mapstructure:"block_part_size"`
|
||||
|
@ -313,6 +317,16 @@ type ConsensusConfig struct {
|
|||
PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"`
|
||||
}
|
||||
|
||||
// WaitForTxs returns true if the consensus should wait for transactions before entering the propose step
|
||||
func (cfg *ConsensusConfig) WaitForTxs() bool {
|
||||
return !cfg.CreateEmptyBlocks || cfg.CreateEmptyBlocksInterval > 0
|
||||
}
|
||||
|
||||
// EmptyBlocks returns the amount of time to wait before proposing an empty block or starting the propose timer if there are no txs available
|
||||
func (cfg *ConsensusConfig) EmptyBlocksInterval() time.Duration {
|
||||
return time.Duration(cfg.CreateEmptyBlocksInterval) * time.Second
|
||||
}
|
||||
|
||||
// Propose returns the amount of time to wait for a proposal
|
||||
func (cfg *ConsensusConfig) Propose(round int) time.Duration {
|
||||
return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond
|
||||
|
@ -357,7 +371,9 @@ func DefaultConsensusConfig() *ConsensusConfig {
|
|||
TimeoutCommit: 1000,
|
||||
SkipTimeoutCommit: false,
|
||||
MaxBlockSizeTxs: 10000,
|
||||
MaxBlockSizeBytes: 1, // TODO
|
||||
MaxBlockSizeBytes: 1, // TODO
|
||||
CreateEmptyBlocks: true,
|
||||
CreateEmptyBlocksInterval: 0,
|
||||
BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types
|
||||
PeerGossipSleepDuration: 100,
|
||||
PeerQueryMaj23SleepDuration: 2000,
|
||||
|
|
|
@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty
|
|||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
|
||||
// Sign
|
||||
heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) String() string {
|
||||
return Fmt("PrivValidator{%X}", privVal.Address)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
|
||||
// genesis, chain_id, priv_val
|
||||
var config *cfg.Config // NOTE: must be reset for each _test.go file
|
||||
var ensureTimeout = time.Duration(2)
|
||||
var ensureTimeout = time.Second * 2
|
||||
|
||||
func ensureDir(dir string, mode os.FileMode) {
|
||||
if err := EnsureDir(dir, mode); err != nil {
|
||||
|
@ -240,8 +240,11 @@ func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv *ty
|
|||
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
||||
|
||||
// Make Mempool
|
||||
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem)
|
||||
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
||||
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
mempool.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
// Make ConsensusReactor
|
||||
cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool)
|
||||
|
@ -294,12 +297,22 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
|
|||
//-------------------------------------------------------------------------------
|
||||
|
||||
func ensureNoNewStep(stepCh chan interface{}) {
|
||||
timeout := time.NewTicker(ensureTimeout * time.Second)
|
||||
timer := time.NewTimer(ensureTimeout)
|
||||
select {
|
||||
case <-timeout.C:
|
||||
case <-timer.C:
|
||||
break
|
||||
case <-stepCh:
|
||||
panic("We should be stuck waiting for more votes, not moving to the next step")
|
||||
panic("We should be stuck waiting, not moving to the next step")
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewStep(stepCh chan interface{}) {
|
||||
timer := time.NewTimer(ensureTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
panic("We shouldnt be stuck waiting")
|
||||
case <-stepCh:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,82 @@ func init() {
|
|||
config = ResetConfig("consensus_mempool_test")
|
||||
}
|
||||
|
||||
func TestNoProgressUntilTxsAvailable(t *testing.T) {
|
||||
config := ResetConfig("consensus_mempool_txs_available_test")
|
||||
config.Consensus.CreateEmptyBlocks = false
|
||||
state, privVals := randGenesisState(1, false, 10)
|
||||
cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
|
||||
cs.mempool.EnableTxsAvailable()
|
||||
height, round := cs.Height, cs.Round
|
||||
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||
startTestRound(cs, height, round)
|
||||
|
||||
ensureNewStep(newBlockCh) // first block gets committed
|
||||
ensureNoNewStep(newBlockCh)
|
||||
deliverTxsRange(cs, 0, 2)
|
||||
ensureNewStep(newBlockCh) // commit txs
|
||||
ensureNewStep(newBlockCh) // commit updated app hash
|
||||
ensureNoNewStep(newBlockCh)
|
||||
|
||||
}
|
||||
|
||||
func TestProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
|
||||
config := ResetConfig("consensus_mempool_txs_available_test")
|
||||
config.Consensus.CreateEmptyBlocksInterval = int(ensureTimeout.Seconds())
|
||||
state, privVals := randGenesisState(1, false, 10)
|
||||
cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
|
||||
cs.mempool.EnableTxsAvailable()
|
||||
height, round := cs.Height, cs.Round
|
||||
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||
startTestRound(cs, height, round)
|
||||
|
||||
ensureNewStep(newBlockCh) // first block gets committed
|
||||
ensureNoNewStep(newBlockCh) // then we dont make a block ...
|
||||
ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed
|
||||
}
|
||||
|
||||
func TestProgressInHigherRound(t *testing.T) {
|
||||
config := ResetConfig("consensus_mempool_txs_available_test")
|
||||
config.Consensus.CreateEmptyBlocks = false
|
||||
state, privVals := randGenesisState(1, false, 10)
|
||||
cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
|
||||
cs.mempool.EnableTxsAvailable()
|
||||
height, round := cs.Height, cs.Round
|
||||
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||
newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1)
|
||||
timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||
cs.setProposal = func(proposal *types.Proposal) error {
|
||||
if cs.Height == 2 && cs.Round == 0 {
|
||||
// dont set the proposal in round 0 so we timeout and
|
||||
// go to next round
|
||||
cs.Logger.Info("Ignoring set proposal at height 2, round 0")
|
||||
return nil
|
||||
}
|
||||
return cs.defaultSetProposal(proposal)
|
||||
}
|
||||
startTestRound(cs, height, round)
|
||||
|
||||
ensureNewStep(newRoundCh) // first round at first height
|
||||
ensureNewStep(newBlockCh) // first block gets committed
|
||||
ensureNewStep(newRoundCh) // first round at next height
|
||||
deliverTxsRange(cs, 0, 2) // we deliver txs, but dont set a proposal so we get the next round
|
||||
<-timeoutCh
|
||||
ensureNewStep(newRoundCh) // wait for the next round
|
||||
ensureNewStep(newBlockCh) // now we can commit the block
|
||||
}
|
||||
|
||||
func deliverTxsRange(cs *ConsensusState, start, end int) {
|
||||
// Deliver some txs.
|
||||
for i := start; i < end; i++ {
|
||||
txBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
||||
err := cs.mempool.CheckTx(txBytes, nil)
|
||||
if err != nil {
|
||||
panic(Fmt("Error after CheckTx: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxConcurrentWithCommit(t *testing.T) {
|
||||
|
||||
state, privVals := randGenesisState(1, false, 10)
|
||||
|
@ -22,21 +98,8 @@ func TestTxConcurrentWithCommit(t *testing.T) {
|
|||
height, round := cs.Height, cs.Round
|
||||
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||
|
||||
deliverTxsRange := func(start, end int) {
|
||||
// Deliver some txs.
|
||||
for i := start; i < end; i++ {
|
||||
txBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
||||
err := cs.mempool.CheckTx(txBytes, nil)
|
||||
if err != nil {
|
||||
panic(Fmt("Error after CheckTx: %v", err))
|
||||
}
|
||||
// time.Sleep(time.Microsecond * time.Duration(rand.Int63n(3000)))
|
||||
}
|
||||
}
|
||||
|
||||
NTxs := 10000
|
||||
go deliverTxsRange(0, NTxs)
|
||||
go deliverTxsRange(cs, 0, NTxs)
|
||||
|
||||
startTestRound(cs, height, round)
|
||||
ticker := time.NewTicker(time.Second * 20)
|
||||
|
|
|
@ -311,6 +311,16 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
|
|||
edv := data.Unwrap().(types.EventDataVote)
|
||||
conR.broadcastHasVoteMessage(edv.Vote)
|
||||
})
|
||||
|
||||
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) {
|
||||
heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat)
|
||||
conR.broadcastProposalHeartbeatMessage(heartbeat)
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
|
||||
msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat}
|
||||
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
|
||||
|
@ -1305,3 +1315,15 @@ type VoteSetBitsMessage struct {
|
|||
func (m *VoteSetBitsMessage) String() string {
|
||||
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
|
||||
type ProposalHeartbeatMessage struct {
|
||||
Heartbeat *types.Heartbeat
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *ProposalHeartbeatMessage) String() string {
|
||||
return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
|
|||
"blockID", v.BlockID, "peer", peerKey)
|
||||
}
|
||||
|
||||
cs.handleMsg(m, cs.RoundState)
|
||||
cs.handleMsg(m)
|
||||
case timeoutInfo:
|
||||
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
|
||||
cs.handleTimeout(m, cs.RoundState)
|
||||
|
|
|
@ -10,18 +10,24 @@ import (
|
|||
"time"
|
||||
|
||||
fail "github.com/ebuchman/fail-test"
|
||||
|
||||
wire "github.com/tendermint/go-wire"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Config
|
||||
|
||||
const (
|
||||
proposalHeartbeatIntervalSeconds = 2
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Errors
|
||||
|
||||
|
@ -179,6 +185,7 @@ type PrivValidator interface {
|
|||
GetAddress() []byte
|
||||
SignVote(chainID string, vote *types.Vote) error
|
||||
SignProposal(chainID string, proposal *types.Proposal) error
|
||||
SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error
|
||||
}
|
||||
|
||||
// ConsensusState handles execution of the consensus algorithm.
|
||||
|
@ -605,7 +612,8 @@ func (cs *ConsensusState) newStep() {
|
|||
// receiveRoutine handles messages which may cause state transitions.
|
||||
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
|
||||
// It keeps the RoundState and is the only thing that updates it.
|
||||
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities
|
||||
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
|
||||
// ConsensusState must be locked before any internal state is updated.
|
||||
func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
||||
for {
|
||||
if maxSteps > 0 {
|
||||
|
@ -619,15 +627,17 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
|||
var mi msgInfo
|
||||
|
||||
select {
|
||||
case height := <-cs.mempool.TxsAvailable():
|
||||
cs.handleTxsAvailable(height)
|
||||
case mi = <-cs.peerMsgQueue:
|
||||
cs.wal.Save(mi)
|
||||
// handles proposals, block parts, votes
|
||||
// may generate internal events (votes, complete proposals, 2/3 majorities)
|
||||
cs.handleMsg(mi, rs)
|
||||
cs.handleMsg(mi)
|
||||
case mi = <-cs.internalMsgQueue:
|
||||
cs.wal.Save(mi)
|
||||
// handles proposals, block parts, votes
|
||||
cs.handleMsg(mi, rs)
|
||||
cs.handleMsg(mi)
|
||||
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
|
||||
cs.wal.Save(ti)
|
||||
// if the timeout is relevant to the rs
|
||||
|
@ -651,7 +661,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
|||
}
|
||||
|
||||
// state transitions on complete-proposal, 2/3-any, 2/3-one
|
||||
func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
|
||||
func (cs *ConsensusState) handleMsg(mi msgInfo) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
|
||||
|
@ -708,6 +718,8 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
|
|||
// NewRound event fired from enterNewRound.
|
||||
// XXX: should we fire timeout here (for timeout commit)?
|
||||
cs.enterNewRound(ti.Height, 0)
|
||||
case RoundStepNewRound:
|
||||
cs.enterPropose(ti.Height, 0)
|
||||
case RoundStepPropose:
|
||||
types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent())
|
||||
cs.enterPrevote(ti.Height, ti.Round)
|
||||
|
@ -723,6 +735,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
|
|||
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) handleTxsAvailable(height int) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
// we only need to do this for round 0
|
||||
cs.enterPropose(height, 0)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// State functions
|
||||
// Used internally by handleTimeout and handleMsg to make state transitions
|
||||
|
@ -770,11 +789,66 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
|||
|
||||
types.FireEventNewRound(cs.evsw, cs.RoundStateEvent())
|
||||
|
||||
// Immediately go to enterPropose.
|
||||
cs.enterPropose(height, round)
|
||||
// Wait for txs to be available in the mempool
|
||||
// before we enterPropose in round 0. If the last block changed the app hash,
|
||||
// we may need an empty "proof" block, and enterPropose immediately.
|
||||
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
|
||||
if waitForTxs {
|
||||
if cs.config.CreateEmptyBlocksInterval > 0 {
|
||||
cs.scheduleTimeout(cs.config.EmptyBlocksInterval(), height, round, RoundStepNewRound)
|
||||
}
|
||||
go cs.proposalHeartbeat(height, round)
|
||||
} else {
|
||||
cs.enterPropose(height, round)
|
||||
}
|
||||
}
|
||||
|
||||
// Enter: from enterNewRound(height,round).
|
||||
// needProofBlock returns true on the first height (so the genesis app hash is signed right away)
|
||||
// and where the last block (height-1) caused the app hash to change
|
||||
func (cs *ConsensusState) needProofBlock(height int) bool {
|
||||
if height == 1 {
|
||||
return true
|
||||
}
|
||||
|
||||
lastBlockMeta := cs.blockStore.LoadBlockMeta(height - 1)
|
||||
if !bytes.Equal(cs.state.AppHash, lastBlockMeta.Header.AppHash) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) proposalHeartbeat(height, round int) {
|
||||
counter := 0
|
||||
addr := cs.privValidator.GetAddress()
|
||||
valIndex, v := cs.Validators.GetByAddress(addr)
|
||||
if v == nil {
|
||||
// not a validator
|
||||
valIndex = -1
|
||||
}
|
||||
for {
|
||||
rs := cs.GetRoundState()
|
||||
// if we've already moved on, no need to send more heartbeats
|
||||
if rs.Step > RoundStepNewRound || rs.Round > round || rs.Height > height {
|
||||
return
|
||||
}
|
||||
heartbeat := &types.Heartbeat{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Sequence: counter,
|
||||
ValidatorAddress: addr,
|
||||
ValidatorIndex: valIndex,
|
||||
}
|
||||
cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat)
|
||||
heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat}
|
||||
types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent)
|
||||
counter += 1
|
||||
time.Sleep(proposalHeartbeatIntervalSeconds * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Enter (CreateEmptyBlocks): from enterNewRound(height,round)
|
||||
// Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval
|
||||
// Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool
|
||||
func (cs *ConsensusState) enterPropose(height int, round int) {
|
||||
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) {
|
||||
cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
|
@ -804,7 +878,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
|
|||
return
|
||||
}
|
||||
|
||||
if !bytes.Equal(cs.Validators.GetProposer().Address, cs.privValidator.GetAddress()) {
|
||||
if !cs.isProposer() {
|
||||
cs.Logger.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator)
|
||||
if cs.Validators.HasAddress(cs.privValidator.GetAddress()) {
|
||||
cs.Logger.Debug("This node is a validator")
|
||||
|
@ -818,6 +892,10 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) isProposer() bool {
|
||||
return bytes.Equal(cs.Validators.GetProposer().Address, cs.privValidator.GetAddress())
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) defaultDecideProposal(height, round int) {
|
||||
var block *types.Block
|
||||
var blockParts *types.PartSet
|
||||
|
|
|
@ -56,14 +56,16 @@ const cacheSize = 100000
|
|||
type Mempool struct {
|
||||
config *cfg.MempoolConfig
|
||||
|
||||
proxyMtx sync.Mutex
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
counter int64 // simple incrementing counter
|
||||
height int // the last block Update()'d to
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
proxyMtx sync.Mutex
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
counter int64 // simple incrementing counter
|
||||
height int // the last block Update()'d to
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
notifiedTxsAvailable bool // true if fired on txsAvailable for this height
|
||||
txsAvailable chan int // fires the next height once for each height, when the mempool is not empty
|
||||
|
||||
// Keep a cache of already-seen txs.
|
||||
// This reduces the pressure on the proxyApp.
|
||||
|
@ -76,13 +78,13 @@ type Mempool struct {
|
|||
}
|
||||
|
||||
// NewMempool returns a new Mempool with the given configuration and connection to an application.
|
||||
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool {
|
||||
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool {
|
||||
mempool := &Mempool{
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
txs: clist.New(),
|
||||
counter: 0,
|
||||
height: 0,
|
||||
height: height,
|
||||
rechecking: 0,
|
||||
recheckCursor: nil,
|
||||
recheckEnd: nil,
|
||||
|
@ -94,6 +96,13 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M
|
|||
return mempool
|
||||
}
|
||||
|
||||
// EnableTxsAvailable initializes the TxsAvailable channel,
|
||||
// ensuring it will trigger once every height when transactions are available.
|
||||
// NOTE: not thread safe - should only be called once, on startup
|
||||
func (mem *Mempool) EnableTxsAvailable() {
|
||||
mem.txsAvailable = make(chan int, 1)
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger.
|
||||
func (mem *Mempool) SetLogger(l log.Logger) {
|
||||
mem.logger = l
|
||||
|
@ -215,6 +224,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
|||
tx: req.GetCheckTx().Tx,
|
||||
}
|
||||
mem.txs.PushBack(memTx)
|
||||
mem.notifyTxsAvailable()
|
||||
} else {
|
||||
// ignore bad transaction
|
||||
mem.logger.Info("Bad Transaction", "res", r)
|
||||
|
@ -256,12 +266,33 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
|||
// Done!
|
||||
atomic.StoreInt32(&mem.rechecking, 0)
|
||||
mem.logger.Info("Done rechecking txs")
|
||||
|
||||
mem.notifyTxsAvailable()
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
// TxsAvailable returns a channel which fires once for every height,
|
||||
// and only when transactions are available in the mempool.
|
||||
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
|
||||
func (mem *Mempool) TxsAvailable() chan int {
|
||||
return mem.txsAvailable
|
||||
}
|
||||
|
||||
func (mem *Mempool) notifyTxsAvailable() {
|
||||
if mem.Size() == 0 {
|
||||
panic("notified txs available but mempool is empty!")
|
||||
}
|
||||
if mem.txsAvailable != nil &&
|
||||
!mem.notifiedTxsAvailable {
|
||||
|
||||
mem.notifiedTxsAvailable = true
|
||||
mem.txsAvailable <- mem.height + 1
|
||||
}
|
||||
}
|
||||
|
||||
// Reap returns a list of transactions currently in the mempool.
|
||||
// If maxTxs is -1, there is no cap on the number of returned transactions.
|
||||
func (mem *Mempool) Reap(maxTxs int) types.Txs {
|
||||
|
@ -307,13 +338,15 @@ func (mem *Mempool) Update(height int, txs types.Txs) {
|
|||
|
||||
// Set height
|
||||
mem.height = height
|
||||
mem.notifiedTxsAvailable = false
|
||||
|
||||
// Remove transactions that are already in txs.
|
||||
goodTxs := mem.filterTxs(txsMap)
|
||||
// Recheck mempool txs if any txs were committed in the block
|
||||
// NOTE/XXX: in some apps a tx could be invalidated due to EndBlock,
|
||||
// so we really still do need to recheck, but this is for debugging
|
||||
if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(goodTxs))
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height)
|
||||
mem.recheckTxs(goodTxs)
|
||||
// At this point, mem.txs are being rechecked.
|
||||
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
||||
|
|
|
@ -1,34 +1,115 @@
|
|||
package mempool
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/abci/example/counter"
|
||||
"github.com/tendermint/abci/example/dummy"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
func TestSerialReap(t *testing.T) {
|
||||
func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool {
|
||||
config := cfg.ResetTestRoot("mempool_test")
|
||||
|
||||
app := counter.NewCounterApplication(true)
|
||||
app.SetOption("serial", "on")
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
appConnMem, _ := cc.NewABCIClient()
|
||||
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
|
||||
if _, err := appConnMem.Start(); err != nil {
|
||||
t.Fatalf("Error starting ABCI client: %v", err.Error())
|
||||
}
|
||||
mempool := NewMempool(config.Mempool, appConnMem, 0)
|
||||
mempool.SetLogger(log.TestingLogger())
|
||||
return mempool
|
||||
}
|
||||
|
||||
func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) {
|
||||
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("Expected not to fire")
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
|
||||
func ensureFire(t *testing.T, ch chan int, timeoutMS int) {
|
||||
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
|
||||
select {
|
||||
case <-ch:
|
||||
case <-timer.C:
|
||||
t.Fatal("Expected to fire")
|
||||
}
|
||||
}
|
||||
|
||||
func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
|
||||
txs := make(types.Txs, count)
|
||||
for i := 0; i < count; i++ {
|
||||
txBytes := make([]byte, 20)
|
||||
txs[i] = txBytes
|
||||
rand.Read(txBytes)
|
||||
err := mempool.CheckTx(txBytes, nil)
|
||||
if err != nil {
|
||||
t.Fatal("Error after CheckTx: %v", err)
|
||||
}
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
func TestTxsAvailable(t *testing.T) {
|
||||
app := dummy.NewDummyApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
mempool := newMempoolWithApp(t, cc)
|
||||
mempool.EnableTxsAvailable()
|
||||
|
||||
timeoutMS := 500
|
||||
|
||||
// with no txs, it shouldnt fire
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
|
||||
// send a bunch of txs, it should only fire once
|
||||
txs := sendTxs(t, mempool, 100)
|
||||
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
|
||||
// call update with half the txs.
|
||||
// it should fire once now for the new height
|
||||
// since there are still txs left
|
||||
committedTxs, txs := txs[:50], txs[50:]
|
||||
mempool.Update(1, committedTxs)
|
||||
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
|
||||
// send a bunch more txs. we already fired for this height so it shouldnt fire again
|
||||
moreTxs := sendTxs(t, mempool, 50)
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
|
||||
// now call update with all the txs. it should not fire as there are no txs left
|
||||
committedTxs = append(txs, moreTxs...)
|
||||
mempool.Update(2, committedTxs)
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
|
||||
// send a bunch more txs, it should only fire once
|
||||
sendTxs(t, mempool, 100)
|
||||
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
||||
}
|
||||
|
||||
func TestSerialReap(t *testing.T) {
|
||||
app := counter.NewCounterApplication(true)
|
||||
app.SetOption("serial", "on")
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
|
||||
mempool := newMempoolWithApp(t, cc)
|
||||
appConnCon, _ := cc.NewABCIClient()
|
||||
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
|
||||
if _, err := appConnCon.Start(); err != nil {
|
||||
t.Fatalf("Error starting ABCI client: %v", err.Error())
|
||||
}
|
||||
mempool := NewMempool(config.Mempool, appConnMem)
|
||||
mempool.SetLogger(log.TestingLogger())
|
||||
|
||||
deliverTxsRange := func(start, end int) {
|
||||
// Deliver some txs.
|
||||
|
|
|
@ -137,11 +137,15 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat
|
|||
|
||||
// Make MempoolReactor
|
||||
mempoolLogger := logger.With("module", "mempool")
|
||||
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool())
|
||||
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
|
||||
mempool.SetLogger(mempoolLogger)
|
||||
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
|
||||
mempoolReactor.SetLogger(mempoolLogger)
|
||||
|
||||
if config.Consensus.WaitForTxs() {
|
||||
mempool.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
// Make ConsensusReactor
|
||||
consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
|
||||
consensusState.SetLogger(consensusLogger)
|
||||
|
|
|
@ -31,6 +31,14 @@ type CanonicalJSONVote struct {
|
|||
Type byte `json:"type"`
|
||||
}
|
||||
|
||||
type CanonicalJSONHeartbeat struct {
|
||||
Height int `json:"height"`
|
||||
Round int `json:"round"`
|
||||
Sequence int `json:"sequence"`
|
||||
ValidatorAddress data.Bytes `json:"validator_address"`
|
||||
ValidatorIndex int `json:"validator_index"`
|
||||
}
|
||||
|
||||
//------------------------------------
|
||||
// Messages including a "chain id" can only be applied to one chain, hence "Once"
|
||||
|
||||
|
@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct {
|
|||
Vote CanonicalJSONVote `json:"vote"`
|
||||
}
|
||||
|
||||
type CanonicalJSONOnceHeartbeat struct {
|
||||
ChainID string `json:"chain_id"`
|
||||
Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"`
|
||||
}
|
||||
|
||||
//-----------------------------------
|
||||
// Canonicalize the structs
|
||||
|
||||
|
@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote {
|
|||
vote.Type,
|
||||
}
|
||||
}
|
||||
|
||||
func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat {
|
||||
return CanonicalJSONHeartbeat{
|
||||
heartbeat.Height,
|
||||
heartbeat.Round,
|
||||
heartbeat.Sequence,
|
||||
heartbeat.ValidatorAddress,
|
||||
heartbeat.ValidatorIndex,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" }
|
|||
func EventStringTimeoutWait() string { return "TimeoutWait" }
|
||||
func EventStringVote() string { return "Vote" }
|
||||
|
||||
func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" }
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
var (
|
||||
|
@ -39,6 +41,8 @@ var (
|
|||
EventDataNameTx = "tx"
|
||||
EventDataNameRoundState = "round_state"
|
||||
EventDataNameVote = "vote"
|
||||
|
||||
EventDataNameProposalHeartbeat = "proposer_heartbeat"
|
||||
)
|
||||
|
||||
//----------------------------------------
|
||||
|
@ -84,6 +88,8 @@ const (
|
|||
|
||||
EventDataTypeRoundState = byte(0x11)
|
||||
EventDataTypeVote = byte(0x12)
|
||||
|
||||
EventDataTypeProposalHeartbeat = byte(0x20)
|
||||
)
|
||||
|
||||
var tmEventDataMapper = data.NewMapper(TMEventData{}).
|
||||
|
@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}).
|
|||
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
|
||||
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
|
||||
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
|
||||
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote)
|
||||
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote).
|
||||
RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat)
|
||||
|
||||
// Most event messages are basic types (a block, a transaction)
|
||||
// but some (an input to a call tx or a receive) are more exotic
|
||||
|
@ -115,6 +122,10 @@ type EventDataTx struct {
|
|||
Error string `json:"error"` // this is redundant information for now
|
||||
}
|
||||
|
||||
type EventDataProposalHeartbeat struct {
|
||||
Heartbeat *Heartbeat
|
||||
}
|
||||
|
||||
// NOTE: This goes into the replay WAL
|
||||
type EventDataRoundState struct {
|
||||
Height int `json:"height"`
|
||||
|
@ -135,6 +146,8 @@ func (_ EventDataTx) AssertIsTMEventData() {}
|
|||
func (_ EventDataRoundState) AssertIsTMEventData() {}
|
||||
func (_ EventDataVote) AssertIsTMEventData() {}
|
||||
|
||||
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
|
||||
|
||||
//----------------------------------------
|
||||
// Wrappers for type safety
|
||||
|
||||
|
@ -232,3 +245,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
|
|||
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
|
||||
fireEvent(fireable, EventStringLock(), TMEventData{rs})
|
||||
}
|
||||
|
||||
func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) {
|
||||
fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/go-wire"
|
||||
"github.com/tendermint/go-wire/data"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
type Heartbeat struct {
|
||||
ValidatorAddress data.Bytes `json:"validator_address"`
|
||||
ValidatorIndex int `json:"validator_index"`
|
||||
Height int `json:"height"`
|
||||
Round int `json:"round"`
|
||||
Sequence int `json:"sequence"`
|
||||
Signature crypto.Signature `json:"signature"`
|
||||
}
|
||||
|
||||
func (heartbeat *Heartbeat) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) {
|
||||
wire.WriteJSON(CanonicalJSONOnceHeartbeat{
|
||||
chainID,
|
||||
CanonicalHeartbeat(heartbeat),
|
||||
}, w, n, err)
|
||||
}
|
||||
|
||||
func (heartbeat *Heartbeat) Copy() *Heartbeat {
|
||||
heartbeatCopy := *heartbeat
|
||||
return &heartbeatCopy
|
||||
}
|
||||
|
||||
func (heartbeat *Heartbeat) String() string {
|
||||
if heartbeat == nil {
|
||||
return "nil-heartbeat"
|
||||
}
|
||||
|
||||
return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}",
|
||||
heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress),
|
||||
heartbeat.Height, heartbeat.Round, heartbeat.Sequence, heartbeat.Signature)
|
||||
}
|
|
@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt
|
|||
|
||||
}
|
||||
|
||||
func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *PrivValidator) String() string {
|
||||
return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep)
|
||||
}
|
||||
|
|
|
@ -23,6 +23,9 @@ type Mempool interface {
|
|||
Reap(int) Txs
|
||||
Update(height int, txs Txs)
|
||||
Flush()
|
||||
|
||||
TxsAvailable() chan int
|
||||
EnableTxsAvailable()
|
||||
}
|
||||
|
||||
type MockMempool struct {
|
||||
|
@ -35,6 +38,8 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil
|
|||
func (m MockMempool) Reap(n int) Txs { return Txs{} }
|
||||
func (m MockMempool) Update(height int, txs Txs) {}
|
||||
func (m MockMempool) Flush() {}
|
||||
func (m MockMempool) TxsAvailable() chan int { return make(chan int) }
|
||||
func (m MockMempool) EnableTxsAvailable() {}
|
||||
|
||||
//------------------------------------------------------
|
||||
// blockstore
|
||||
|
|
Loading…
Reference in New Issue