From 124032e3e93b614fa9fdfc61ed366a9c4bd28b75 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 13 Jul 2017 13:19:44 -0400 Subject: [PATCH] NoEmptyBlocks config option --- config/config.go | 9 ++++++--- consensus/state.go | 6 +++++- mempool/mempool.go | 22 +++++++++++++++++++++- node/node.go | 4 ++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index e552b33b..a633b7fb 100644 --- a/config/config.go +++ b/config/config.go @@ -301,8 +301,9 @@ type ConsensusConfig struct { SkipTimeoutCommit bool `mapstructure:"skip_timeout_commit"` // BlockSize - MaxBlockSizeTxs int `mapstructure:"max_block_size_txs"` - MaxBlockSizeBytes int `mapstructure:"max_block_size_bytes"` + MaxBlockSizeTxs int `mapstructure:"max_block_size_txs"` + MaxBlockSizeBytes int `mapstructure:"max_block_size_bytes"` + NoEmptyBlocks bool `mapstructure:"no_empty_blocks"` // TODO: This probably shouldn't be exposed but it makes it // easy to write tests for the wal/replay @@ -357,7 +358,8 @@ func DefaultConsensusConfig() *ConsensusConfig { TimeoutCommit: 1000, SkipTimeoutCommit: false, MaxBlockSizeTxs: 10000, - MaxBlockSizeBytes: 1, // TODO + MaxBlockSizeBytes: 1, // TODO + NoEmptyBlocks: true, BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types PeerGossipSleepDuration: 100, PeerQueryMaj23SleepDuration: 2000, @@ -375,6 +377,7 @@ func TestConsensusConfig() *ConsensusConfig { config.TimeoutPrecommitDelta = 1 config.TimeoutCommit = 10 config.SkipTimeoutCommit = true + config.NoEmptyBlocks = false return config } diff --git a/consensus/state.go b/consensus/state.go index 6378297a..ff5b42c7 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -780,7 +780,11 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { // Wait for txs to be available in the mempool // before we enterPropose - go cs.waitForTxs(height, round) + if cs.config.NoEmptyBlocks { + go cs.waitForTxs(height, round) + } else { + cs.enterPropose(height, round) + } } func (cs *ConsensusState) waitForTxs(height, round int) { diff --git a/mempool/mempool.go b/mempool/mempool.go index e3c9e216..160ab7d0 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -92,13 +92,18 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M recheckEnd: nil, logger: log.NewNopLogger(), cache: newTxCache(cacheSize), - txsAvailable: make(chan struct{}, 1), } mempool.initWAL() proxyAppConn.SetResponseCallback(mempool.resCb) return mempool } +// FireOnTxsAvailable initializes the TxsAvailable channel, +// ensuring it will trigger once every height when transactions are available. +func (mem *Mempool) FireOnTxsAvailable() { + mem.txsAvailable = make(chan struct{}, 1) +} + // SetLogger sets the Logger. func (mem *Mempool) SetLogger(l log.Logger) { mem.logger = l @@ -277,10 +282,25 @@ func (mem *Mempool) alertIfTxsAvailable() { } } +// TxsAvailable returns a channel which fires once for every height, +// and only when transactions are available in the mempool. +// XXX: Will panic if mem.FireOnTxsAvailable() has not been called. func (mem *Mempool) TxsAvailable() chan struct{} { + if mem.txsAvailable == nil { + panic("mem.txsAvailable is nil") + } return mem.txsAvailable } +func (mem *Mempool) alertIfTxsAvailable() { + if mem.txsAvailable != nil && + !mem.notifiedTxsAvailable && mem.Size() > 0 { + + mem.notifiedTxsAvailable = true + mem.txsAvailable <- struct{}{} + } +} + // Reap returns a list of transactions currently in the mempool. // If maxTxs is -1, there is no cap on the number of returned transactions. func (mem *Mempool) Reap(maxTxs int) types.Txs { diff --git a/node/node.go b/node/node.go index 51390200..c76ae5fa 100644 --- a/node/node.go +++ b/node/node.go @@ -143,6 +143,10 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger) + if config.Consensus.NoEmptyBlocks { + mempool.FireOnTxsAvailable() + } + // Make ConsensusReactor consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusState.SetLogger(consensusLogger)