consensus: test validator set change
This commit is contained in:
parent
65496ace20
commit
2f9063c1d6
|
@ -0,0 +1,278 @@
|
|||
package consensus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/go-events"
|
||||
"github.com/tendermint/go-p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func init() {
|
||||
config = tendermint_test.ResetConfig("consensus_byzantine_test")
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// byzantine failures
|
||||
|
||||
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
|
||||
// byzantine validator sends conflicting proposals into A and B,
|
||||
// and prevotes/precommits on both of them.
|
||||
// B sees a commit, A doesn't.
|
||||
// Byzantine validator refuses to prevote.
|
||||
// Heal partition and ensure A sees the commit
|
||||
func TestByzantine(t *testing.T) {
|
||||
resetConfigTimeouts()
|
||||
N := 4
|
||||
css := randConsensusNet(N)
|
||||
|
||||
switches := make([]*p2p.Switch, N)
|
||||
for i := 0; i < N; i++ {
|
||||
switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil))
|
||||
}
|
||||
|
||||
reactors := make([]p2p.Reactor, N)
|
||||
eventChans := make([]chan interface{}, N)
|
||||
for i := 0; i < N; i++ {
|
||||
if i == 0 {
|
||||
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator))
|
||||
// make byzantine
|
||||
css[i].decideProposal = func(j int) func(int, int) {
|
||||
return func(height, round int) {
|
||||
byzantineDecideProposalFunc(height, round, css[j], switches[j])
|
||||
}
|
||||
}(i)
|
||||
css[i].doPrevote = func(height, round int) {}
|
||||
}
|
||||
|
||||
eventSwitch := events.NewEventSwitch()
|
||||
_, err := eventSwitch.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start switch: %v", err)
|
||||
}
|
||||
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
|
||||
|
||||
conR := NewConsensusReactor(css[i], false)
|
||||
conR.SetEventSwitch(eventSwitch)
|
||||
|
||||
var conRI p2p.Reactor
|
||||
conRI = conR
|
||||
if i == 0 {
|
||||
conRI = NewByzantineReactor(conR)
|
||||
}
|
||||
reactors[i] = conRI
|
||||
}
|
||||
|
||||
p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
// ignore new switch s, we already made ours
|
||||
switches[i].AddReactor("CONSENSUS", reactors[i])
|
||||
return switches[i]
|
||||
}, func(sws []*p2p.Switch, i, j int) {
|
||||
// the network starts partitioned with globally active adversary
|
||||
if i != 0 {
|
||||
return
|
||||
}
|
||||
p2p.Connect2Switches(sws, i, j)
|
||||
})
|
||||
|
||||
// byz proposer sends one block to peers[0]
|
||||
// and the other block to peers[1] and peers[2].
|
||||
// note peers and switches order don't match.
|
||||
peers := switches[0].Peers().List()
|
||||
ind0 := getSwitchIndex(switches, peers[0])
|
||||
ind1 := getSwitchIndex(switches, peers[1])
|
||||
ind2 := getSwitchIndex(switches, peers[2])
|
||||
|
||||
// connect the 2 peers in the larger partition
|
||||
p2p.Connect2Switches(switches, ind1, ind2)
|
||||
|
||||
// wait for someone in the big partition to make a block
|
||||
|
||||
select {
|
||||
case <-eventChans[ind2]:
|
||||
}
|
||||
|
||||
log.Notice("A block has been committed. Healing partition")
|
||||
|
||||
// connect the partitions
|
||||
p2p.Connect2Switches(switches, ind0, ind1)
|
||||
p2p.Connect2Switches(switches, ind0, ind2)
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
// (one of them already has)
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(2)
|
||||
for i := 1; i < N-1; i++ {
|
||||
go func(j int) {
|
||||
<-eventChans[j]
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
tick := time.NewTicker(time.Second * 10)
|
||||
select {
|
||||
case <-done:
|
||||
case <-tick.C:
|
||||
for i, reactor := range reactors {
|
||||
t.Log(Fmt("Consensus Reactor %v", i))
|
||||
t.Log(Fmt("%v", reactor))
|
||||
}
|
||||
t.Fatalf("Timed out waiting for all validators to commit first block")
|
||||
}
|
||||
}
|
||||
|
||||
//-------------------------------
|
||||
// byzantine consensus functions
|
||||
|
||||
func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) {
|
||||
// byzantine user should create two proposals and try to split the vote.
|
||||
// Avoid sending on internalMsgQueue and running consensus state.
|
||||
|
||||
// Create a new proposal block from state/txs from the mempool.
|
||||
block1, blockParts1 := cs.createProposalBlock()
|
||||
polRound, polBlockID := cs.Votes.POLInfo()
|
||||
proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
|
||||
cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
|
||||
|
||||
// Create a new proposal block from state/txs from the mempool.
|
||||
block2, blockParts2 := cs.createProposalBlock()
|
||||
polRound, polBlockID = cs.Votes.POLInfo()
|
||||
proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
|
||||
cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
|
||||
|
||||
block1Hash := block1.Hash()
|
||||
block2Hash := block2.Hash()
|
||||
|
||||
// broadcast conflicting proposals/block parts to peers
|
||||
peers := sw.Peers().List()
|
||||
log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers))
|
||||
for i, peer := range peers {
|
||||
if i < len(peers)/2 {
|
||||
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
|
||||
} else {
|
||||
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
|
||||
// proposal
|
||||
msg := &ProposalMessage{Proposal: proposal}
|
||||
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
|
||||
|
||||
// parts
|
||||
for i := 0; i < parts.Total(); i++ {
|
||||
part := parts.GetPart(i)
|
||||
msg := &BlockPartMessage{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
}
|
||||
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
|
||||
}
|
||||
|
||||
// votes
|
||||
cs.mtx.Lock()
|
||||
prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
|
||||
precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
|
||||
cs.mtx.Unlock()
|
||||
|
||||
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
|
||||
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// byzantine consensus reactor
|
||||
|
||||
type ByzantineReactor struct {
|
||||
Service
|
||||
reactor *ConsensusReactor
|
||||
}
|
||||
|
||||
func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
|
||||
return &ByzantineReactor{
|
||||
Service: conR,
|
||||
reactor: conR,
|
||||
}
|
||||
}
|
||||
|
||||
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
|
||||
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
|
||||
func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
|
||||
if !br.reactor.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// Create peerState for peer
|
||||
peerState := NewPeerState(peer)
|
||||
peer.Data.Set(types.PeerStateKey, peerState)
|
||||
|
||||
// Send our state to peer.
|
||||
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
|
||||
if !br.reactor.fastSync {
|
||||
br.reactor.sendNewRoundStepMessage(peer)
|
||||
}
|
||||
}
|
||||
func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
|
||||
br.reactor.RemovePeer(peer, reason)
|
||||
}
|
||||
func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
|
||||
br.reactor.Receive(chID, peer, msgBytes)
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// byzantine privValidator
|
||||
|
||||
type ByzantinePrivValidator struct {
|
||||
Address []byte `json:"address"`
|
||||
types.Signer `json:"-"`
|
||||
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
// Return a priv validator that will sign anything
|
||||
func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
|
||||
return &ByzantinePrivValidator{
|
||||
Address: pv.Address,
|
||||
Signer: pv.Signer,
|
||||
}
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) GetAddress() []byte {
|
||||
return privVal.Address
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
|
||||
// Sign
|
||||
vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
|
||||
// Sign
|
||||
proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) String() string {
|
||||
return Fmt("PrivValidator{%X}", privVal.Address)
|
||||
}
|
|
@ -4,7 +4,7 @@ import (
|
|||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// NOTE: this is blocking
|
||||
// NOTE: if chanCap=0, this blocks on the event being consumed
|
||||
func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
|
||||
// listen for event
|
||||
ch := make(chan interface{}, chanCap)
|
||||
|
@ -13,3 +13,14 @@ func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap
|
|||
})
|
||||
return ch
|
||||
}
|
||||
|
||||
// NOTE: this blocks on receiving a response after the event is consumed
|
||||
func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) chan interface{} {
|
||||
// listen for event
|
||||
ch := make(chan interface{})
|
||||
types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) {
|
||||
ch <- data
|
||||
<-ch
|
||||
})
|
||||
return ch
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package consensus
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
dbm "github.com/tendermint/go-db"
|
||||
"github.com/tendermint/go-p2p"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
|
@ -33,6 +35,8 @@ type validatorStub struct {
|
|||
*types.PrivValidator
|
||||
}
|
||||
|
||||
var testMinPower = 10
|
||||
|
||||
func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub {
|
||||
return &validatorStub{
|
||||
Index: valIndex,
|
||||
|
@ -266,6 +270,31 @@ func randConsensusNet(nValidators int) []*ConsensusState {
|
|||
return css
|
||||
}
|
||||
|
||||
// nPeers = nValidators + nNotValidator
|
||||
func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState {
|
||||
genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
|
||||
css := make([]*ConsensusState, nPeers)
|
||||
for i := 0; i < nPeers; i++ {
|
||||
db := dbm.NewMemDB() // each state needs its own db
|
||||
state := sm.MakeGenesisState(db, genDoc)
|
||||
state.Save()
|
||||
thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
|
||||
EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
|
||||
var privVal *types.PrivValidator
|
||||
if i < nValidators {
|
||||
privVal = privVals[i]
|
||||
} else {
|
||||
privVal = types.GenPrivValidator()
|
||||
_, tempFilePath := Tempfile("priv_validator_")
|
||||
privVal.SetFile(tempFilePath)
|
||||
}
|
||||
|
||||
dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
|
||||
css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir))
|
||||
}
|
||||
return css
|
||||
}
|
||||
|
||||
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
|
||||
voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
|
||||
voteCh := make(chan interface{})
|
||||
|
@ -325,3 +354,16 @@ func startTestRound(cs *ConsensusState, height, round int) {
|
|||
cs.enterNewRound(height, round)
|
||||
cs.startRoutines(0)
|
||||
}
|
||||
|
||||
//--------------------------------
|
||||
// reactor stuff
|
||||
|
||||
func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
|
||||
for i, s := range switches {
|
||||
if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
|
||||
return i
|
||||
}
|
||||
}
|
||||
panic("didnt find peer in switches")
|
||||
return -1
|
||||
}
|
||||
|
|
|
@ -111,6 +111,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
|
|||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
|
||||
log.Info("ADDING PEER")
|
||||
if !conR.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
@ -949,7 +950,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
|
|||
|
||||
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
|
||||
log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
|
||||
log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
|
||||
log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
|
||||
|
||||
// NOTE: some may be nil BitArrays -> no side effects.
|
||||
ps.getVoteBitArray(height, round, type_).SetIndex(index, true)
|
||||
|
|
|
@ -1,20 +1,18 @@
|
|||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/go-events"
|
||||
"github.com/tendermint/go-logger"
|
||||
"github.com/tendermint/go-p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tmsp/example/dummy"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -22,7 +20,7 @@ func init() {
|
|||
}
|
||||
|
||||
func resetConfigTimeouts() {
|
||||
logger.SetLogLevel("notice")
|
||||
logger.SetLogLevel("info")
|
||||
//config.Set("log_level", "notice")
|
||||
config.Set("timeout_propose", 2000)
|
||||
// config.Set("timeout_propose_delta", 500)
|
||||
|
@ -30,9 +28,13 @@ func resetConfigTimeouts() {
|
|||
// config.Set("timeout_prevote_delta", 500)
|
||||
// config.Set("timeout_precommit", 1000)
|
||||
// config.Set("timeout_precommit_delta", 500)
|
||||
// config.Set("timeout_commit", 1000)
|
||||
config.Set("timeout_commit", 1000)
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// in-process testnets
|
||||
|
||||
// Ensure a testnet makes blocks
|
||||
func TestReactor(t *testing.T) {
|
||||
resetConfigTimeouts()
|
||||
N := 4
|
||||
|
@ -51,19 +53,120 @@ func TestReactor(t *testing.T) {
|
|||
reactors[i].SetEventSwitch(eventSwitch)
|
||||
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
|
||||
}
|
||||
// make connected switches and start all reactors
|
||||
p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("CONSENSUS", reactors[i])
|
||||
return s
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
|
||||
<-eventChans[j]
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------
|
||||
// ensure we can make blocks despite cycling a validator set
|
||||
|
||||
func TestValidatorSetChanges(t *testing.T) {
|
||||
resetConfigTimeouts()
|
||||
nPeers := 8
|
||||
nVals := 4
|
||||
css := randConsensusNetWithPeers(nVals, nPeers)
|
||||
reactors := make([]*ConsensusReactor, nPeers)
|
||||
eventChans := make([]chan interface{}, nPeers)
|
||||
for i := 0; i < nPeers; i++ {
|
||||
reactors[i] = NewConsensusReactor(css[i], false)
|
||||
|
||||
eventSwitch := events.NewEventSwitch()
|
||||
_, err := eventSwitch.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start switch: %v", err)
|
||||
}
|
||||
|
||||
reactors[i].SetEventSwitch(eventSwitch)
|
||||
eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
|
||||
}
|
||||
p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("CONSENSUS", reactors[i])
|
||||
return s
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
// map of active validators
|
||||
activeVals := make(map[string]struct{})
|
||||
for i := 0; i < nVals; i++ {
|
||||
activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
|
||||
}
|
||||
|
||||
// wait till everyone makes block 1
|
||||
timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) {
|
||||
<-eventChans[j]
|
||||
eventChans[j] <- struct{}{}
|
||||
wg.Done()
|
||||
})
|
||||
|
||||
newValidatorPubKey := css[nVals].privValidator.(*types.PrivValidator).PubKey
|
||||
newValidatorTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), uint64(testMinPower))
|
||||
|
||||
// wait till everyone makes block 2
|
||||
// ensure the commit includes all validators
|
||||
// send newValTx to change vals in block 3
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx)
|
||||
|
||||
// wait till everyone makes block 3.
|
||||
// it includes the commit for block 2, which is by the original validator set
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
|
||||
// wait till everyone makes block 4.
|
||||
// it includes the commit for block 3, which is by the original validator set
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
|
||||
// the commits for block 4 should be with the updated validator set
|
||||
activeVals[string(newValidatorPubKey.Address())] = struct{}{}
|
||||
|
||||
// wait till everyone makes block 5
|
||||
// it includes the commit for block 4, which should have the updated validator set
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
|
||||
// TODO: test more changes!
|
||||
}
|
||||
|
||||
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
|
||||
timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
|
||||
newBlock := <-eventChans[j]
|
||||
err := validateBlock(newBlock.(types.EventDataNewBlock).Block, activeVals)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, tx := range txs {
|
||||
css[j].mempool.CheckTx(tx, nil)
|
||||
}
|
||||
|
||||
eventChans[j] <- struct{}{}
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
|
||||
// expects high synchrony!
|
||||
func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
|
||||
if block.LastCommit.Size() != len(activeVals) {
|
||||
return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals))
|
||||
}
|
||||
|
||||
for _, vote := range block.LastCommit.Precommits {
|
||||
if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok {
|
||||
return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int)) {
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(N)
|
||||
for i := 0; i < N; i++ {
|
||||
go func(j int) {
|
||||
<-eventChans[j]
|
||||
wg.Done()
|
||||
}(i)
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go f(wg, i)
|
||||
}
|
||||
|
||||
// Make wait into a channel
|
||||
|
@ -77,271 +180,6 @@ func TestReactor(t *testing.T) {
|
|||
select {
|
||||
case <-done:
|
||||
case <-tick.C:
|
||||
t.Fatalf("Timed out waiting for all validators to commit first block")
|
||||
t.Fatalf("Timed out waiting for all validators to commit a block")
|
||||
}
|
||||
}
|
||||
|
||||
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
|
||||
// byzantine validator sends conflicting proposals into A and B,
|
||||
// and prevotes/precommits on both of them.
|
||||
// B sees a commit, A doesn't.
|
||||
// Byzantine validator refuses to prevote.
|
||||
// Heal partition and ensure A sees the commit
|
||||
func TestByzantine(t *testing.T) {
|
||||
resetConfigTimeouts()
|
||||
N := 4
|
||||
css := randConsensusNet(N)
|
||||
|
||||
switches := make([]*p2p.Switch, N)
|
||||
for i := 0; i < N; i++ {
|
||||
switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil))
|
||||
}
|
||||
|
||||
reactors := make([]p2p.Reactor, N)
|
||||
eventChans := make([]chan interface{}, N)
|
||||
for i := 0; i < N; i++ {
|
||||
if i == 0 {
|
||||
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator))
|
||||
// make byzantine
|
||||
css[i].decideProposal = func(j int) func(int, int) {
|
||||
return func(height, round int) {
|
||||
byzantineDecideProposalFunc(height, round, css[j], switches[j])
|
||||
}
|
||||
}(i)
|
||||
css[i].doPrevote = func(height, round int) {}
|
||||
}
|
||||
|
||||
eventSwitch := events.NewEventSwitch()
|
||||
_, err := eventSwitch.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start switch: %v", err)
|
||||
}
|
||||
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
|
||||
|
||||
conR := NewConsensusReactor(css[i], false)
|
||||
conR.SetEventSwitch(eventSwitch)
|
||||
|
||||
var conRI p2p.Reactor
|
||||
conRI = conR
|
||||
if i == 0 {
|
||||
conRI = NewByzantineReactor(conR)
|
||||
}
|
||||
reactors[i] = conRI
|
||||
}
|
||||
|
||||
p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
// ignore new switch s, we already made ours
|
||||
switches[i].AddReactor("CONSENSUS", reactors[i])
|
||||
return switches[i]
|
||||
}, func(sws []*p2p.Switch, i, j int) {
|
||||
// the network starts partitioned with globally active adversary
|
||||
if i != 0 {
|
||||
return
|
||||
}
|
||||
p2p.Connect2Switches(sws, i, j)
|
||||
})
|
||||
|
||||
// byz proposer sends one block to peers[0]
|
||||
// and the other block to peers[1] and peers[2].
|
||||
// note peers and switches order don't match.
|
||||
peers := switches[0].Peers().List()
|
||||
ind0 := getSwitchIndex(switches, peers[0])
|
||||
ind1 := getSwitchIndex(switches, peers[1])
|
||||
ind2 := getSwitchIndex(switches, peers[2])
|
||||
|
||||
// connect the 2 peers in the larger partition
|
||||
p2p.Connect2Switches(switches, ind1, ind2)
|
||||
|
||||
// wait for someone in the big partition to make a block
|
||||
|
||||
select {
|
||||
case <-eventChans[ind2]:
|
||||
}
|
||||
|
||||
log.Notice("A block has been committed. Healing partition")
|
||||
|
||||
// connect the partitions
|
||||
p2p.Connect2Switches(switches, ind0, ind1)
|
||||
p2p.Connect2Switches(switches, ind0, ind2)
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
// (one of them already has)
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(2)
|
||||
for i := 1; i < N-1; i++ {
|
||||
go func(j int) {
|
||||
<-eventChans[j]
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
tick := time.NewTicker(time.Second * 10)
|
||||
select {
|
||||
case <-done:
|
||||
case <-tick.C:
|
||||
for i, reactor := range reactors {
|
||||
t.Log(Fmt("Consensus Reactor %v", i))
|
||||
t.Log(Fmt("%v", reactor))
|
||||
}
|
||||
t.Fatalf("Timed out waiting for all validators to commit first block")
|
||||
}
|
||||
}
|
||||
|
||||
func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
|
||||
for i, s := range switches {
|
||||
if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
|
||||
return i
|
||||
}
|
||||
}
|
||||
panic("didnt find peer in switches")
|
||||
return -1
|
||||
}
|
||||
|
||||
//-------------------------------
|
||||
// byzantine consensus functions
|
||||
|
||||
func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) {
|
||||
// byzantine user should create two proposals and try to split the vote.
|
||||
// Avoid sending on internalMsgQueue and running consensus state.
|
||||
|
||||
// Create a new proposal block from state/txs from the mempool.
|
||||
block1, blockParts1 := cs.createProposalBlock()
|
||||
polRound, polBlockID := cs.Votes.POLInfo()
|
||||
proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
|
||||
cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
|
||||
|
||||
// Create a new proposal block from state/txs from the mempool.
|
||||
block2, blockParts2 := cs.createProposalBlock()
|
||||
polRound, polBlockID = cs.Votes.POLInfo()
|
||||
proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
|
||||
cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
|
||||
|
||||
block1Hash := block1.Hash()
|
||||
block2Hash := block2.Hash()
|
||||
|
||||
// broadcast conflicting proposals/block parts to peers
|
||||
peers := sw.Peers().List()
|
||||
log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers))
|
||||
for i, peer := range peers {
|
||||
if i < len(peers)/2 {
|
||||
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
|
||||
} else {
|
||||
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
|
||||
// proposal
|
||||
msg := &ProposalMessage{Proposal: proposal}
|
||||
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
|
||||
|
||||
// parts
|
||||
for i := 0; i < parts.Total(); i++ {
|
||||
part := parts.GetPart(i)
|
||||
msg := &BlockPartMessage{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
}
|
||||
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
|
||||
}
|
||||
|
||||
// votes
|
||||
cs.mtx.Lock()
|
||||
prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
|
||||
precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
|
||||
cs.mtx.Unlock()
|
||||
|
||||
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
|
||||
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// byzantine consensus reactor
|
||||
|
||||
type ByzantineReactor struct {
|
||||
Service
|
||||
reactor *ConsensusReactor
|
||||
}
|
||||
|
||||
func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
|
||||
return &ByzantineReactor{
|
||||
Service: conR,
|
||||
reactor: conR,
|
||||
}
|
||||
}
|
||||
|
||||
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
|
||||
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
|
||||
func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
|
||||
if !br.reactor.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// Create peerState for peer
|
||||
peerState := NewPeerState(peer)
|
||||
peer.Data.Set(types.PeerStateKey, peerState)
|
||||
|
||||
// Send our state to peer.
|
||||
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
|
||||
if !br.reactor.fastSync {
|
||||
br.reactor.sendNewRoundStepMessage(peer)
|
||||
}
|
||||
}
|
||||
func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
|
||||
br.reactor.RemovePeer(peer, reason)
|
||||
}
|
||||
func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
|
||||
br.reactor.Receive(chID, peer, msgBytes)
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// byzantine privValidator
|
||||
|
||||
type ByzantinePrivValidator struct {
|
||||
Address []byte `json:"address"`
|
||||
types.Signer `json:"-"`
|
||||
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
// Return a priv validator that will sign anything
|
||||
func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
|
||||
return &ByzantinePrivValidator{
|
||||
Address: pv.Address,
|
||||
Signer: pv.Signer,
|
||||
}
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) GetAddress() []byte {
|
||||
return privVal.Address
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
|
||||
// Sign
|
||||
vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
privVal.mtx.Lock()
|
||||
defer privVal.mtx.Unlock()
|
||||
|
||||
// Sign
|
||||
proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (privVal *ByzantinePrivValidator) String() string {
|
||||
return Fmt("PrivValidator{%X}", privVal.Address)
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||
|
||||
log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
|
||||
if len(changedValidators) > 0 {
|
||||
log.Info("Update to validator set", "updates", changedValidators)
|
||||
log.Info("Update to validator set", "updates", tmsp.ValidatorsString(changedValidators))
|
||||
}
|
||||
return changedValidators, nil
|
||||
}
|
||||
|
@ -325,7 +325,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash)
|
||||
log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "app_hash", blockInfo.AppHash)
|
||||
|
||||
blockHeight := int(blockInfo.BlockHeight) // XXX: beware overflow
|
||||
appHash := blockInfo.AppHash
|
||||
|
|
|
@ -24,10 +24,16 @@ var (
|
|||
testPartSize = 65536
|
||||
)
|
||||
|
||||
//---------------------------------------
|
||||
// Test block execution
|
||||
|
||||
func TestExecBlock(t *testing.T) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
//---------------------------------------
|
||||
// Test handshake/replay
|
||||
|
||||
// Sync from scratch
|
||||
func TestHandshakeReplayAll(t *testing.T) {
|
||||
testHandshakeReplay(t, 0)
|
||||
|
@ -106,6 +112,7 @@ func testHandshakeReplay(t *testing.T, n int) {
|
|||
}
|
||||
|
||||
//--------------------------
|
||||
// utils for making blocks
|
||||
|
||||
// make some bogus txs
|
||||
func txsFunc(blockNum int) (txs []types.Tx) {
|
||||
|
|
|
@ -21,6 +21,7 @@ type Block struct {
|
|||
LastCommit *Commit `json:"last_commit"`
|
||||
}
|
||||
|
||||
// TODO: version
|
||||
func MakeBlock(height int, chainID string, txs []Tx, commit *Commit,
|
||||
prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) {
|
||||
block := &Block{
|
||||
|
@ -150,9 +151,10 @@ func (b *Block) StringShort() string {
|
|||
|
||||
type Header struct {
|
||||
ChainID string `json:"chain_id"`
|
||||
Version string `json:"version"` // TODO:
|
||||
Height int `json:"height"`
|
||||
Time time.Time `json:"time"`
|
||||
NumTxs int `json:"num_txs"`
|
||||
NumTxs int `json:"num_txs"` // XXX: Can we get rid of this?
|
||||
LastBlockID BlockID `json:"last_block_id"`
|
||||
LastCommitHash []byte `json:"last_commit_hash"` // commit from validators from the last block
|
||||
DataHash []byte `json:"data_hash"` // transactions
|
||||
|
@ -291,6 +293,8 @@ func (commit *Commit) ValidateBasic() error {
|
|||
return errors.New("No precommits in commit")
|
||||
}
|
||||
height, round := commit.Height(), commit.Round()
|
||||
|
||||
// validate the precommits
|
||||
for _, precommit := range commit.Precommits {
|
||||
// It's OK for precommits to be missing.
|
||||
if precommit == nil {
|
||||
|
|
Loading…
Reference in New Issue