diff --git a/Makefile b/Makefile index ac1956dd..ea66d820 100644 --- a/Makefile +++ b/Makefile @@ -43,6 +43,10 @@ get_deps: grep -v /vendor/ | sort | uniq | \ xargs go get +get_vendor_deps: + go get github.com/Masterminds/glide + glide install + update_deps: go get -d -u github.com/tendermint/tendermint/... diff --git a/circle.yml b/circle.yml index f022d5a6..98dd2228 100644 --- a/circle.yml +++ b/circle.yml @@ -16,8 +16,8 @@ checkout: dependencies: override: - - "cd $REPO && make get_deps" + - "cd $REPO && make get_vendor_deps" test: override: - - "cd $REPO && make test" + - "cd $REPO && make test_race" diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 689ba794..c53f8bfb 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -86,13 +86,13 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("disable_data_hash", false) - mapConfig.SetDefault("timeout_propose", 100) - mapConfig.SetDefault("timeout_propose_delta", 1) - mapConfig.SetDefault("timeout_prevote", 1) - mapConfig.SetDefault("timeout_prevote_delta", 1) - mapConfig.SetDefault("timeout_precommit", 1) - mapConfig.SetDefault("timeout_precommit_delta", 1) - mapConfig.SetDefault("timeout_commit", 1) + mapConfig.SetDefault("timeout_propose", 3000) + mapConfig.SetDefault("timeout_propose_delta", 1000) + mapConfig.SetDefault("timeout_prevote", 2000) + mapConfig.SetDefault("timeout_prevote_delta", 1000) + mapConfig.SetDefault("timeout_precommit", 2000) + mapConfig.SetDefault("timeout_precommit_delta", 1000) + mapConfig.SetDefault("timeout_commit", 1000) mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) diff --git a/consensus/common_test.go b/consensus/common_test.go index a7fce7d1..693edbca 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -99,18 +99,18 @@ func changeProposer(t *testing.T, perspectiveOf *ConsensusState, newProposer *va _, v1 := perspectiveOf.Validators.GetByAddress(perspectiveOf.privValidator.Address) v1.Accum, v1.VotingPower = 0, 0 if updated := perspectiveOf.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") + panic("failed to update validator") } _, v2 := perspectiveOf.Validators.GetByAddress(newProposer.Address) v2.Accum, v2.VotingPower = 100, 100 if updated := perspectiveOf.Validators.Update(v2); !updated { - t.Fatal("failed to update validator") + panic("failed to update validator") } // make the proposal propBlock, _ := perspectiveOf.createProposalBlock() if propBlock == nil { - t.Fatal("Failed to create proposal block with cs2") + panic("Failed to create proposal block with cs2") } return propBlock } @@ -120,7 +120,7 @@ func fixVotingPower(t *testing.T, cs1 *ConsensusState, addr2 []byte) { _, v2 := cs1.Validators.GetByAddress(addr2) v1.Accum, v1.VotingPower = v2.Accum, v2.VotingPower if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") + panic("failed to update validator") } } @@ -128,13 +128,16 @@ func addVoteToFromMany(to *ConsensusState, votes []*types.Vote, froms ...*valida if len(votes) != len(froms) { panic("len(votes) and len(froms) must match") } + for i, from := range froms { addVoteToFrom(to, from, votes[i]) } } func addVoteToFrom(to *ConsensusState, from *validatorStub, vote *types.Vote) { + to.mtx.Lock() // NOTE: wont need this when the vote comes with the index! valIndex, _ := to.Validators.GetByAddress(from.PrivValidator.Address) + to.mtx.Unlock() to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{valIndex, vote}} // added, err := to.TryAddVote(valIndex, vote, "") @@ -158,16 +161,32 @@ func signVoteMany(voteType byte, hash []byte, header types.PartSetHeader, vss .. } // add vote to one cs from another -func signAddVoteToFromMany(voteType byte, to *ConsensusState, hash []byte, header types.PartSetHeader, froms ...*validatorStub) { +// if voteCh is not nil, read all votes +func signAddVoteToFromMany(voteType byte, to *ConsensusState, hash []byte, header types.PartSetHeader, voteCh chan interface{}, froms ...*validatorStub) { + var wg chan struct{} // when done reading all votes + if voteCh != nil { + wg = readVotes(voteCh, len(froms)) + } for _, from := range froms { vote := signVote(from, voteType, hash, header) addVoteToFrom(to, from, vote) } + + if voteCh != nil { + <-wg + } } -func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, hash []byte, header types.PartSetHeader) *types.Vote { +func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, hash []byte, header types.PartSetHeader, voteCh chan interface{}) *types.Vote { + var wg chan struct{} // when done reading all votes + if voteCh != nil { + wg = readVotes(voteCh, 1) + } vote := signVote(from, voteType, hash, header) addVoteToFrom(to, from, vote) + if voteCh != nil { + <-wg + } return vote } @@ -357,6 +376,17 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { return voteCh } +func readVotes(ch chan interface{}, reads int) chan struct{} { + wg := make(chan struct{}) + go func() { + for i := 0; i < reads; i++ { + <-ch // read the precommit event + } + close(wg) + }() + return wg +} + func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) { db := dbm.NewMemDB() genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) diff --git a/consensus/height_vote_set.go b/consensus/height_vote_set.go index 93fa94e8..583ef3f6 100644 --- a/consensus/height_vote_set.go +++ b/consensus/height_vote_set.go @@ -38,18 +38,28 @@ type HeightVoteSet struct { func NewHeightVoteSet(chainID string, height int, valSet *types.ValidatorSet) *HeightVoteSet { hvs := &HeightVoteSet{ - chainID: chainID, - height: height, - valSet: valSet, - roundVoteSets: make(map[int]RoundVoteSet), - peerCatchupRounds: make(map[string]int), + chainID: chainID, } - hvs.addRound(0) - hvs.round = 0 + hvs.Reset(height, valSet) return hvs } +func (hvs *HeightVoteSet) Reset(height int, valSet *types.ValidatorSet) { + hvs.mtx.Lock() + defer hvs.mtx.Unlock() + + hvs.height = height + hvs.valSet = valSet + hvs.roundVoteSets = make(map[int]RoundVoteSet) + hvs.peerCatchupRounds = make(map[string]int) + + hvs.addRound(0) + hvs.round = 0 +} + func (hvs *HeightVoteSet) Height() int { + hvs.mtx.Lock() + defer hvs.mtx.Unlock() return hvs.height } diff --git a/consensus/height_vote_set_test.go b/consensus/height_vote_set_test.go index 28502716..b5153259 100644 --- a/consensus/height_vote_set_test.go +++ b/consensus/height_vote_set_test.go @@ -1,10 +1,11 @@ package consensus import ( + "testing" + + . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" - - "testing" ) func init() { @@ -45,7 +46,7 @@ func makeVoteHR(t *testing.T, height, round int, privVal *types.PrivValidator) * chainID := config.GetString("chain_id") err := privVal.SignVote(chainID, vote) if err != nil { - t.Fatalf("Error signing vote: %v", err) + panic(Fmt("Error signing vote: %v", err)) return nil } return vote diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 14098b86..2a063be7 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -31,7 +31,7 @@ func TestTxConcurrentWithCommit(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) err := cs.mempool.CheckTx(txBytes, nil) if err != nil { - t.Fatal("Error after CheckTx: %v", err) + panic(Fmt("Error after CheckTx: %v", err)) } // time.Sleep(time.Microsecond * time.Duration(rand.Int63n(3000))) } @@ -41,13 +41,13 @@ func TestTxConcurrentWithCommit(t *testing.T) { go appendTxsRange(0, NTxs) startTestRound(cs, height, round) - ticker := time.NewTicker(time.Second * 5) + ticker := time.NewTicker(time.Second * 20) for nTxs := 0; nTxs < NTxs; { select { case b := <-newBlockCh: nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs case <-ticker.C: - t.Fatal("Timed out waiting to commit blocks with transactions") + panic("Timed out waiting to commit blocks with transactions") } } } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index d46f68cd..46e862ed 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/types" ) @@ -59,12 +60,12 @@ func TestReplayCatchup(t *testing.T) { // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { - t.Fatal(err) + panic(err) } name := f.Name() _, err = f.WriteString(testLog) if err != nil { - t.Fatal(err) + panic(err) } f.Close() @@ -83,14 +84,14 @@ func TestReplayCatchup(t *testing.T) { // open wal and run catchup messages openWAL(t, cs, name) if err := cs.catchupReplay(cs.Height); err != nil { - t.Fatalf("Error on catchup replay %v", err) + panic(Fmt("Error on catchup replay %v", err)) } after := time.After(time.Second * 15) select { case <-newBlockCh: case <-after: - t.Fatal("Timed out waiting for new block") + panic("Timed out waiting for new block") } } @@ -98,7 +99,7 @@ func openWAL(t *testing.T, cs *ConsensusState, file string) { // open the wal wal, err := NewWAL(file, config.GetBool("cswal_light")) if err != nil { - t.Fatal(err) + panic(err) } wal.exists = true cs.wal = wal diff --git a/consensus/state.go b/consensus/state.go index 37209f68..50488f8d 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -268,7 +268,8 @@ func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { } func (cs *ConsensusState) String() string { - return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) + // better not to access shared variables + return Fmt("ConsensusState") //(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) } func (cs *ConsensusState) GetState() *sm.State { @@ -322,6 +323,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + if cs.wal != nil && cs.IsRunning() { cs.wal.Wait() } @@ -1214,10 +1216,13 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Create a copy of the state for staging stateCopy := cs.state.Copy() + // event cache for txs + eventCache := events.NewEventCache(cs.evsw) + // Run the block on the State: // + update validator sets // + run txs on the proxyAppConn - err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header()) + err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header()) if err != nil { // TODO: handle this gracefully. PanicQ(Fmt("Exec failed for application: %v", err)) @@ -1230,6 +1235,9 @@ func (cs *ConsensusState) finalizeCommit(height int) { PanicQ(Fmt("Commit failed for application: %v", err)) } + // txs committed, bad ones removed from mepool; fire events + eventCache.Flush() + // Save to blockStore. if cs.blockStore.Height() < block.Height { precommits := cs.Votes.Precommits(cs.CommitRound) diff --git a/consensus/state_test.go b/consensus/state_test.go index 797e5d04..9a8d6090 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/go-events" + . "github.com/tendermint/go-common" "github.com/tendermint/tendermint/types" ) @@ -67,21 +68,21 @@ func TestProposerSelection0(t *testing.T) { // lets commit a block and ensure proposer for the next height is correct prop := cs1.GetRoundState().Validators.Proposer() if !bytes.Equal(prop.Address, cs1.privValidator.Address) { - t.Fatalf("expected proposer to be validator %d. Got %X", 0, prop.Address) + panic(Fmt("expected proposer to be validator %d. Got %X", 0, prop.Address)) } // wait for complete proposal <-proposalCh rs := cs1.GetRoundState() - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:]...) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), nil, vss[1:]...) // wait for new round so next validator is set <-newRoundCh prop = cs1.GetRoundState().Validators.Proposer() if !bytes.Equal(prop.Address, vss[1].Address) { - t.Fatalf("expected proposer to be validator %d. Got %X", 1, prop.Address) + panic(Fmt("expected proposer to be validator %d. Got %X", 1, prop.Address)) } } @@ -102,11 +103,11 @@ func TestProposerSelection2(t *testing.T) { for i := 0; i < len(vss); i++ { prop := cs1.GetRoundState().Validators.Proposer() if !bytes.Equal(prop.Address, vss[(i+2)%len(vss)].Address) { - t.Fatalf("expected proposer to be validator %d. Got %X", (i+2)%len(vss), prop.Address) + panic(Fmt("expected proposer to be validator %d. Got %X", (i+2)%len(vss), prop.Address)) } rs := cs1.GetRoundState() - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, rs.ProposalBlockParts.Header(), vss[1:]...) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, rs.ProposalBlockParts.Header(), nil, vss[1:]...) <-newRoundCh // wait for the new round event each round incrementRound(vss[1:]...) @@ -130,7 +131,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { select { case <-timeoutCh: case <-ticker.C: - t.Fatal("Expected EnterPropose to timeout") + panic("Expected EnterPropose to timeout") } @@ -170,7 +171,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout()) select { case <-timeoutCh: - t.Fatal("Expected EnterPropose not to timeout") + panic("Expected EnterPropose not to timeout") case <-ticker.C: } @@ -200,7 +201,7 @@ func TestBadProposal(t *testing.T) { propBlockParts := propBlock.MakePartSet() proposal := types.NewProposal(cs2.Height, round, propBlockParts.Header(), -1) if err := cs2.SignProposal(config.GetString("chain_id"), proposal); err != nil { - t.Fatal("failed to sign bad proposal", err) + panic("failed to sign bad proposal: " + err.Error()) } // set the proposal block @@ -218,14 +219,13 @@ func TestBadProposal(t *testing.T) { validatePrevote(t, cs1, round, vss[0], nil) // add bad prevote from cs2 and wait for it - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) - <-voteCh + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header(), voteCh) // wait for precommit <-voteCh validatePrecommit(t, cs1, round, 0, vss[0], nil, nil) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header(), voteCh) } //---------------------------------------------------------------------------------------------------- @@ -292,12 +292,11 @@ func TestFullRound2(t *testing.T) { <-voteCh // prevote // we should be stuck in limbo waiting for more prevotes - - propBlockHash, propPartsHeader := cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header() + rs := cs1.GetRoundState() + propBlockHash, propPartsHeader := rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header() // prevote arrives from cs2: - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlockHash, propPartsHeader) - <-voteCh + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlockHash, propPartsHeader, voteCh) <-voteCh //precommit @@ -307,8 +306,7 @@ func TestFullRound2(t *testing.T) { // we should be stuck in limbo waiting for more precommits // precommit arrives from cs2: - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlockHash, propPartsHeader) - <-voteCh + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlockHash, propPartsHeader, voteCh) // wait to finish commit, propose in next height <-newBlockCh @@ -346,8 +344,7 @@ func TestLockNoPOL(t *testing.T) { // we should now be stuck in limbo forever, waiting for more prevotes // prevote arrives from cs2: - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-voteCh // prevote + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), voteCh) <-voteCh // precommit @@ -360,8 +357,7 @@ func TestLockNoPOL(t *testing.T) { hash := make([]byte, len(theBlockHash)) copy(hash, theBlockHash) hash[0] = byte((hash[0] + 1) % 255) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) - <-voteCh // precommit + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header(), voteCh) // (note we're entering precommit for a second time this round) // but with invalid args. then we enterPrecommitWait, and the timeout to new round @@ -382,7 +378,7 @@ func TestLockNoPOL(t *testing.T) { rs = re.(types.EventDataRoundState).RoundState.(*RoundState) if rs.ProposalBlock != nil { - t.Fatal("Expected proposal block to be nil") + panic("Expected proposal block to be nil") } // wait to finish prevote @@ -392,8 +388,7 @@ func TestLockNoPOL(t *testing.T) { validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash()) // add a conflicting prevote from the other validator - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) - <-voteCh + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header(), voteCh) // now we're going to enter prevote again, but with invalid args // and then prevote wait, which should timeout. then wait for precommit @@ -407,8 +402,7 @@ func TestLockNoPOL(t *testing.T) { // add conflicting precommit from cs2 // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) - <-voteCh + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header(), voteCh) // (note we're entering precommit for a second time this round, but with invalid args // then we enterPrecommitWait and timeout into NewRound @@ -427,29 +421,27 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { - t.Fatalf("Expected proposal block to be locked block. Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock) + panic(Fmt("Expected proposal block to be locked block. Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock)) } <-voteCh // prevote validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash()) - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) - <-voteCh + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header(), voteCh) <-timeoutWaitCh // prevote wait <-voteCh // precommit - validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height - <-voteCh + validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header(), voteCh) // NOTE: conflicting precommits at same height <-timeoutWaitCh // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) if prop == nil || propBlock == nil { - t.Fatal("Failed to create proposal block with cs2") + panic("Failed to create proposal block with cs2") } incrementRound(cs2) @@ -470,15 +462,13 @@ func TestLockNoPOL(t *testing.T) { // prevote for locked block (not proposal) validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) - <-voteCh + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header(), voteCh) <-timeoutWaitCh <-voteCh - validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height - <-voteCh + validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header(), voteCh) // NOTE: conflicting precommits at same height } // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka @@ -510,20 +500,19 @@ func TestLockPOLRelock(t *testing.T) { re := <-proposalCh rs := re.(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() + theBlockPartsHeader := rs.ProposalBlockParts.Header() <-voteCh // prevote - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + signAddVoteToFromMany(types.VoteTypePrevote, cs1, theBlockHash, theBlockPartsHeader, voteCh, cs2, cs3, cs4) <-voteCh // our precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) // add precommits from the rest - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - _, _, _ = <-voteCh, <-voteCh, <-voteCh // precommits + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, voteCh, cs2, cs4) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, theBlockHash, theBlockPartsHeader, voteCh) // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) @@ -560,12 +549,13 @@ func TestLockPOLRelock(t *testing.T) { validatePrevote(t, cs1, 0, vss[0], theBlockHash) // now lets add prevotes from everyone else for the new block - signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) - _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), voteCh, cs2, cs3, cs4) // now either we go to PrevoteWait or Precommit select { case <-timeoutWaitCh: // we're in PrevoteWait, go to Precommit + // XXX: there's no guarantee we see the polka, this might be a precommit for nil, + // in which case the test fails! <-voteCh case <-voteCh: // we went straight to Precommit } @@ -573,19 +563,18 @@ func TestLockPOLRelock(t *testing.T) { // we should have unlocked and locked on the new block validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3) - _, _ = <-voteCh, <-voteCh + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, propBlockHash, propBlockParts.Header(), voteCh, cs2, cs3) be := <-newBlockCh b := be.(types.EventDataNewBlockHeader) re = <-newRoundCh rs = re.(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { - t.Fatal("Expected height to increment") + panic("Expected height to increment") } if !bytes.Equal(b.Header.Hash(), propBlockHash) { - t.Fatal("Expected new block to be proposal block") + panic("Expected new block to be proposal block") } } @@ -618,16 +607,18 @@ func TestLockPOLUnlock(t *testing.T) { <-voteCh // prevote - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), nil, cs2, cs3, cs4) <-voteCh //precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + rs = cs1.GetRoundState() + // add precommits from the rest - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, nil, cs2, cs4) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), nil) // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) @@ -663,7 +654,7 @@ func TestLockPOLUnlock(t *testing.T) { <-voteCh validatePrevote(t, cs1, 0, vss[0], lockedBlockHash) // now lets add prevotes from everyone else for nil (a polka!) - signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, nil, cs2, cs3, cs4) // the polka makes us unlock and precommit nil <-unlockCh @@ -673,7 +664,7 @@ func TestLockPOLUnlock(t *testing.T) { // NOTE: since we don't relock on nil, the lock round is 0 validatePrecommit(t, cs1, 1, 0, vss[0], nil, nil) - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, nil, cs2, cs3) <-newRoundCh } @@ -711,13 +702,13 @@ func TestLockPOLSafety1(t *testing.T) { _, v1 := cs1.Validators.GetByAddress(vss[0].Address) v1.VotingPower = 1 if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") + panic("failed to update validator") }*/ log.Warn("old prop", "hash", fmt.Sprintf("%X", propBlock.Hash())) // we do see them precommit nil - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, nil, cs2, cs3, cs4) prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) propBlockHash := propBlock.Hash() @@ -746,7 +737,7 @@ func TestLockPOLSafety1(t *testing.T) { rs = re.(types.EventDataRoundState).RoundState.(*RoundState) if rs.LockedBlock != nil { - t.Fatal("we should not be locked!") + panic("we should not be locked!") } log.Warn("new prop", "hash", fmt.Sprintf("%X", propBlockHash)) // go to prevote, prevote for proposal block @@ -754,14 +745,14 @@ func TestLockPOLSafety1(t *testing.T) { validatePrevote(t, cs1, 1, vss[0], propBlockHash) // now we see the others prevote for it, so we should lock on it - signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), nil, cs2, cs3, cs4) <-voteCh // precommit // we should have precommitted validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, nil, cs2, cs3) <-timeoutWaitCh @@ -840,15 +831,15 @@ func TestLockPOLSafety2(t *testing.T) { <-voteCh // prevote - signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash1, propBlockParts1.Header(), cs2, cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash1, propBlockParts1.Header(), nil, cs2, cs3, cs4) <-voteCh // precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash1, propBlockHash1) // add precommits from the rest - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlockHash1, propBlockParts1.Header()) + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, nil, cs2, cs4) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlockHash1, propBlockParts1.Header(), nil) incrementRound(cs2, cs3, cs4) @@ -858,7 +849,7 @@ func TestLockPOLSafety2(t *testing.T) { // in round 2 we see the polkad block from round 0 newProp := types.NewProposal(height, 2, propBlockParts0.Header(), 0) if err := cs3.SignProposal(config.GetString("chain_id"), newProp); err != nil { - t.Fatal(err) + panic(err) } cs1.SetProposalAndBlock(newProp, propBlock0, propBlockParts0, "some peer") addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) // add the pol votes @@ -877,7 +868,7 @@ func TestLockPOLSafety2(t *testing.T) { select { case <-unlockCh: - t.Fatal("validator unlocked using an old polka") + panic("validator unlocked using an old polka") case <-voteCh: // prevote our locked block } @@ -910,9 +901,9 @@ func TestSlashingPrevotes(t *testing.T) { // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait - hash := cs1.ProposalBlock.Hash() + hash := rs.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlockParts.Header(), nil) <-timeoutWaitCh @@ -920,7 +911,7 @@ func TestSlashingPrevotes(t *testing.T) { // away and ignore more prevotes (and thus fail to slash!) // add the conflicting vote - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(),nil) // XXX: Check for existence of Dupeout info } @@ -942,7 +933,7 @@ func TestSlashingPrecommits(t *testing.T) { <-voteCh // prevote // add prevote from cs2 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), nil) <-voteCh // precommit @@ -950,13 +941,13 @@ func TestSlashingPrecommits(t *testing.T) { // add one for a different block should cause us to go into prevote wait hash := rs.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlockParts.Header(),nil) // NOTE: we have to send the vote for different block first so we don't just go into precommit round right // away and ignore more prevotes (and thus fail to slash!) // add precommit from cs2 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(),nil) // XXX: Check for existence of Dupeout info } @@ -990,15 +981,15 @@ func TestHalt1(t *testing.T) { <-voteCh // prevote - signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlock.Hash(), propBlockParts.Header(), cs3, cs4) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlock.Hash(), propBlockParts.Header(), nil, cs3, cs4) <-voteCh // precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], propBlock.Hash(), propBlock.Hash()) // add precommits from the rest - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, nil, types.PartSetHeader{}) // didnt receive proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlock.Hash(), propBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, nil, types.PartSetHeader{}, nil) // didnt receive proposal + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlock.Hash(), propBlockParts.Header(), nil) // we receive this later, but cs3 might receive it earlier and with ours will go to commit! precommit4 := signVote(cs4, types.VoteTypePrecommit, propBlock.Hash(), propBlockParts.Header()) @@ -1028,6 +1019,6 @@ func TestHalt1(t *testing.T) { rs = re.(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { - t.Fatal("expected height to increment") + panic("expected height to increment") } } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 808a5e41..648692e4 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -6,6 +6,8 @@ import ( "path" "strings" "testing" + + . "github.com/tendermint/go-common" ) var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} @@ -18,7 +20,7 @@ var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"roun func TestSeek(t *testing.T) { f, err := ioutil.TempFile(os.TempDir(), "seek_test_") if err != nil { - t.Fatal(err) + panic(err) } stat, _ := f.Stat() @@ -26,13 +28,13 @@ func TestSeek(t *testing.T) { _, err = f.WriteString(testTxt) if err != nil { - t.Fatal(err) + panic(err) } f.Close() wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light")) if err != nil { - t.Fatal(err) + panic(err) } keyWord := "Precommit" @@ -43,7 +45,7 @@ func TestSeek(t *testing.T) { return false }) if err != nil { - t.Fatal(err) + panic(err) } // confirm n @@ -58,18 +60,18 @@ func TestSeek(t *testing.T) { // n is lines from the end. spl = spl[i:] if n != len(spl) { - t.Fatalf("Wrong nLines. Got %d, expected %d", n, len(spl)) + panic(Fmt("Wrong nLines. Got %d, expected %d", n, len(spl))) } b, err := ioutil.ReadAll(wal.fp) if err != nil { - t.Fatal(err) + panic(err) } // first char is a \n spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") for i, s := range spl { if s != spl2[i] { - t.Fatalf("Mismatch. Got %s, expected %s", spl2[i], s) + panic(Fmt("Mismatch. Got %s, expected %s", spl2[i], s)) } } diff --git a/glide.lock b/glide.lock index 1cc00875..03693aa4 100644 --- a/glide.lock +++ b/glide.lock @@ -54,9 +54,9 @@ imports: - name: github.com/tendermint/go-clist version: 634527f5b60fd7c71ca811262493df2ad65ee0ca - name: github.com/tendermint/go-common - version: dcfa46af1341d03b80d32e4901019d1668b978b9 + version: dee6622bf7f811d3ba8638a3f5ffaf8d679aa9d9 - name: github.com/tendermint/go-config - version: cfcef384d64b94e50909596e39b32ffb3cc20573 + version: e64b424499acd0eb9856b88e10c0dff41628c0d6 - name: github.com/tendermint/go-crypto version: 41cfb7b677f4e16cdfd22b6ce0946c89919fbc7b - name: github.com/tendermint/go-db @@ -68,7 +68,7 @@ imports: - name: github.com/tendermint/go-merkle version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 - name: github.com/tendermint/go-p2p - version: 5bd7692323ec60d6461678f09b5024a952164151 + version: 929cf433b9c8e987af5f7f3ca3ce717e1e3eda53 subpackages: - upnp - name: github.com/tendermint/go-rpc @@ -84,7 +84,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: f41bc5f11969e22b357d94b4247403fd62d40445 + version: 49a67aee8a7984a68eabe2c45ff6eb0ff51e31f9 subpackages: - client - example/dummy @@ -115,7 +115,7 @@ imports: subpackages: - unix - name: google.golang.org/grpc - version: 88aeffff979aa77aa502cb011423d0a08fa12c5a + version: daeb9cc0f2607997cce611a1458e71b981ce5986 subpackages: - codes - credentials diff --git a/mempool/mempool.go b/mempool/mempool.go index 87a185ef..6cd2227c 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -60,7 +60,7 @@ type Mempool struct { // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cacheMap map[string]struct{} - cacheList *list.List + cacheList *list.List // to remove oldest tx when cache gets too big } func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { @@ -81,6 +81,7 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { return mempool } +// consensus must be able to hold lock to safely update func (mem *Mempool) Lock() { mem.proxyMtx.Lock() } @@ -89,10 +90,25 @@ func (mem *Mempool) Unlock() { mem.proxyMtx.Unlock() } +// Number of transactions in the mempool clist func (mem *Mempool) Size() int { return mem.txs.Len() } +// Remove all transactions from mempool and cache +func (mem *Mempool) Flush() { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + mem.cacheMap = make(map[string]struct{}, cacheSize) + mem.cacheList.Init() + + for e := mem.txs.Front(); e != nil; e = e.Next() { + mem.txs.Remove(e) + e.DetachPrev() + } +} + // Return the first element of mem.txs for peer goroutines to call .NextWait() on. // Blocks until txs has elements. func (mem *Mempool) TxsFrontWait() *clist.CElement { @@ -125,6 +141,8 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { if mem.cacheList.Len() >= cacheSize { popped := mem.cacheList.Front() poppedTx := popped.Value.(types.Tx) + // NOTE: the tx may have already been removed from the map + // but deleting a non-existant element is fine delete(mem.cacheMap, string(poppedTx)) mem.cacheList.Remove(popped) } @@ -144,6 +162,13 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { return nil } +func (mem *Mempool) removeTxFromCacheMap(tx []byte) { + mem.proxyMtx.Lock() + // NOTE tx not removed from cacheList + delete(mem.cacheMap, string(tx)) + mem.proxyMtx.Unlock() +} + // TMSP callback function func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) { if mem.recheckCursor == nil { @@ -165,8 +190,14 @@ func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) { } mem.txs.PushBack(memTx) } else { - log.Info("Bad Transaction", "res", r) // ignore bad transaction + log.Info("Bad Transaction", "res", r) + + // remove from cache (it might be good later) + // note this is an async callback, + // so we need to grab the lock in removeTxFromCacheMap + mem.removeTxFromCacheMap(req.GetCheckTx().Tx) + // TODO: handle other retcodes } default: @@ -188,6 +219,9 @@ func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) { // Tx became invalidated due to newly committed block. mem.txs.Remove(mem.recheckCursor) mem.recheckCursor.DetachPrev() + + // remove from cache (it might be good later) + mem.removeTxFromCacheMap(req.GetCheckTx().Tx) } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil @@ -270,10 +304,13 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { goodTxs := make([]types.Tx, 0, mem.txs.Len()) for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) + // Remove the tx if it's alredy in a block. if _, ok := blockTxsMap[string(memTx.tx)]; ok { - // Remove the tx since already in block. + // remove from clist mem.txs.Remove(e) e.DetachPrev() + + // NOTE: we don't remove committed txs from the cache. continue } // Good tx! diff --git a/node/node.go b/node/node.go index 3d2c9196..231b6b65 100644 --- a/node/node.go +++ b/node/node.go @@ -177,6 +177,7 @@ func (n *Node) AddListener(l p2p.Listener) { func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetConfig(n.config) + rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetConsensusReactor(n.consensusReactor) diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 2b8dfb8f..6ae2014b 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -9,6 +9,11 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" ) +func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) { + mempoolReactor.Mempool.Flush() + return &ctypes.ResultUnsafeFlushMempool{}, nil +} + func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) { switch typ { case "string": diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index b0e5c0c4..bef85c79 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -2,14 +2,18 @@ package core import ( "fmt" + "time" + + "github.com/tendermint/go-events" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" ) //----------------------------------------------------------------------------- +// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!) -// NOTE: tx must be signed +// Returns right away, with no response func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempoolReactor.BroadcastTx(tx, nil) if err != nil { @@ -18,7 +22,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return &ctypes.ResultBroadcastTx{}, nil } -// Note: tx must be signed +// Returns with the response from CheckTx func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *tmsp.Response, 1) err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { @@ -36,8 +40,70 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } +// CONTRACT: returns error==nil iff the tx is included in a block. +// +// If CheckTx fails, return with the response from CheckTx AND an error. +// Else, block until the tx is included in a block, +// and return the result of AppendTx (with no error). +// Even if AppendTx fails, so long as the tx is included in a block this function +// will not return an error - it is the caller's responsibility to check res.Code. +// The function times out after five minutes and returns the result of CheckTx and an error. +// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) +func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + + // subscribe to tx being committed in block + appendTxResCh := make(chan *tmsp.Response, 1) + eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) { + appendTxResCh <- data.(*tmsp.Response) + }) + + // broadcast the tx and register checktx callback + checkTxResCh := make(chan *tmsp.Response, 1) + err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + checkTxResCh <- res + }) + if err != nil { + return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + } + checkTxRes := <-checkTxResCh + checkTxR := checkTxRes.GetCheckTx() + if r := checkTxR; r.Code != tmsp.CodeType_OK { + // CheckTx failed! + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log) + } + + // Wait for the tx to be included in a block, + // timeout after something reasonable. + timer := time.NewTimer(60 * 5 * time.Second) + select { + case appendTxRes := <-appendTxResCh: + // The tx was included in a block. + // NOTE we don't return an error regardless of the AppendTx code; + // clients must check this to see if they need to send a new tx! + r := appendTxRes.GetAppendTx() + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, nil + case <-timer.C: + r := checkTxR + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, fmt.Errorf("Timed out waiting for transaction to be included in a block") + } + + panic("Should never happen!") +} + func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - txs := mempoolReactor.Mempool.Reap(0) + txs := mempoolReactor.Mempool.Reap(-1) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index de196e81..464f7fda 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -4,12 +4,14 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/types" ) +var eventSwitch *events.EventSwitch var blockStore *bc.BlockStore var consensusState *consensus.ConsensusState var consensusReactor *consensus.ConsensusReactor @@ -24,6 +26,10 @@ func SetConfig(c cfg.Config) { config = c } +func SetEventSwitch(evsw *events.EventSwitch) { + eventSwitch = evsw +} + func SetBlockStore(bs *bc.BlockStore) { blockStore = bs } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 585a18cd..78a0b618 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -19,11 +19,13 @@ var Routes = map[string]*rpc.RPCFunc{ "block": rpc.NewRPCFunc(BlockResult, "height"), "validators": rpc.NewRPCFunc(ValidatorsResult, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), + "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"), "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), "unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfilerResult, ""), @@ -126,6 +128,14 @@ func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { } } +func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) { + if r, err := BroadcastTxCommit(tx); err != nil { + return nil, err + } else { + return r, nil + } +} + func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { if r, err := BroadcastTxSync(tx); err != nil { return nil, err @@ -142,6 +152,14 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { } } +func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { + if r, err := UnsafeFlushMempool(); err != nil { + return nil, err + } else { + return r, nil + } +} + func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { if r, err := UnsafeSetConfig(typ, key, value); err != nil { return nil, err diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index d08892e8..c1eebb6e 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,8 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultUnsafeFlushMempool struct{} + type ResultUnsafeSetConfig struct{} type ResultUnsafeProfile struct{} @@ -115,6 +117,7 @@ const ( ResultTypeUnsafeStartCPUProfiler = byte(0xa1) ResultTypeUnsafeStopCPUProfiler = byte(0xa2) ResultTypeUnsafeWriteHeapProfile = byte(0xa3) + ResultTypeUnsafeFlushMempool = byte(0xa4) ) type TMResult interface { @@ -141,4 +144,5 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStartCPUProfiler}, wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler}, wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile}, + wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index a5b243be..47d2ead1 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -1,11 +1,15 @@ package rpctest import ( + "bytes" + "crypto/rand" "fmt" "testing" + . "github.com/tendermint/go-common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) //-------------------------------------------------------------------------------- @@ -19,7 +23,7 @@ func TestURIStatus(t *testing.T) { tmResult := new(ctypes.TMResult) _, err := clientURI.Call("status", map[string]interface{}{}, tmResult) if err != nil { - t.Fatal(err) + panic(err) } testStatus(t, tmResult) } @@ -28,7 +32,7 @@ func TestJSONStatus(t *testing.T) { tmResult := new(ctypes.TMResult) _, err := clientJSON.Call("status", []interface{}{}, tmResult) if err != nil { - t.Fatal(err) + panic(err) } testStatus(t, tmResult) } @@ -37,38 +41,102 @@ func testStatus(t *testing.T, statusI interface{}) { tmRes := statusI.(*ctypes.TMResult) status := (*tmRes).(*ctypes.ResultStatus) if status.NodeInfo.Network != chainID { - t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", + panic(Fmt("ChainID mismatch: got %s expected %s", status.NodeInfo.Network, chainID)) } } -// TODO -/* -func testBroadcastTx(t *testing.T, typ string) { - amt := int64(100) - toAddr := user[1].Address - tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) - receipt := broadcastTx(t, typ, tx) - if receipt.CreatesContract > 0 { - t.Fatal("This tx does not create a contract") +//-------------------------------------------------------------------------------- +// broadcast tx sync + +func testTx() []byte { + buf := make([]byte, 16) + _, err := rand.Read(buf) + if err != nil { + panic(err) } - if len(receipt.TxHash) == 0 { - t.Fatal("Failed to compute tx hash") + return buf +} + +func TestURIBroadcastTxSync(t *testing.T) { + config.Set("block_size", 0) + defer config.Set("block_size", -1) + tmResult := new(ctypes.TMResult) + tx := testTx() + _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) + if err != nil { + panic(err) } - pool := node.MempoolReactor().Mempool - txs := pool.GetProposalTxs() - if len(txs) != mempoolCount { - t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) + testBroadcastTxSync(t, tmResult, tx) +} + +func TestJSONBroadcastTxSync(t *testing.T) { + config.Set("block_size", 0) + defer config.Set("block_size", -1) + tmResult := new(ctypes.TMResult) + tx := testTx() + _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) + if err != nil { + panic(err) } - tx2 := txs[mempoolCount-1].(*types.SendTx) - n, err := new(int64), new(error) - buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) - tx.WriteSignBytes(chainID, buf1, n, err) - tx2.WriteSignBytes(chainID, buf2, n, err) - if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { - t.Fatal("inconsistent hashes for mempool tx and sent tx") + testBroadcastTxSync(t, tmResult, tx) +} + +func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) { + tmRes := resI.(*ctypes.TMResult) + res := (*tmRes).(*ctypes.ResultBroadcastTx) + if res.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxSync got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)) } -}*/ + mem := node.MempoolReactor().Mempool + if mem.Size() != 1 { + panic(Fmt("Mempool size should have been 1. Got %d", mem.Size())) + } + + txs := mem.Reap(1) + if !bytes.Equal(txs[0], tx) { + panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx)) + } + + mem.Flush() +} + +//-------------------------------------------------------------------------------- +// broadcast tx commit + +func TestURIBroadcastTxCommit(t *testing.T) { + tmResult := new(ctypes.TMResult) + tx := testTx() + _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) + if err != nil { + panic(err) + } + testBroadcastTxCommit(t, tmResult, tx) +} + +func TestJSONBroadcastTxCommit(t *testing.T) { + tmResult := new(ctypes.TMResult) + tx := testTx() + _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) + if err != nil { + panic(err) + } + testBroadcastTxCommit(t, tmResult, tx) +} + +func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) { + tmRes := resI.(*ctypes.TMResult) + res := (*tmRes).(*ctypes.ResultBroadcastTx) + if res.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)) + } + mem := node.MempoolReactor().Mempool + if mem.Size() != 0 { + panic(Fmt("Mempool size should have been 0. Got %d", mem.Size())) + } + + // TODO: find tx in block +} //-------------------------------------------------------------------------------- // Test the websocket service @@ -179,7 +247,7 @@ func TestURIUnsafeSetConfig(t *testing.T) { "value": testCase[2], }, tmResult) if err != nil { - t.Fatal(err) + panic(err) } } testUnsafeSetConfig(t) @@ -190,7 +258,7 @@ func TestJSONUnsafeSetConfig(t *testing.T) { tmResult := new(ctypes.TMResult) _, err := clientJSON.Call("unsafe_set_config", []interface{}{testCase[0], testCase[1], testCase[2]}, tmResult) if err != nil { - t.Fatal(err) + panic(err) } } testUnsafeSetConfig(t) @@ -199,16 +267,16 @@ func TestJSONUnsafeSetConfig(t *testing.T) { func testUnsafeSetConfig(t *testing.T) { s := config.GetString("key1") if s != stringVal { - t.Fatalf("got %v, expected %v", s, stringVal) + panic(Fmt("got %v, expected %v", s, stringVal)) } i := config.GetInt("key2") if i != intVal { - t.Fatalf("got %v, expected %v", i, intVal) + panic(Fmt("got %v, expected %v", i, intVal)) } b := config.GetBool("key3") if b != boolVal { - t.Fatalf("got %v, expected %v", b, boolVal) + panic(Fmt("got %v, expected %v", b, boolVal)) } } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index c209324b..59709ada 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -20,7 +20,6 @@ import ( var ( config cfg.Config node *nm.Node - mempoolCount = 0 chainID string rpcAddr string requestAddr string @@ -76,7 +75,7 @@ func newNode(ready chan struct{}) { func newWSClient(t *testing.T) *client.WSClient { wsc := client.NewWSClient(websocketAddr, websocketEndpoint) if _, err := wsc.Start(); err != nil { - t.Fatal(err) + panic(err) } return wsc } @@ -84,14 +83,14 @@ func newWSClient(t *testing.T) *client.WSClient { // subscribe to an event func subscribe(t *testing.T, wsc *client.WSClient, eventid string) { if err := wsc.Subscribe(eventid); err != nil { - t.Fatal(err) + panic(err) } } // unsubscribe from an event func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) { if err := wsc.Unsubscribe(eventid); err != nil { - t.Fatal(err) + panic(err) } } @@ -138,7 +137,7 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo case <-timeout.C: if dieOnTimeout { wsc.Stop() - t.Fatalf("%s event was not received in time", eventid) + panic(Fmt("%s event was not received in time", eventid)) } // else that's great, we didn't hear the event // and we shouldn't have @@ -147,14 +146,13 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo // message was received and expected // run the check if err := check(eventid, eventData); err != nil { - t.Fatal(err) // Show the stack trace. + panic(err) // Show the stack trace. } } else { wsc.Stop() - t.Fatalf("%s event was not expected", eventid) + panic(Fmt("%s event was not expected", eventid)) } case err := <-errCh: - t.Fatal(err) panic(err) // Show the stack trace. } diff --git a/state/execution.go b/state/execution.go index 230bceb2..1acd0493 100644 --- a/state/execution.go +++ b/state/execution.go @@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error { // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -34,7 +34,7 @@ func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, nextValSet := valSet.Copy() // Execute the block txs - err = s.execBlockOnProxyApp(evsw, proxyAppConn, block) + err = s.execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. @@ -55,7 +55,7 @@ func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, // Executes block's transactions on proxyAppConn. // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block) error { +func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error { var validTxs, invalidTxs = 0, 0 @@ -73,6 +73,9 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) invalidTxs += 1 } + // NOTE: if we count we can access the tx from the block instead of + // pulling it from the req + eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), res) } } proxyAppConn.SetResponseCallback(proxyCb) diff --git a/test/rpc/clean.sh b/test/rpc/clean.sh new file mode 100644 index 00000000..1b34033f --- /dev/null +++ b/test/rpc/clean.sh @@ -0,0 +1,3 @@ +killall tendermint +killall dummy +killall counter diff --git a/test/rpc/counter_test.sh b/test/rpc/counter_test.sh new file mode 100644 index 00000000..809a2212 --- /dev/null +++ b/test/rpc/counter_test.sh @@ -0,0 +1,69 @@ +#! /bin/bash + +##################### +# counter over socket +##################### +TESTNAME=$1 + +# Send some txs + +function sendTx() { + TX=$1 + RESPONSE=`curl -s localhost:46657/broadcast_tx_commit?tx=\"$TX\"` + CODE=`echo $RESPONSE | jq .result[1].code` + ERROR=`echo $RESPONSE | jq .error` + ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes +} + +# 0 should pass once and get in block, with no error +TX=00 +sendTx $TX +if [[ $CODE != 0 ]]; then + echo "Got non-zero exit code for $TX. $RESPONSE" + exit 1 +fi +if [[ "$ERROR" != "" ]]; then + echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" + exit 1 +fi + + + +# second time should get rejected by the mempool (return error and non-zero code) +sendTx $TX +if [[ $CODE == 0 ]]; then + echo "Got zero exit code for $TX. Expected tx to be rejected by mempool. $RESPONSE" + exit 1 +fi +if [[ "$ERROR" == "" ]]; then + echo "Expected to get an error - tx $TX should have been rejected from mempool" + echo "$RESPONSE" + exit 1 +fi + + +# now, TX=01 should pass, with no error +TX=01 +sendTx $TX +if [[ $CODE != 0 ]]; then + echo "Got non-zero exit code for $TX. $RESPONSE" + exit 1 +fi +if [[ "$ERROR" != "" ]]; then + echo "Unexpected error. Tx $TX should have been accepted in block. $ERROR" + exit 1 +fi + +# now, TX=03 should get in a block (passes CheckTx, no error), but is invalid +TX=03 +sendTx $TX +if [[ $CODE == 0 ]]; then + echo "Got zero exit code for $TX. Should have been bad nonce. $RESPONSE" + exit 1 +fi +if [[ "$ERROR" != "" ]]; then + echo "Unexpected error. Tx $TX should have been included in a block. $ERROR" + exit 1 +fi + +echo "Passed Test: $TESTNAME" diff --git a/test/rpc/dummy_test.sh b/test/rpc/dummy_test.sh new file mode 100644 index 00000000..9410c88d --- /dev/null +++ b/test/rpc/dummy_test.sh @@ -0,0 +1,34 @@ +#! /bin/bash + +function toHex() { + echo -n $1 | hexdump -ve '1/1 "%.2X"' +} + +##################### +# dummy with curl +##################### +TESTNAME=$1 + +# store key value pair +KEY="abcd" +VALUE="dcba" +curl localhost:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\" +echo "" + +# we should be able to look up the key +RESPONSE=`tmsp-cli query $KEY` +A=`echo $RESPONSE | grep exists=true` +if [[ $? != 0 ]]; then + echo "Failed to find 'exists=true' for $KEY. Response:" + echo "$RESPONSE" +fi + +# we should not be able to look up the value +RESPONSE=`tmsp-cli query $VALUE` +A=`echo $RESPONSE | grep exists=true` +if [[ $? == 0 ]]; then + echo "Found 'exists=true' for $VALUE when we should not have. Response:" + echo "$RESPONSE" +fi + +echo "Passed Test: $TESTNAME" diff --git a/test/rpc/test.sh b/test/rpc/test.sh new file mode 100644 index 00000000..f31b19c5 --- /dev/null +++ b/test/rpc/test.sh @@ -0,0 +1,57 @@ +#! /bin/bash +set -e + +#- dummy over socket, curl +#- counter over socket, curl +#- counter over grpc, curl +#- counter over grpc, grpc + +# TODO: install everything + +function dummy_over_socket(){ + dummy > /dev/null & + tendermint node > tendermint.log & + sleep 3 + + bash dummy_test.sh "Dummy over Socket" + + killall dummy tendermint +} + + +function counter_over_socket() { + counter --serial > /dev/null & + tendermint node > tendermint.log & + sleep 3 + + bash counter_test.sh "Counter over Socket" + + killall counter tendermint +} + +function counter_over_grpc() { + counter --serial --tmsp grpc > /dev/null & + tendermint node --tmsp grpc > tendermint.log & + sleep 3 + + bash counter_test.sh "Counter over GRPC" + + killall counter tendermint +} + +case "$1" in + "dummy_over_socket") + dummy_over_socket + ;; + "counter_over_socket") + counter_over_socket + ;; + "counter_over_grpc") + counter_over_grpc + ;; + *) + dummy_over_socket + counter_over_socket + counter_over_grpc +esac + diff --git a/types/events.go b/types/events.go index 3328911c..68313ff2 100644 --- a/types/events.go +++ b/types/events.go @@ -2,6 +2,7 @@ package types import ( // for registering TMEventData as events.EventData + . "github.com/tendermint/go-common" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -14,6 +15,7 @@ func EventStringUnbond() string { return "Unbond" } func EventStringRebond() string { return "Rebond" } func EventStringDupeout() string { return "Dupeout" } func EventStringFork() string { return "Fork" } +func EventStringTx(tx Tx) string { return Fmt("Tx:%X", tx.Hash()) } func EventStringNewBlock() string { return "NewBlock" } func EventStringNewBlockHeader() string { return "NewBlockHeader" } diff --git a/types/tx.go b/types/tx.go index 60699d53..a67206d4 100644 --- a/types/tx.go +++ b/types/tx.go @@ -6,6 +6,14 @@ import ( type Tx []byte +// NOTE: this is the hash of the go-wire encoded Tx. +// Tx has no types at this level, so just length-prefixed. +// Alternatively, it may make sense to add types here and let +// []byte be type 0x1 so we can have versioned txs if need be in the future. +func (tx Tx) Hash() []byte { + return merkle.SimpleHashFromBinary(tx) +} + type Txs []Tx func (txs Txs) Hash() []byte { @@ -15,7 +23,7 @@ func (txs Txs) Hash() []byte { case 0: return nil case 1: - return merkle.SimpleHashFromBinary(txs[0]) + return txs[0].Hash() default: left := Txs(txs[:(len(txs)+1)/2]).Hash() right := Txs(txs[(len(txs)+1)/2:]).Hash()