diff --git a/CHANGELOG.md b/CHANGELOG.md index b460dc39..0be66e3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.10.3 (TBD) + +FEATURES: +- New `--consensus.create_empty_blocks` flag; when set to false, only creates blocks when there are txs or when the AppHash changes +- New `consensus.create_empty_blocks_interval` config option; when greater than 0, will create an empty block after waiting that many seconds + ## 0.10.2 (July 10, 2017) FEATURES: diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index e99b8609..e92911da 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -45,6 +45,9 @@ func AddNodeFlags(cmd *cobra.Command) { cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "Comma delimited host:port seed nodes") cmd.Flags().Bool("p2p.skip_upnp", config.P2P.SkipUPNP, "Skip UPNP configuration") cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "Enable Peer-Exchange (dev feature)") + + // consensus flags + cmd.Flags().Bool("consensus.create_empty_blocks", config.Consensus.CreateEmptyBlocks, "Set this to false to only produce blocks when there are txs or when the AppHash changes") } // Users wishing to: diff --git a/config/config.go b/config/config.go index e552b33b..cc1c7d71 100644 --- a/config/config.go +++ b/config/config.go @@ -304,6 +304,10 @@ type ConsensusConfig struct { MaxBlockSizeTxs int `mapstructure:"max_block_size_txs"` MaxBlockSizeBytes int `mapstructure:"max_block_size_bytes"` + // EmptyBlocks mode and possible interval between empty blocks in seconds + CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"` + CreateEmptyBlocksInterval int `mapstructure:"create_empty_blocks_interval"` + // TODO: This probably shouldn't be exposed but it makes it // easy to write tests for the wal/replay BlockPartSize int `mapstructure:"block_part_size"` @@ -313,6 +317,16 @@ type ConsensusConfig struct { PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"` } +// WaitForTxs returns true if the consensus should wait for transactions before entering the propose step +func (cfg *ConsensusConfig) WaitForTxs() bool { + return !cfg.CreateEmptyBlocks || cfg.CreateEmptyBlocksInterval > 0 +} + +// EmptyBlocks returns the amount of time to wait before proposing an empty block or starting the propose timer if there are no txs available +func (cfg *ConsensusConfig) EmptyBlocksInterval() time.Duration { + return time.Duration(cfg.CreateEmptyBlocksInterval) * time.Second +} + // Propose returns the amount of time to wait for a proposal func (cfg *ConsensusConfig) Propose(round int) time.Duration { return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond @@ -357,7 +371,9 @@ func DefaultConsensusConfig() *ConsensusConfig { TimeoutCommit: 1000, SkipTimeoutCommit: false, MaxBlockSizeTxs: 10000, - MaxBlockSizeBytes: 1, // TODO + MaxBlockSizeBytes: 1, // TODO + CreateEmptyBlocks: true, + CreateEmptyBlocksInterval: 0, BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types PeerGossipSleepDuration: 100, PeerQueryMaj23SleepDuration: 2000, diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 3a68a2f5..94a03c7a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty return nil } +func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *ByzantinePrivValidator) String() string { return Fmt("PrivValidator{%X}", privVal.Address) } diff --git a/consensus/common_test.go b/consensus/common_test.go index 103294ab..bf805e09 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -31,7 +31,7 @@ import ( // genesis, chain_id, priv_val var config *cfg.Config // NOTE: must be reset for each _test.go file -var ensureTimeout = time.Duration(2) +var ensureTimeout = time.Second * 2 func ensureDir(dir string, mode os.FileMode) { if err := EnsureDir(dir, mode); err != nil { @@ -240,8 +240,11 @@ func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv *ty proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem) + mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0) mempool.SetLogger(log.TestingLogger().With("module", "mempool")) + if thisConfig.Consensus.WaitForTxs() { + mempool.EnableTxsAvailable() + } // Make ConsensusReactor cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool) @@ -294,12 +297,22 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { //------------------------------------------------------------------------------- func ensureNoNewStep(stepCh chan interface{}) { - timeout := time.NewTicker(ensureTimeout * time.Second) + timer := time.NewTimer(ensureTimeout) select { - case <-timeout.C: + case <-timer.C: break case <-stepCh: - panic("We should be stuck waiting for more votes, not moving to the next step") + panic("We should be stuck waiting, not moving to the next step") + } +} + +func ensureNewStep(stepCh chan interface{}) { + timer := time.NewTimer(ensureTimeout) + select { + case <-timer.C: + panic("We shouldnt be stuck waiting") + case <-stepCh: + break } } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 327ad733..0f726b39 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -15,6 +15,82 @@ func init() { config = ResetConfig("consensus_mempool_test") } +func TestNoProgressUntilTxsAvailable(t *testing.T) { + config := ResetConfig("consensus_mempool_txs_available_test") + config.Consensus.CreateEmptyBlocks = false + state, privVals := randGenesisState(1, false, 10) + cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs.mempool.EnableTxsAvailable() + height, round := cs.Height, cs.Round + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + startTestRound(cs, height, round) + + ensureNewStep(newBlockCh) // first block gets committed + ensureNoNewStep(newBlockCh) + deliverTxsRange(cs, 0, 2) + ensureNewStep(newBlockCh) // commit txs + ensureNewStep(newBlockCh) // commit updated app hash + ensureNoNewStep(newBlockCh) + +} + +func TestProgressAfterCreateEmptyBlocksInterval(t *testing.T) { + config := ResetConfig("consensus_mempool_txs_available_test") + config.Consensus.CreateEmptyBlocksInterval = int(ensureTimeout.Seconds()) + state, privVals := randGenesisState(1, false, 10) + cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs.mempool.EnableTxsAvailable() + height, round := cs.Height, cs.Round + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + startTestRound(cs, height, round) + + ensureNewStep(newBlockCh) // first block gets committed + ensureNoNewStep(newBlockCh) // then we dont make a block ... + ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed +} + +func TestProgressInHigherRound(t *testing.T) { + config := ResetConfig("consensus_mempool_txs_available_test") + config.Consensus.CreateEmptyBlocks = false + state, privVals := randGenesisState(1, false, 10) + cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs.mempool.EnableTxsAvailable() + height, round := cs.Height, cs.Round + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) + timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) + cs.setProposal = func(proposal *types.Proposal) error { + if cs.Height == 2 && cs.Round == 0 { + // dont set the proposal in round 0 so we timeout and + // go to next round + cs.Logger.Info("Ignoring set proposal at height 2, round 0") + return nil + } + return cs.defaultSetProposal(proposal) + } + startTestRound(cs, height, round) + + ensureNewStep(newRoundCh) // first round at first height + ensureNewStep(newBlockCh) // first block gets committed + ensureNewStep(newRoundCh) // first round at next height + deliverTxsRange(cs, 0, 2) // we deliver txs, but dont set a proposal so we get the next round + <-timeoutCh + ensureNewStep(newRoundCh) // wait for the next round + ensureNewStep(newBlockCh) // now we can commit the block +} + +func deliverTxsRange(cs *ConsensusState, start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := cs.mempool.CheckTx(txBytes, nil) + if err != nil { + panic(Fmt("Error after CheckTx: %v", err)) + } + } +} + func TestTxConcurrentWithCommit(t *testing.T) { state, privVals := randGenesisState(1, false, 10) @@ -22,21 +98,8 @@ func TestTxConcurrentWithCommit(t *testing.T) { height, round := cs.Height, cs.Round newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := cs.mempool.CheckTx(txBytes, nil) - if err != nil { - panic(Fmt("Error after CheckTx: %v", err)) - } - // time.Sleep(time.Microsecond * time.Duration(rand.Int63n(3000))) - } - } - NTxs := 10000 - go deliverTxsRange(0, NTxs) + go deliverTxsRange(cs, 0, NTxs) startTestRound(cs, height, round) ticker := time.NewTicker(time.Second * 20) diff --git a/consensus/reactor.go b/consensus/reactor.go index ce25442f..3e12a314 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -311,6 +311,16 @@ func (conR *ConsensusReactor) registerEventCallbacks() { edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) + + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) { + heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat) + conR.broadcastProposalHeartbeatMessage(heartbeat) + }) +} + +func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { + msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat} + conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) } func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { @@ -1305,3 +1315,15 @@ type VoteSetBitsMessage struct { func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } + +//------------------------------------- + +// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal. +type ProposalHeartbeatMessage struct { + Heartbeat *types.Heartbeat +} + +// String returns a string representation. +func (m *ProposalHeartbeatMessage) String() string { + return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat) +} diff --git a/consensus/replay.go b/consensus/replay.go index c4ed684c..0c7c27e9 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -82,7 +82,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte "blockID", v.BlockID, "peer", peerKey) } - cs.handleMsg(m, cs.RoundState) + cs.handleMsg(m) case timeoutInfo: cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) cs.handleTimeout(m, cs.RoundState) diff --git a/consensus/state.go b/consensus/state.go index 415f6af3..48c91d27 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -10,18 +10,24 @@ import ( "time" fail "github.com/ebuchman/fail-test" + wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) //----------------------------------------------------------------------------- // Config +const ( + proposalHeartbeatIntervalSeconds = 2 +) + //----------------------------------------------------------------------------- // Errors @@ -179,6 +185,7 @@ type PrivValidator interface { GetAddress() []byte SignVote(chainID string, vote *types.Vote) error SignProposal(chainID string, proposal *types.Proposal) error + SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error } // ConsensusState handles execution of the consensus algorithm. @@ -605,7 +612,8 @@ func (cs *ConsensusState) newStep() { // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. -// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities +// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. +// ConsensusState must be locked before any internal state is updated. func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { if maxSteps > 0 { @@ -619,15 +627,17 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { var mi msgInfo select { + case height := <-cs.mempool.TxsAvailable(): + cs.handleTxsAvailable(height) case mi = <-cs.peerMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) - cs.handleMsg(mi, rs) + cs.handleMsg(mi) case mi = <-cs.internalMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes - cs.handleMsg(mi, rs) + cs.handleMsg(mi) case ti := <-cs.timeoutTicker.Chan(): // tockChan: cs.wal.Save(ti) // if the timeout is relevant to the rs @@ -651,7 +661,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { } // state transitions on complete-proposal, 2/3-any, 2/3-one -func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { +func (cs *ConsensusState) handleMsg(mi msgInfo) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -708,6 +718,8 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // NewRound event fired from enterNewRound. // XXX: should we fire timeout here (for timeout commit)? cs.enterNewRound(ti.Height, 0) + case RoundStepNewRound: + cs.enterPropose(ti.Height, 0) case RoundStepPropose: types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) cs.enterPrevote(ti.Height, ti.Round) @@ -723,6 +735,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { } +func (cs *ConsensusState) handleTxsAvailable(height int) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + // we only need to do this for round 0 + cs.enterPropose(height, 0) +} + //----------------------------------------------------------------------------- // State functions // Used internally by handleTimeout and handleMsg to make state transitions @@ -770,11 +789,66 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) - // Immediately go to enterPropose. - cs.enterPropose(height, round) + // Wait for txs to be available in the mempool + // before we enterPropose in round 0. If the last block changed the app hash, + // we may need an empty "proof" block, and enterPropose immediately. + waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height) + if waitForTxs { + if cs.config.CreateEmptyBlocksInterval > 0 { + cs.scheduleTimeout(cs.config.EmptyBlocksInterval(), height, round, RoundStepNewRound) + } + go cs.proposalHeartbeat(height, round) + } else { + cs.enterPropose(height, round) + } } -// Enter: from enterNewRound(height,round). +// needProofBlock returns true on the first height (so the genesis app hash is signed right away) +// and where the last block (height-1) caused the app hash to change +func (cs *ConsensusState) needProofBlock(height int) bool { + if height == 1 { + return true + } + + lastBlockMeta := cs.blockStore.LoadBlockMeta(height - 1) + if !bytes.Equal(cs.state.AppHash, lastBlockMeta.Header.AppHash) { + return true + } + return false +} + +func (cs *ConsensusState) proposalHeartbeat(height, round int) { + counter := 0 + addr := cs.privValidator.GetAddress() + valIndex, v := cs.Validators.GetByAddress(addr) + if v == nil { + // not a validator + valIndex = -1 + } + for { + rs := cs.GetRoundState() + // if we've already moved on, no need to send more heartbeats + if rs.Step > RoundStepNewRound || rs.Round > round || rs.Height > height { + return + } + heartbeat := &types.Heartbeat{ + Height: rs.Height, + Round: rs.Round, + Sequence: counter, + ValidatorAddress: addr, + ValidatorIndex: valIndex, + } + cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat) + heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} + types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) + counter += 1 + time.Sleep(proposalHeartbeatIntervalSeconds * time.Second) + } +} + +// Enter (CreateEmptyBlocks): from enterNewRound(height,round) +// Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval +// Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool func (cs *ConsensusState) enterPropose(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -804,7 +878,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) { return } - if !bytes.Equal(cs.Validators.GetProposer().Address, cs.privValidator.GetAddress()) { + if !cs.isProposer() { cs.Logger.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.GetProposer().Address, "privValidator", cs.privValidator) if cs.Validators.HasAddress(cs.privValidator.GetAddress()) { cs.Logger.Debug("This node is a validator") @@ -818,6 +892,10 @@ func (cs *ConsensusState) enterPropose(height int, round int) { } } +func (cs *ConsensusState) isProposer() bool { + return bytes.Equal(cs.Validators.GetProposer().Address, cs.privValidator.GetAddress()) +} + func (cs *ConsensusState) defaultDecideProposal(height, round int) { var block *types.Block var blockParts *types.PartSet diff --git a/mempool/mempool.go b/mempool/mempool.go index e804f9b3..6113910b 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -56,14 +56,16 @@ const cacheSize = 100000 type Mempool struct { config *cfg.MempoolConfig - proxyMtx sync.Mutex - proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs - counter int64 // simple incrementing counter - height int // the last block Update()'d to - rechecking int32 // for re-checking filtered txs on Update() - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here + proxyMtx sync.Mutex + proxyAppConn proxy.AppConnMempool + txs *clist.CList // concurrent linked-list of good txs + counter int64 // simple incrementing counter + height int // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + notifiedTxsAvailable bool // true if fired on txsAvailable for this height + txsAvailable chan int // fires the next height once for each height, when the mempool is not empty // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -76,13 +78,13 @@ type Mempool struct { } // NewMempool returns a new Mempool with the given configuration and connection to an application. -func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool { +func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, txs: clist.New(), counter: 0, - height: 0, + height: height, rechecking: 0, recheckCursor: nil, recheckEnd: nil, @@ -94,6 +96,13 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M return mempool } +// EnableTxsAvailable initializes the TxsAvailable channel, +// ensuring it will trigger once every height when transactions are available. +// NOTE: not thread safe - should only be called once, on startup +func (mem *Mempool) EnableTxsAvailable() { + mem.txsAvailable = make(chan int, 1) +} + // SetLogger sets the Logger. func (mem *Mempool) SetLogger(l log.Logger) { mem.logger = l @@ -215,6 +224,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: req.GetCheckTx().Tx, } mem.txs.PushBack(memTx) + mem.notifyTxsAvailable() } else { // ignore bad transaction mem.logger.Info("Bad Transaction", "res", r) @@ -256,12 +266,33 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Done! atomic.StoreInt32(&mem.rechecking, 0) mem.logger.Info("Done rechecking txs") + + mem.notifyTxsAvailable() } default: // ignore other messages } } +// TxsAvailable returns a channel which fires once for every height, +// and only when transactions are available in the mempool. +// NOTE: the returned channel may be nil if EnableTxsAvailable was not called. +func (mem *Mempool) TxsAvailable() chan int { + return mem.txsAvailable +} + +func (mem *Mempool) notifyTxsAvailable() { + if mem.Size() == 0 { + panic("notified txs available but mempool is empty!") + } + if mem.txsAvailable != nil && + !mem.notifiedTxsAvailable { + + mem.notifiedTxsAvailable = true + mem.txsAvailable <- mem.height + 1 + } +} + // Reap returns a list of transactions currently in the mempool. // If maxTxs is -1, there is no cap on the number of returned transactions. func (mem *Mempool) Reap(maxTxs int) types.Txs { @@ -307,13 +338,15 @@ func (mem *Mempool) Update(height int, txs types.Txs) { // Set height mem.height = height + mem.notifiedTxsAvailable = false + // Remove transactions that are already in txs. goodTxs := mem.filterTxs(txsMap) // Recheck mempool txs if any txs were committed in the block // NOTE/XXX: in some apps a tx could be invalidated due to EndBlock, // so we really still do need to recheck, but this is for debugging if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) { - mem.logger.Info("Recheck txs", "numtxs", len(goodTxs)) + mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height) mem.recheckTxs(goodTxs) // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 6451adb2..2e7a575c 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -1,34 +1,115 @@ package mempool import ( + "crypto/rand" "encoding/binary" "testing" + "time" "github.com/tendermint/abci/example/counter" + "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/log" ) -func TestSerialReap(t *testing.T) { +func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool { config := cfg.ResetTestRoot("mempool_test") - app := counter.NewCounterApplication(true) - app.SetOption("serial", "on") - cc := proxy.NewLocalClientCreator(app) appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) if _, err := appConnMem.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } + mempool := NewMempool(config.Mempool, appConnMem, 0) + mempool.SetLogger(log.TestingLogger()) + return mempool +} + +func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + t.Fatal("Expected not to fire") + case <-timer.C: + } +} + +func ensureFire(t *testing.T, ch chan int, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + case <-timer.C: + t.Fatal("Expected to fire") + } +} + +func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs { + txs := make(types.Txs, count) + for i := 0; i < count; i++ { + txBytes := make([]byte, 20) + txs[i] = txBytes + rand.Read(txBytes) + err := mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + } + return txs +} + +func TestTxsAvailable(t *testing.T) { + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(t, cc) + mempool.EnableTxsAvailable() + + timeoutMS := 500 + + // with no txs, it shouldnt fire + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch of txs, it should only fire once + txs := sendTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // call update with half the txs. + // it should fire once now for the new height + // since there are still txs left + committedTxs, txs := txs[:50], txs[50:] + mempool.Update(1, committedTxs) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs. we already fired for this height so it shouldnt fire again + moreTxs := sendTxs(t, mempool, 50) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // now call update with all the txs. it should not fire as there are no txs left + committedTxs = append(txs, moreTxs...) + mempool.Update(2, committedTxs) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs, it should only fire once + sendTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) +} + +func TestSerialReap(t *testing.T) { + app := counter.NewCounterApplication(true) + app.SetOption("serial", "on") + cc := proxy.NewLocalClientCreator(app) + + mempool := newMempoolWithApp(t, cc) appConnCon, _ := cc.NewABCIClient() appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) if _, err := appConnCon.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } - mempool := NewMempool(config.Mempool, appConnMem) - mempool.SetLogger(log.TestingLogger()) deliverTxsRange := func(start, end int) { // Deliver some txs. diff --git a/node/node.go b/node/node.go index 672e384b..e55731f4 100644 --- a/node/node.go +++ b/node/node.go @@ -137,11 +137,15 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") - mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool()) + mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) mempool.SetLogger(mempoolLogger) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger) + if config.Consensus.WaitForTxs() { + mempool.EnableTxsAvailable() + } + // Make ConsensusReactor consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusState.SetLogger(consensusLogger) diff --git a/types/canonical_json.go b/types/canonical_json.go index 2e8583a4..5f1a0aca 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -31,6 +31,14 @@ type CanonicalJSONVote struct { Type byte `json:"type"` } +type CanonicalJSONHeartbeat struct { + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` +} + //------------------------------------ // Messages including a "chain id" can only be applied to one chain, hence "Once" @@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct { Vote CanonicalJSONVote `json:"vote"` } +type CanonicalJSONOnceHeartbeat struct { + ChainID string `json:"chain_id"` + Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"` +} + //----------------------------------- // Canonicalize the structs @@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote { vote.Type, } } + +func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat { + return CanonicalJSONHeartbeat{ + heartbeat.Height, + heartbeat.Round, + heartbeat.Sequence, + heartbeat.ValidatorAddress, + heartbeat.ValidatorIndex, + } +} diff --git a/types/events.go b/types/events.go index 8c29c444..79e17fe0 100644 --- a/types/events.go +++ b/types/events.go @@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" } func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringVote() string { return "Vote" } +func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" } + //---------------------------------------- var ( @@ -39,6 +41,8 @@ var ( EventDataNameTx = "tx" EventDataNameRoundState = "round_state" EventDataNameVote = "vote" + + EventDataNameProposalHeartbeat = "proposer_heartbeat" ) //---------------------------------------- @@ -84,6 +88,8 @@ const ( EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) + + EventDataTypeProposalHeartbeat = byte(0x20) ) var tmEventDataMapper = data.NewMapper(TMEventData{}). @@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}). RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). - RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). + RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -115,6 +122,10 @@ type EventDataTx struct { Error string `json:"error"` // this is redundant information for now } +type EventDataProposalHeartbeat struct { + Heartbeat *Heartbeat +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int `json:"height"` @@ -135,6 +146,8 @@ func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} + //---------------------------------------- // Wrappers for type safety @@ -232,3 +245,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { fireEvent(fireable, EventStringLock(), TMEventData{rs}) } + +func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) { + fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs}) +} diff --git a/types/heartbeat.go b/types/heartbeat.go new file mode 100644 index 00000000..378f6202 --- /dev/null +++ b/types/heartbeat.go @@ -0,0 +1,42 @@ +package types + +import ( + "fmt" + "io" + + "github.com/tendermint/go-crypto" + "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire/data" + cmn "github.com/tendermint/tmlibs/common" +) + +type Heartbeat struct { + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + Signature crypto.Signature `json:"signature"` +} + +func (heartbeat *Heartbeat) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { + wire.WriteJSON(CanonicalJSONOnceHeartbeat{ + chainID, + CanonicalHeartbeat(heartbeat), + }, w, n, err) +} + +func (heartbeat *Heartbeat) Copy() *Heartbeat { + heartbeatCopy := *heartbeat + return &heartbeatCopy +} + +func (heartbeat *Heartbeat) String() string { + if heartbeat == nil { + return "nil-heartbeat" + } + + return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}", + heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress), + heartbeat.Height, heartbeat.Round, heartbeat.Sequence, heartbeat.Signature) +} diff --git a/types/priv_validator.go b/types/priv_validator.go index 8c9a011c..69082493 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt } +func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *PrivValidator) String() string { return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep) } diff --git a/types/services.go b/types/services.go index ee20487e..0008b68e 100644 --- a/types/services.go +++ b/types/services.go @@ -23,6 +23,9 @@ type Mempool interface { Reap(int) Txs Update(height int, txs Txs) Flush() + + TxsAvailable() chan int + EnableTxsAvailable() } type MockMempool struct { @@ -35,6 +38,8 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int, txs Txs) {} func (m MockMempool) Flush() {} +func (m MockMempool) TxsAvailable() chan int { return make(chan int) } +func (m MockMempool) EnableTxsAvailable() {} //------------------------------------------------------ // blockstore