diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 4543f22a..f7c7586e 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -221,7 +221,7 @@ FOR_LOOP: // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet(bcR.config.GetInt("block_part_size")) + firstParts := first.MakePartSet(bcR.config.GetInt("block_part_size")) // TODO: put part size in parts header? firstPartsHeader := firstParts.Header() // Finally, verify the first block using the second's commit // NOTE: we can probably make this more efficient, but note that calling @@ -236,6 +236,7 @@ FOR_LOOP: } else { bcR.pool.PopRequest() // TODO: use ApplyBlock instead of Exec/Commit/SetAppHash/Save + // TODO: should we be firing events? need to fire NewBlock events manually ... err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader) if err != nil { // TODO This is bad, are we zombie? diff --git a/blockchain/store.go b/blockchain/store.go index 565e131a..3b422642 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -163,6 +163,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes) // Save seen commit (seen +2/3 precommits for block) + // NOTE: we can delete this at a later height seenCommitBytes := wire.BinaryBytes(seenCommit) bs.db.Set(calcSeenCommitKey(height), seenCommitBytes) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go new file mode 100644 index 00000000..f049fa3e --- /dev/null +++ b/consensus/byzantine_test.go @@ -0,0 +1,278 @@ +package consensus + +import ( + "sync" + "testing" + "time" + + "github.com/tendermint/tendermint/config/tendermint_test" + + . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-crypto" + "github.com/tendermint/go-events" + "github.com/tendermint/go-p2p" + "github.com/tendermint/tendermint/types" +) + +func init() { + config = tendermint_test.ResetConfig("consensus_byzantine_test") +} + +//---------------------------------------------- +// byzantine failures + +// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals). +// byzantine validator sends conflicting proposals into A and B, +// and prevotes/precommits on both of them. +// B sees a commit, A doesn't. +// Byzantine validator refuses to prevote. +// Heal partition and ensure A sees the commit +func TestByzantine(t *testing.T) { + resetConfigTimeouts() + N := 4 + css := randConsensusNet(N) + + switches := make([]*p2p.Switch, N) + for i := 0; i < N; i++ { + switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil)) + } + + reactors := make([]p2p.Reactor, N) + eventChans := make([]chan interface{}, N) + for i := 0; i < N; i++ { + if i == 0 { + css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator)) + // make byzantine + css[i].decideProposal = func(j int) func(int, int) { + return func(height, round int) { + byzantineDecideProposalFunc(height, round, css[j], switches[j]) + } + }(i) + css[i].doPrevote = func(height, round int) {} + } + + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() + if err != nil { + t.Fatalf("Failed to start switch: %v", err) + } + eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + + conR := NewConsensusReactor(css[i], false) + conR.SetEventSwitch(eventSwitch) + + var conRI p2p.Reactor + conRI = conR + if i == 0 { + conRI = NewByzantineReactor(conR) + } + reactors[i] = conRI + } + + p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { + // ignore new switch s, we already made ours + switches[i].AddReactor("CONSENSUS", reactors[i]) + return switches[i] + }, func(sws []*p2p.Switch, i, j int) { + // the network starts partitioned with globally active adversary + if i != 0 { + return + } + p2p.Connect2Switches(sws, i, j) + }) + + // byz proposer sends one block to peers[0] + // and the other block to peers[1] and peers[2]. + // note peers and switches order don't match. + peers := switches[0].Peers().List() + ind0 := getSwitchIndex(switches, peers[0]) + ind1 := getSwitchIndex(switches, peers[1]) + ind2 := getSwitchIndex(switches, peers[2]) + + // connect the 2 peers in the larger partition + p2p.Connect2Switches(switches, ind1, ind2) + + // wait for someone in the big partition to make a block + + select { + case <-eventChans[ind2]: + } + + log.Notice("A block has been committed. Healing partition") + + // connect the partitions + p2p.Connect2Switches(switches, ind0, ind1) + p2p.Connect2Switches(switches, ind0, ind2) + + // wait till everyone makes the first new block + // (one of them already has) + wg := new(sync.WaitGroup) + wg.Add(2) + for i := 1; i < N-1; i++ { + go func(j int) { + <-eventChans[j] + wg.Done() + }(i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + tick := time.NewTicker(time.Second * 10) + select { + case <-done: + case <-tick.C: + for i, reactor := range reactors { + t.Log(Fmt("Consensus Reactor %v", i)) + t.Log(Fmt("%v", reactor)) + } + t.Fatalf("Timed out waiting for all validators to commit first block") + } +} + +//------------------------------- +// byzantine consensus functions + +func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) { + // byzantine user should create two proposals and try to split the vote. + // Avoid sending on internalMsgQueue and running consensus state. + + // Create a new proposal block from state/txs from the mempool. + block1, blockParts1 := cs.createProposalBlock() + polRound, polBlockID := cs.Votes.POLInfo() + proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID) + cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err + + // Create a new proposal block from state/txs from the mempool. + block2, blockParts2 := cs.createProposalBlock() + polRound, polBlockID = cs.Votes.POLInfo() + proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID) + cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err + + block1Hash := block1.Hash() + block2Hash := block2.Hash() + + // broadcast conflicting proposals/block parts to peers + peers := sw.Peers().List() + log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers)) + for i, peer := range peers { + if i < len(peers)/2 { + go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) + } else { + go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) + } + } +} + +func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { + // proposal + msg := &ProposalMessage{Proposal: proposal} + peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) + + // parts + for i := 0; i < parts.Total(); i++ { + part := parts.GetPart(i) + msg := &BlockPartMessage{ + Height: height, // This tells peer that this part applies to us. + Round: round, // This tells peer that this part applies to us. + Part: part, + } + peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) + } + + // votes + cs.mtx.Lock() + prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header()) + precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) + cs.mtx.Unlock() + + peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) + peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) +} + +//---------------------------------------- +// byzantine consensus reactor + +type ByzantineReactor struct { + Service + reactor *ConsensusReactor +} + +func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { + return &ByzantineReactor{ + Service: conR, + reactor: conR, + } +} + +func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } +func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } +func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { + if !br.reactor.IsRunning() { + return + } + + // Create peerState for peer + peerState := NewPeerState(peer) + peer.Data.Set(types.PeerStateKey, peerState) + + // Send our state to peer. + // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). + if !br.reactor.fastSync { + br.reactor.sendNewRoundStepMessage(peer) + } +} +func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + br.reactor.RemovePeer(peer, reason) +} +func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { + br.reactor.Receive(chID, peer, msgBytes) +} + +//---------------------------------------- +// byzantine privValidator + +type ByzantinePrivValidator struct { + Address []byte `json:"address"` + types.Signer `json:"-"` + + mtx sync.Mutex +} + +// Return a priv validator that will sign anything +func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator { + return &ByzantinePrivValidator{ + Address: pv.Address, + Signer: pv.Signer, + } +} + +func (privVal *ByzantinePrivValidator) GetAddress() []byte { + return privVal.Address +} + +func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519) + return nil +} + +func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519) + return nil +} + +func (privVal *ByzantinePrivValidator) String() string { + return Fmt("PrivValidator{%X}", privVal.Address) +} diff --git a/consensus/common.go b/consensus/common.go index 02b2c4a4..1f78c585 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -4,7 +4,7 @@ import ( "github.com/tendermint/tendermint/types" ) -// NOTE: this is blocking +// NOTE: if chanCap=0, this blocks on the event being consumed func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { // listen for event ch := make(chan interface{}, chanCap) @@ -13,3 +13,14 @@ func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap }) return ch } + +// NOTE: this blocks on receiving a response after the event is consumed +func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) chan interface{} { + // listen for event + ch := make(chan interface{}) + types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { + ch <- data + <-ch + }) + return ch +} diff --git a/consensus/common_test.go b/consensus/common_test.go index 297b842e..7f15ab5f 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -3,6 +3,7 @@ package consensus import ( "bytes" "fmt" + "io/ioutil" "sort" "sync" "testing" @@ -11,6 +12,7 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-p2p" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/config/tendermint_test" mempl "github.com/tendermint/tendermint/mempool" @@ -33,6 +35,8 @@ type validatorStub struct { *types.PrivValidator } +var testMinPower = 10 + func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub { return &validatorStub{ Index: valIndex, @@ -266,6 +270,31 @@ func randConsensusNet(nValidators int) []*ConsensusState { return css } +// nPeers = nValidators + nNotValidator +func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState { + genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower)) + css := make([]*ConsensusState, nPeers) + for i := 0; i < nPeers; i++ { + db := dbm.NewMemDB() // each state needs its own db + state := sm.MakeGenesisState(db, genDoc) + state.Save() + thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i)) + EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal + var privVal *types.PrivValidator + if i < nValidators { + privVal = privVals[i] + } else { + privVal = types.GenPrivValidator() + _, tempFilePath := Tempfile("priv_validator_") + privVal.SetFile(tempFilePath) + } + + dir, _ := ioutil.TempDir("/tmp", "persistent-dummy") + css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir)) + } + return css +} + func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) voteCh := make(chan interface{}) @@ -325,3 +354,16 @@ func startTestRound(cs *ConsensusState, height, round int) { cs.enterNewRound(height, round) cs.startRoutines(0) } + +//-------------------------------- +// reactor stuff + +func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { + for i, s := range switches { + if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { + return i + } + } + panic("didnt find peer in switches") + return -1 +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 60d5a8f5..6190fc52 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -287,11 +287,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } } -// Sets our private validator account for signing votes. -func (conR *ConsensusReactor) SetPrivValidator(priv PrivValidator) { - conR.conS.SetPrivValidator(priv) -} - // implements events.Eventable func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.evsw = evsw @@ -954,7 +949,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round) - log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) + log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) // NOTE: some may be nil BitArrays -> no side effects. ps.getVoteBitArray(height, round, type_).SetIndex(index, true) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 6b76d2aa..504bd97f 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -1,20 +1,18 @@ package consensus import ( - "bytes" + "fmt" "sync" "testing" "time" "github.com/tendermint/tendermint/config/tendermint_test" - . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" "github.com/tendermint/go-events" "github.com/tendermint/go-logger" "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/dummy" ) func init() { @@ -22,7 +20,7 @@ func init() { } func resetConfigTimeouts() { - logger.SetLogLevel("notice") + logger.SetLogLevel("info") //config.Set("log_level", "notice") config.Set("timeout_propose", 2000) // config.Set("timeout_propose_delta", 500) @@ -30,9 +28,13 @@ func resetConfigTimeouts() { // config.Set("timeout_prevote_delta", 500) // config.Set("timeout_precommit", 1000) // config.Set("timeout_precommit_delta", 500) - // config.Set("timeout_commit", 1000) + config.Set("timeout_commit", 1000) } +//---------------------------------------------- +// in-process testnets + +// Ensure a testnet makes blocks func TestReactor(t *testing.T) { resetConfigTimeouts() N := 4 @@ -41,7 +43,6 @@ func TestReactor(t *testing.T) { eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { reactors[i] = NewConsensusReactor(css[i], false) - reactors[i].SetPrivValidator(css[i].privValidator) eventSwitch := events.NewEventSwitch() _, err := eventSwitch.Start() @@ -52,19 +53,120 @@ func TestReactor(t *testing.T) { reactors[i].SetEventSwitch(eventSwitch) eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) } + // make connected switches and start all reactors p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) return s }, p2p.Connect2Switches) // wait till everyone makes the first new block + timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { + <-eventChans[j] + wg.Done() + }) +} + +//------------------------------------------------------------- +// ensure we can make blocks despite cycling a validator set + +func TestValidatorSetChanges(t *testing.T) { + resetConfigTimeouts() + nPeers := 8 + nVals := 4 + css := randConsensusNetWithPeers(nVals, nPeers) + reactors := make([]*ConsensusReactor, nPeers) + eventChans := make([]chan interface{}, nPeers) + for i := 0; i < nPeers; i++ { + reactors[i] = NewConsensusReactor(css[i], false) + + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() + if err != nil { + t.Fatalf("Failed to start switch: %v", err) + } + + reactors[i].SetEventSwitch(eventSwitch) + eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) + } + p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("CONSENSUS", reactors[i]) + return s + }, p2p.Connect2Switches) + + // map of active validators + activeVals := make(map[string]struct{}) + for i := 0; i < nVals; i++ { + activeVals[string(css[i].privValidator.GetAddress())] = struct{}{} + } + + // wait till everyone makes block 1 + timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) { + <-eventChans[j] + eventChans[j] <- struct{}{} + wg.Done() + }) + + newValidatorPubKey := css[nVals].privValidator.(*types.PrivValidator).PubKey + newValidatorTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), uint64(testMinPower)) + + // wait till everyone makes block 2 + // ensure the commit includes all validators + // send newValTx to change vals in block 3 + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx) + + // wait till everyone makes block 3. + // it includes the commit for block 2, which is by the original validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + + // wait till everyone makes block 4. + // it includes the commit for block 3, which is by the original validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + + // the commits for block 4 should be with the updated validator set + activeVals[string(newValidatorPubKey.Address())] = struct{}{} + + // wait till everyone makes block 5 + // it includes the commit for block 4, which should have the updated validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + + // TODO: test more changes! +} + +func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { + timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { + newBlock := <-eventChans[j] + err := validateBlock(newBlock.(types.EventDataNewBlock).Block, activeVals) + if err != nil { + t.Fatal(err) + } + for _, tx := range txs { + css[j].mempool.CheckTx(tx, nil) + } + + eventChans[j] <- struct{}{} + wg.Done() + }) +} + +// expects high synchrony! +func validateBlock(block *types.Block, activeVals map[string]struct{}) error { + if block.LastCommit.Size() != len(activeVals) { + return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals)) + } + + for _, vote := range block.LastCommit.Precommits { + if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok { + return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress) + } + } + return nil +} + +func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int)) { wg := new(sync.WaitGroup) - wg.Add(N) - for i := 0; i < N; i++ { - go func(j int) { - <-eventChans[j] - wg.Done() - }(i) + wg.Add(n) + for i := 0; i < n; i++ { + go f(wg, i) } // Make wait into a channel @@ -78,274 +180,6 @@ func TestReactor(t *testing.T) { select { case <-done: case <-tick.C: - t.Fatalf("Timed out waiting for all validators to commit first block") + t.Fatalf("Timed out waiting for all validators to commit a block") } } - -// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals). -// byzantine validator sends conflicting proposals into A and B, -// and prevotes/precommits on both of them. -// B sees a commit, A doesn't. -// Byzantine validator refuses to prevote. -// Heal partition and ensure A sees the commit -func TestByzantine(t *testing.T) { - resetConfigTimeouts() - N := 4 - css := randConsensusNet(N) - - switches := make([]*p2p.Switch, N) - for i := 0; i < N; i++ { - switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil)) - } - - reactors := make([]p2p.Reactor, N) - eventChans := make([]chan interface{}, N) - for i := 0; i < N; i++ { - var privVal PrivValidator - privVal = css[i].privValidator - if i == 0 { - privVal = NewByzantinePrivValidator(privVal.(*types.PrivValidator)) - // make byzantine - css[i].decideProposal = func(j int) func(int, int) { - return func(height, round int) { - byzantineDecideProposalFunc(height, round, css[j], switches[j]) - } - }(i) - css[i].doPrevote = func(height, round int) {} - } - - eventSwitch := events.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) - - conR := NewConsensusReactor(css[i], false) - conR.SetPrivValidator(privVal) - conR.SetEventSwitch(eventSwitch) - - var conRI p2p.Reactor - conRI = conR - if i == 0 { - conRI = NewByzantineReactor(conR) - } - reactors[i] = conRI - } - - p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { - // ignore new switch s, we already made ours - switches[i].AddReactor("CONSENSUS", reactors[i]) - return switches[i] - }, func(sws []*p2p.Switch, i, j int) { - // the network starts partitioned with globally active adversary - if i != 0 { - return - } - p2p.Connect2Switches(sws, i, j) - }) - - // byz proposer sends one block to peers[0] - // and the other block to peers[1] and peers[2]. - // note peers and switches order don't match. - peers := switches[0].Peers().List() - ind0 := getSwitchIndex(switches, peers[0]) - ind1 := getSwitchIndex(switches, peers[1]) - ind2 := getSwitchIndex(switches, peers[2]) - - // connect the 2 peers in the larger partition - p2p.Connect2Switches(switches, ind1, ind2) - - // wait for someone in the big partition to make a block - - select { - case <-eventChans[ind2]: - } - - log.Notice("A block has been committed. Healing partition") - - // connect the partitions - p2p.Connect2Switches(switches, ind0, ind1) - p2p.Connect2Switches(switches, ind0, ind2) - - // wait till everyone makes the first new block - // (one of them already has) - wg := new(sync.WaitGroup) - wg.Add(2) - for i := 1; i < N-1; i++ { - go func(j int) { - <-eventChans[j] - wg.Done() - }(i) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - tick := time.NewTicker(time.Second * 10) - select { - case <-done: - case <-tick.C: - for i, reactor := range reactors { - t.Log(Fmt("Consensus Reactor %v", i)) - t.Log(Fmt("%v", reactor)) - } - t.Fatalf("Timed out waiting for all validators to commit first block") - } -} - -func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { - for i, s := range switches { - if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { - return i - } - } - panic("didnt find peer in switches") - return -1 -} - -//------------------------------- -// byzantine consensus functions - -func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) { - // byzantine user should create two proposals and try to split the vote. - // Avoid sending on internalMsgQueue and running consensus state. - - // Create a new proposal block from state/txs from the mempool. - block1, blockParts1 := cs.createProposalBlock() - polRound, polBlockID := cs.Votes.POLInfo() - proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID) - cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err - - // Create a new proposal block from state/txs from the mempool. - block2, blockParts2 := cs.createProposalBlock() - polRound, polBlockID = cs.Votes.POLInfo() - proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID) - cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err - - block1Hash := block1.Hash() - block2Hash := block2.Hash() - - // broadcast conflicting proposals/block parts to peers - peers := sw.Peers().List() - log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers)) - for i, peer := range peers { - if i < len(peers)/2 { - go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) - } else { - go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) - } - } -} - -func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { - // proposal - msg := &ProposalMessage{Proposal: proposal} - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - - // parts - for i := 0; i < parts.Total(); i++ { - part := parts.GetPart(i) - msg := &BlockPartMessage{ - Height: height, // This tells peer that this part applies to us. - Round: round, // This tells peer that this part applies to us. - Part: part, - } - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - } - - // votes - cs.mtx.Lock() - prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header()) - precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) - cs.mtx.Unlock() - - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) -} - -//---------------------------------------- -// byzantine consensus reactor - -type ByzantineReactor struct { - Service - reactor *ConsensusReactor -} - -func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { - return &ByzantineReactor{ - Service: conR, - reactor: conR, - } -} - -func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } -func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } -func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { - if !br.reactor.IsRunning() { - return - } - - // Create peerState for peer - peerState := NewPeerState(peer) - peer.Data.Set(types.PeerStateKey, peerState) - - // Send our state to peer. - // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !br.reactor.fastSync { - br.reactor.sendNewRoundStepMessage(peer) - } -} -func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { - br.reactor.RemovePeer(peer, reason) -} -func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { - br.reactor.Receive(chID, peer, msgBytes) -} - -//---------------------------------------- -// byzantine privValidator - -type ByzantinePrivValidator struct { - Address []byte `json:"address"` - types.Signer `json:"-"` - - mtx sync.Mutex -} - -// Return a priv validator that will sign anything -func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator { - return &ByzantinePrivValidator{ - Address: pv.Address, - Signer: pv.Signer, - } -} - -func (privVal *ByzantinePrivValidator) GetAddress() []byte { - return privVal.Address -} - -func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - - // Sign - vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519) - return nil -} - -func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - - // Sign - proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519) - return nil -} - -func (privVal *ByzantinePrivValidator) String() string { - return Fmt("PrivValidator{%X}", privVal.Address) -} diff --git a/consensus/state.go b/consensus/state.go index 85d4d167..bef286e1 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -221,11 +221,12 @@ type PrivValidator interface { type ConsensusState struct { BaseService - config cfg.Config - proxyAppConn proxy.AppConnConsensus - blockStore *bc.BlockStore - mempool *mempl.Mempool - privValidator PrivValidator + config cfg.Config + proxyAppConn proxy.AppConnConsensus + blockStore *bc.BlockStore + mempool *mempl.Mempool + + privValidator PrivValidator // for signing votes mtx sync.Mutex RoundState @@ -313,6 +314,7 @@ func (cs *ConsensusState) GetValidators() (int, []*types.Validator) { return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators } +// Sets our private validator account for signing votes. func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -1253,6 +1255,8 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Save to blockStore. if cs.blockStore.Height() < block.Height { + // NOTE: the seenCommit is local justification to commit this block, + // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() cs.blockStore.SaveBlock(block, blockParts, seenCommit) @@ -1498,7 +1502,6 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, } func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) { - // TODO: store our index in the cs so we don't have to do this every time addr := cs.privValidator.GetAddress() valIndex, _ := cs.Validators.GetByAddress(addr) vote := &types.Vote{ @@ -1515,6 +1518,7 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet // sign the vote and publish on internalMsgQueue func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { + // if we don't have a key or we're not in the validator set, do nothing if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.GetAddress()) { return nil } diff --git a/glide.lock b/glide.lock index cbaa5f88..68296b90 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484 -updated: 2016-11-16T16:25:10.693961906-05:00 +updated: 2016-11-23T18:28:52.925584919-05:00 imports: - name: github.com/btcsuite/btcd - version: b134beb3b7809de6370a93cc5f6a684d6942e2e8 + version: afec1bd1245a4a19e6dfe1306974b733e7cbb9b8 subpackages: - btcec - name: github.com/btcsuite/fastsha256 @@ -18,7 +18,7 @@ imports: subpackages: - proto - name: github.com/golang/protobuf - version: 224aaba33b1ac32a92a165f27489409fb8133d08 + version: 8ee79997227bf9b34611aee7946ae64735e6fd93 subpackages: - proto - name: github.com/golang/snappy @@ -28,7 +28,7 @@ imports: - name: github.com/mattn/go-colorable version: d228849504861217f796da67fae4f6e347643f15 - name: github.com/mattn/go-isatty - version: 66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8 + version: 30a891c33c7cde7b02a981314b4228ec99380cca - name: github.com/spf13/pflag version: 5ccb023bc27df288a957c5e994cd44fd19619465 - name: github.com/syndtr/goleveldb @@ -58,7 +58,7 @@ imports: - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common - version: fa3daa7abc253264c916c12fecce3effa01a1287 + version: 6b4160f2a57487f277c42bf06fd280195dfdb278 subpackages: - test - name: github.com/tendermint/go-config @@ -76,9 +76,9 @@ imports: - name: github.com/tendermint/go-logger version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 - name: github.com/tendermint/go-merkle - version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 + version: bfc4afe28c7a50045d4d1eb043e67460f8a51a4f - name: github.com/tendermint/go-p2p - version: 62b37014a89b5eddff74844846979d30911cffda + version: f173a17ed3e9b341d480b36e5041819c8a5b8350 subpackages: - upnp - name: github.com/tendermint/go-rpc @@ -94,7 +94,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: 0bdb3b887e70b1ef16d32eece0248ec071fd8490 + version: 3742e35e6db5dcd7596aac9f661b46f5699ebec8 subpackages: - client - example/counter @@ -103,7 +103,7 @@ imports: - server - types - name: golang.org/x/crypto - version: 9477e0b78b9ac3d0b03822fd95422e2fe07627cd + version: ede567c8e044a5913dad1d1af3696d9da953104c subpackages: - curve25519 - nacl/box @@ -124,11 +124,11 @@ imports: - lex/httplex - trace - name: golang.org/x/sys - version: b699b7032584f0953262cb2788a0ca19bb494703 + version: 30237cf4eefd639b184d1f2cb77a581ea0be8947 subpackages: - unix - name: google.golang.org/grpc - version: 941cc894cea3c87a12943fd12b594964541b6d28 + version: eca2ad68af4d7bf894ada6bd263133f069a441d5 subpackages: - codes - credentials diff --git a/node/node.go b/node/node.go index f4a04d82..dcde7fac 100644 --- a/node/node.go +++ b/node/node.go @@ -101,10 +101,10 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato // Make ConsensusReactor consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) - consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync) if privValidator != nil { - consensusReactor.SetPrivValidator(privValidator) + consensusState.SetPrivValidator(privValidator) } + consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync) // Make p2p network switch sw := p2p.NewSwitch(config.GetConfig("p2p")) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index ddc84ef0..728e87bd 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -5,13 +5,14 @@ import ( crand "crypto/rand" "fmt" "math/rand" - "strings" "testing" "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/dummy" tmsp "github.com/tendermint/tmsp/types" ) @@ -156,9 +157,14 @@ func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) { if query.Result.IsErr() { panic(Fmt("Query returned an err: %v", query)) } + + qResult := new(dummy.QueryResult) + if err := wire.ReadJSONBytes(query.Result.Data, qResult); err != nil { + t.Fatal(err) + } // XXX: specific to value returned by the dummy - if !strings.Contains(string(query.Result.Data), "exists=true") { - panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data)) + if qResult.Exists != true { + panic(Fmt("Query error. Expected to find 'exists=true'. Got: %v", qResult)) } } diff --git a/state/execution.go b/state/execution.go index c52be41d..07466c77 100644 --- a/state/execution.go +++ b/state/execution.go @@ -8,6 +8,7 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -21,26 +22,33 @@ import ( func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. - err := s.validateBlock(block) - if err != nil { + if err := s.validateBlock(block); err != nil { return ErrInvalidBlock(err) } - // Update the validator set + // compute bitarray of validators that signed + signed := commitBitArrayFromBlock(block) + _ = signed // TODO send on begin block + + // copy the valset valSet := s.Validators.Copy() - // Update valSet with signatures from block. - updateValidatorsWithBlock(s.LastValidators, valSet, block) - // TODO: Update the validator set (e.g. block.Data.ValidatorUpdates?) nextValSet := valSet.Copy() // Execute the block txs - err = s.execBlockOnProxyApp(eventCache, proxyAppConn, block) + changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. return ErrProxyAppConn(err) } + // update the validator set + err = updateValidators(nextValSet, changedValidators) + if err != nil { + log.Warn("Error changing validator set", "error", err) + // TODO: err or carry on? + } + // All good! // Update validator accums and set state variables nextValSet.IncrementAccum(1) @@ -54,8 +62,9 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC } // Executes block's transactions on proxyAppConn. +// Returns a list of updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { +func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*tmsp.Validator, error) { var validTxs, invalidTxs = 0, 0 @@ -94,7 +103,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) - return err + return nil, err } fail.Fail() // XXX @@ -104,7 +113,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox fail.FailRand(len(block.Txs)) // XXX proxyAppConn.AppendTxAsync(tx) if err := proxyAppConn.Error(); err != nil { - return err + return nil, err } } @@ -114,44 +123,69 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) - return err + return nil, err } fail.Fail() // XXX - // TODO: Do something with changedValidators - log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators) + log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs) + if len(changedValidators) > 0 { + log.Info("Update to validator set", "updates", tmsp.ValidatorsString(changedValidators)) + } + return changedValidators, nil +} - log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) +func updateValidators(validators *types.ValidatorSet, changedValidators []*tmsp.Validator) error { + // TODO: prevent change of 1/3+ at once + + for _, v := range changedValidators { + pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-wire encoded pubkey + if err != nil { + return err + } + + address := pubkey.Address() + power := int64(v.Power) + // mind the overflow from uint64 + if power < 0 { + return errors.New(Fmt("Power (%d) overflows int64", v.Power)) + } + + _, val := validators.GetByAddress(address) + if val == nil { + // add val + added := validators.Add(types.NewValidator(pubkey, power)) + if !added { + return errors.New(Fmt("Failed to add new validator %X with voting power %d", address, power)) + } + } else if v.Power == 0 { + // remove val + _, removed := validators.Remove(address) + if !removed { + return errors.New(Fmt("Failed to remove validator %X)")) + } + } else { + // update val + val.VotingPower = power + updated := validators.Update(val) + if !updated { + return errors.New(Fmt("Failed to update validator %X with voting power %d", address, power)) + } + } + } return nil } -// Updates the LastCommitHeight of the validators in valSet, in place. -// Assumes that lastValSet matches the valset of block.LastCommit -// CONTRACT: lastValSet is not mutated. -func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.ValidatorSet, block *types.Block) { - +// return a bit array of validators that signed the last commit +// NOTE: assumes commits have already been authenticated +func commitBitArrayFromBlock(block *types.Block) *BitArray { + signed := NewBitArray(len(block.LastCommit.Precommits)) for i, precommit := range block.LastCommit.Precommits { - if precommit == nil { - continue - } - _, val := lastValSet.GetByIndex(i) - if val == nil { - PanicCrisis(Fmt("Failed to fetch validator at index %v", i)) - } - if _, val_ := valSet.GetByAddress(val.Address); val_ != nil { - val_.LastCommitHeight = block.Height - 1 - updated := valSet.Update(val_) - if !updated { - PanicCrisis("Failed to update validator LastCommitHeight") - } - } else { - // XXX This is not an error if validator was removed. - // But, we don't mutate validators yet so go ahead and panic. - PanicCrisis("Could not find validator") + if precommit != nil { + signed.SetIndex(i, true) // val_.LastCommitHeight = block.Height - 1 } } - + return signed } //----------------------------------------------------- @@ -259,6 +293,7 @@ func (m mockMempool) Update(height int, txs []types.Tx) {} type BlockStore interface { Height() int LoadBlock(height int) *types.Block + LoadBlockMeta(height int) *types.BlockMeta } type Handshaker struct { @@ -273,8 +308,7 @@ func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshake return &Handshaker{config, state, store, 0} } -// TODO: retry the handshake once if it fails the first time -// ... let Info take an argument determining its behaviour +// TODO: retry the handshake/replay if it fails ? func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // handshake is done via info request on the query conn res, tmspInfo, blockInfo, configInfo := proxyApp.Query().InfoSync() @@ -287,10 +321,9 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { return nil } - log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) + log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "app_hash", blockInfo.AppHash) - blockHeight := int(blockInfo.BlockHeight) // safe, should be an int32 - blockHash := blockInfo.BlockHash + blockHeight := int(blockInfo.BlockHeight) // XXX: beware overflow appHash := blockInfo.AppHash if tmspInfo != nil { @@ -298,40 +331,13 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { _ = tmspInfo } - // last block (nil if we starting from 0) - var header *types.Header - var partsHeader types.PartSetHeader - - // replay all blocks after blockHeight - // if blockHeight == 0, we will replay everything - if blockHeight != 0 { - block := h.store.LoadBlock(blockHeight) - if block == nil { - return ErrUnknownBlock{blockHeight} - } - - // check block hash - if !bytes.Equal(block.Hash(), blockHash) { - return ErrBlockHashMismatch{block.Hash(), blockHash, blockHeight} - } - - // NOTE: app hash should be in the next block ... - // check app hash - /*if !bytes.Equal(block.Header.AppHash, appHash) { - return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, block.Header.AppHash) - }*/ - - header = block.Header - partsHeader = block.MakePartSet(h.config.GetInt("block_part_size")).Header() - } - if configInfo != nil { // TODO: set config info _ = configInfo } // replay blocks up to the latest in the blockstore - err := h.ReplayBlocks(appHash, header, partsHeader, proxyApp.Consensus()) + err := h.ReplayBlocks(appHash, blockHeight, proxyApp.Consensus()) if err != nil { return errors.New(Fmt("Error on replay: %v", err)) } @@ -342,97 +348,72 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay all blocks after blockHeight and ensure the result matches the current state. -func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader, - appConnConsensus proxy.AppConnConsensus) error { +func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnConsensus proxy.AppConnConsensus) error { - // NOTE/TODO: tendermint may crash after the app commits - // but before it can save the new state root. - // it should save all eg. valset changes before calling Commit. - // then, if tm state is behind app state, the only thing missing can be app hash - - // get a fresh state and reset to the apps latest - stateCopy := h.state.Copy() - - // TODO: put validators in iavl tree so we can set the state with an older validator set - lastVals, nextVals := stateCopy.GetValidators() - if header == nil { - stateCopy.LastBlockHeight = 0 - stateCopy.LastBlockID = types.BlockID{} - // stateCopy.LastBlockTime = ... doesnt matter - stateCopy.Validators = nextVals - stateCopy.LastValidators = lastVals - } else { - stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals) - } - stateCopy.Stale = false - stateCopy.AppHash = appHash - - appBlockHeight := stateCopy.LastBlockHeight - coreBlockHeight := h.store.Height() - if coreBlockHeight < appBlockHeight { + storeBlockHeight := h.store.Height() + if storeBlockHeight < appBlockHeight { // if the app is ahead, there's nothing we can do - return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight} + return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} - } else if coreBlockHeight == appBlockHeight { + } else if storeBlockHeight == appBlockHeight { // if we crashed between Commit and SaveState, - // the state's app hash is stale. + // the state's app hash is stale // otherwise we're synced - if h.state.Stale { - h.state.Stale = false + if h.state.AppHashIsStale { + h.state.AppHashIsStale = false h.state.AppHash = appHash } - return checkState(h.state, stateCopy) + return nil } else if h.state.LastBlockHeight == appBlockHeight { - // core is ahead of app but core's state height is at apps height + // store is ahead of app but core's state height is at apps height // this happens if we crashed after saving the block, // but before committing it. We should be 1 ahead - if coreBlockHeight != appBlockHeight+1 { - PanicSanity(Fmt("core.state.height == app.height but core.height (%d) > app.height+1 (%d)", coreBlockHeight, appBlockHeight+1)) + if storeBlockHeight != appBlockHeight+1 { + PanicSanity(Fmt("core.state.height == app.height but store.height (%d) > app.height+1 (%d)", storeBlockHeight, appBlockHeight+1)) } // check that the blocks last apphash is the states apphash - block := h.store.LoadBlock(coreBlockHeight) + block := h.store.LoadBlock(storeBlockHeight) if !bytes.Equal(block.Header.AppHash, appHash) { - return ErrLastStateMismatch{coreBlockHeight, block.Header.AppHash, appHash} + return ErrLastStateMismatch{storeBlockHeight, block.Header.AppHash, appHash} } - // replay the block against the actual tendermint state (not the copy) - return h.loadApplyBlock(coreBlockHeight, h.state, appConnConsensus) + blockMeta := h.store.LoadBlockMeta(storeBlockHeight) + + h.nBlocks += 1 + var eventCache types.Fireable // nil + + // replay the block against the actual tendermint state + return h.state.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{}) } else { // either we're caught up or there's blocks to replay // replay all blocks starting with appBlockHeight+1 - for i := appBlockHeight + 1; i <= coreBlockHeight; i++ { - h.loadApplyBlock(i, stateCopy, appConnConsensus) + var eventCache types.Fireable // nil + var appHash []byte + for i := appBlockHeight + 1; i <= storeBlockHeight; i++ { + h.nBlocks += 1 + block := h.store.LoadBlock(i) + _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) + if err != nil { + log.Warn("Error executing block on proxy app", "height", i, "err", err) + return err + } + // Commit block, get hash back + res := appConnConsensus.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Info("Commit.Log: " + res.Log) + } + appHash = res.Data } - return checkState(h.state, stateCopy) - } -} - -func checkState(s, stateCopy *State) error { - // The computed state and the previously set state should be identical - if !s.Equals(stateCopy) { - return ErrStateMismatch{stateCopy, s} - } - return nil -} - -func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsensus proxy.AppConnConsensus) error { - h.nBlocks += 1 - block := h.store.LoadBlock(blockIndex) - panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX - var eventCache types.Fireable // nil - return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet(h.config.GetInt("block_part_size")).Header(), mockMempool{}) -} - -func panicOnNilBlock(height, bsHeight int, block *types.Block) { - if block == nil { - // Sanity? - PanicCrisis(Fmt(` -block is nil for height <= blockStore.Height() (%d <= %d). -Block: %v, -`, height, bsHeight, block)) - + if !bytes.Equal(h.state.AppHash, appHash) { + return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay", "expected", h.state.AppHash, "got", appHash)) + } + return nil } } diff --git a/state/execution_test.go b/state/execution_test.go index dabcada6..e0527a42 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" // . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" "github.com/tendermint/tendermint/proxy" @@ -23,10 +24,16 @@ var ( testPartSize = 65536 ) +//--------------------------------------- +// Test block execution + func TestExecBlock(t *testing.T) { // TODO } +//--------------------------------------- +// Test handshake/replay + // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { testHandshakeReplay(t, 0) @@ -51,7 +58,7 @@ func TestHandshakeReplayNone(t *testing.T) { func testHandshakeReplay(t *testing.T, n int) { config := tendermint_test.ResetConfig("proxy_test_") - state, store := stateAndStore() + state, store := stateAndStore(config) clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))) proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store)) @@ -69,7 +76,7 @@ func testHandshakeReplay(t *testing.T, n int) { if _, err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } - state2, _ := stateAndStore() + state2, _ := stateAndStore(config) for i := 0; i < n; i++ { block := chain[i] err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) @@ -105,6 +112,7 @@ func testHandshakeReplay(t *testing.T, n int) { } //-------------------------- +// utils for making blocks // make some bogus txs func txsFunc(blockNum int) (txs []types.Tx) { @@ -167,7 +175,7 @@ func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockc } // fresh state and mock store -func stateAndStore() (*State, *mockBlockStore) { +func stateAndStore(config cfg.Config) (*State, *mockBlockStore) { stateDB := dbm.NewMemDB() return MakeGenesisState(stateDB, &types.GenesisDoc{ ChainID: chainID, @@ -175,19 +183,28 @@ func stateAndStore() (*State, *mockBlockStore) { types.GenesisValidator{privKey.PubKey(), 10000, "test"}, }, AppHash: nil, - }), NewMockBlockStore(nil) + }), NewMockBlockStore(config, nil) } //---------------------------------- // mock block store type mockBlockStore struct { - chain []*types.Block + config cfg.Config + chain []*types.Block } -func NewMockBlockStore(chain []*types.Block) *mockBlockStore { - return &mockBlockStore{chain} +func NewMockBlockStore(config cfg.Config, chain []*types.Block) *mockBlockStore { + return &mockBlockStore{config, chain} } func (bs *mockBlockStore) Height() int { return len(bs.chain) } func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] } +func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta { + block := bs.chain[height-1] + return &types.BlockMeta{ + Hash: block.Hash(), + Header: block.Header, + PartsHeader: block.MakePartSet(bs.config.GetInt("block_part_size")).Header(), + } +} diff --git a/state/state.go b/state/state.go index 4a54dfe6..f8b5dfc2 100644 --- a/state/state.go +++ b/state/state.go @@ -34,12 +34,12 @@ type State struct { LastBlockID types.BlockID LastBlockTime time.Time Validators *types.ValidatorSet - LastValidators *types.ValidatorSet + LastValidators *types.ValidatorSet // block.LastCommit validated against this // AppHash is updated after Commit; // it's stale after ExecBlock and before Commit - Stale bool - AppHash []byte + AppHashIsStale bool + AppHash []byte } func LoadState(db dbm.DB) *State { @@ -60,6 +60,9 @@ func LoadState(db dbm.DB) *State { } func (s *State) Copy() *State { + if s.AppHashIsStale { + PanicSanity(Fmt("App hash is stale: %v", s)) + } return &State{ db: s.db, GenesisDoc: s.GenesisDoc, @@ -69,7 +72,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), - Stale: s.Stale, // XXX: but really state shouldnt be copied while its stale + AppHashIsStale: false, AppHash: s.AppHash, } } @@ -94,7 +97,7 @@ func (s *State) Bytes() []byte { } // Mutate state variables to match block and validators -// Since we don't have the new AppHash yet, we set s.Stale=true +// Since we don't have the new AppHash yet, we set s.AppHashIsStale=true func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = header.Height s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader} @@ -102,7 +105,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ s.Validators = nextValSet s.LastValidators = prevValSet - s.Stale = true + s.AppHashIsStale = true } func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { diff --git a/test/app/dummy_test.sh b/test/app/dummy_test.sh index bb17c6c3..c0907bd4 100644 --- a/test/app/dummy_test.sh +++ b/test/app/dummy_test.sh @@ -26,7 +26,7 @@ echo "" RESPONSE=`tmsp-cli query $KEY` set +e -A=`echo $RESPONSE | grep exists=true` +A=`echo $RESPONSE | grep '"exists":true'` if [[ $? != 0 ]]; then echo "Failed to find 'exists=true' for $KEY. Response:" echo "$RESPONSE" @@ -37,7 +37,7 @@ set -e # we should not be able to look up the value RESPONSE=`tmsp-cli query $VALUE` set +e -A=`echo $RESPONSE | grep exists=true` +A=`echo $RESPONSE | grep '"exists":true'` if [[ $? == 0 ]]; then echo "Found 'exists=true' for $VALUE when we should not have. Response:" echo "$RESPONSE" @@ -54,7 +54,7 @@ RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"` RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` set +e -A=`echo $RESPONSE | grep exists=true` +A=`echo $RESPONSE | grep '"exists":true'` if [[ $? != 0 ]]; then echo "Failed to find 'exists=true' for $KEY. Response:" echo "$RESPONSE" @@ -66,7 +66,7 @@ set -e RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"` RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` set +e -A=`echo $RESPONSE | grep exists=true` +A=`echo $RESPONSE | grep '"exists":true'` if [[ $? == 0 ]]; then echo "Found 'exists=true' for $VALUE when we should not have. Response:" echo "$RESPONSE" diff --git a/types/block.go b/types/block.go index 80a59f8a..e9574adf 100644 --- a/types/block.go +++ b/types/block.go @@ -21,6 +21,7 @@ type Block struct { LastCommit *Commit `json:"last_commit"` } +// TODO: version func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) { block := &Block{ @@ -150,14 +151,15 @@ func (b *Block) StringShort() string { type Header struct { ChainID string `json:"chain_id"` + Version string `json:"version"` // TODO: Height int `json:"height"` Time time.Time `json:"time"` - NumTxs int `json:"num_txs"` + NumTxs int `json:"num_txs"` // XXX: Can we get rid of this? LastBlockID BlockID `json:"last_block_id"` - LastCommitHash []byte `json:"last_commit_hash"` - DataHash []byte `json:"data_hash"` - ValidatorsHash []byte `json:"validators_hash"` - AppHash []byte `json:"app_hash"` // state merkle root of txs from the previous block + LastCommitHash []byte `json:"last_commit_hash"` // commit from validators from the last block + DataHash []byte `json:"data_hash"` // transactions + ValidatorsHash []byte `json:"validators_hash"` // validators for the current block + AppHash []byte `json:"app_hash"` // state after txs from the previous block } // NOTE: hash is nil if required fields are missing. @@ -291,6 +293,8 @@ func (commit *Commit) ValidateBasic() error { return errors.New("No precommits in commit") } height, round := commit.Height(), commit.Round() + + // validate the precommits for _, precommit := range commit.Precommits { // It's OK for precommits to be missing. if precommit == nil { diff --git a/types/protobuf.go b/types/protobuf.go index a7a95d12..e1f03353 100644 --- a/types/protobuf.go +++ b/types/protobuf.go @@ -12,7 +12,7 @@ type tm2pb struct{} func (tm2pb) Header(header *Header) *types.Header { return &types.Header{ ChainId: header.ChainID, - Height: int32(header.Height), + Height: uint64(header.Height), Time: uint64(header.Time.Unix()), NumTxs: uint64(header.NumTxs), LastBlockId: TM2PB.BlockID(header.LastBlockID), diff --git a/types/validator.go b/types/validator.go index 2e45ebba..479824e6 100644 --- a/types/validator.go +++ b/types/validator.go @@ -12,14 +12,21 @@ import ( // Volatile state for each Validator // TODO: make non-volatile identity -// - Remove LastCommitHeight, send bitarray of vals that signed in BeginBlock // - Remove Accum - it can be computed, and now valset becomes identifying type Validator struct { - Address []byte `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - LastCommitHeight int `json:"last_commit_height"` - VotingPower int64 `json:"voting_power"` - Accum int64 `json:"accum"` + Address []byte `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + VotingPower int64 `json:"voting_power"` + Accum int64 `json:"accum"` +} + +func NewValidator(pubKey crypto.PubKey, votingPower int64) *Validator { + return &Validator{ + Address: pubKey.Address(), + PubKey: pubKey, + VotingPower: votingPower, + Accum: 0, + } } // Creates a new copy of the validator so we can mutate accum. @@ -57,7 +64,6 @@ func (v *Validator) String() string { return fmt.Sprintf("Validator{%X %v %v VP:%v A:%v}", v.Address, v.PubKey, - v.LastCommitHeight, v.VotingPower, v.Accum) } @@ -96,12 +102,6 @@ func RandValidator(randPower bool, minPower int64) (*Validator, *PrivValidator) if randPower { votePower += int64(RandUint32()) } - val := &Validator{ - Address: privVal.Address, - PubKey: privVal.PubKey, - LastCommitHeight: 0, - VotingPower: votePower, - Accum: 0, - } + val := NewValidator(privVal.PubKey, votePower) return val, privVal } diff --git a/types/validator_set_test.go b/types/validator_set_test.go index a94a4ebb..9107e77e 100644 --- a/types/validator_set_test.go +++ b/types/validator_set_test.go @@ -16,12 +16,9 @@ func randPubKey() crypto.PubKeyEd25519 { } func randValidator_() *Validator { - return &Validator{ - Address: RandBytes(20), - PubKey: randPubKey(), - VotingPower: RandInt64(), - Accum: RandInt64(), - } + val := NewValidator(randPubKey(), RandInt64()) + val.Accum = RandInt64() + return val } func randValidatorSet(numValidators int) *ValidatorSet { @@ -147,10 +144,7 @@ func BenchmarkValidatorSetCopy(b *testing.B) { for i := 0; i < 1000; i++ { privKey := crypto.GenPrivKeyEd25519() pubKey := privKey.PubKey().(crypto.PubKeyEd25519) - val := &Validator{ - Address: pubKey.Address(), - PubKey: pubKey, - } + val := NewValidator(pubKey, 0) if !vset.Add(val) { panic("Failed to add validator") }