Merge pull request #74 from ava-labs/fix-consensus-bugs

Fix consensus bugs
This commit is contained in:
Stephen Buttolph 2020-06-16 14:45:10 -04:00 committed by GitHub
commit bf7cad2a37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 406 additions and 54 deletions

View File

@ -231,6 +231,7 @@ func (ta *Topological) pushVotes(
kahnNodes map[[32]byte]kahnNode, kahnNodes map[[32]byte]kahnNode,
leaves []ids.ID) ids.Bag { leaves []ids.ID) ids.Bag {
votes := make(ids.UniqueBag) votes := make(ids.UniqueBag)
txConflicts := make(map[[32]byte]ids.Set)
for len(leaves) > 0 { for len(leaves) > 0 {
newLeavesSize := len(leaves) - 1 newLeavesSize := len(leaves) - 1
@ -245,6 +246,12 @@ func (ta *Topological) pushVotes(
// Give the votes to the consumer // Give the votes to the consumer
txID := tx.ID() txID := tx.ID()
votes.UnionSet(txID, kahn.votes) votes.UnionSet(txID, kahn.votes)
// Map txID to set of Conflicts
txKey := txID.Key()
if _, exists := txConflicts[txKey]; !exists {
txConflicts[txKey] = ta.cg.Conflicts(tx)
}
} }
for _, dep := range vtx.Parents() { for _, dep := range vtx.Parents() {
@ -265,6 +272,18 @@ func (ta *Topological) pushVotes(
} }
} }
// Create bag of votes for conflicting transactions
conflictingVotes := make(ids.UniqueBag)
for txHash, conflicts := range txConflicts {
txID := ids.NewID(txHash)
for conflictTxHash := range conflicts {
conflictTxID := ids.NewID(conflictTxHash)
conflictingVotes.UnionSet(txID, votes.GetSet(conflictTxID))
}
}
votes.Difference(&conflictingVotes)
return votes.Bag(ta.params.Alpha) return votes.Bag(ta.params.Alpha)
} }

View File

@ -104,6 +104,78 @@ func TestAvalancheVoting(t *testing.T) {
} }
} }
func TestAvalancheIgnoreInvalidVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 1,
},
Parents: 2,
BatchSize: 1,
}
vts := []Vertex{&Vtx{
id: GenerateID(),
status: choices.Accepted,
}, &Vtx{
id: GenerateID(),
status: choices.Accepted,
}}
utxos := []ids.ID{GenerateID()}
ta := Topological{}
ta.Initialize(snow.DefaultContextTest(), params, vts)
tx0 := &snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
}
tx0.Ins.Add(utxos[0])
vtx0 := &Vtx{
dependencies: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx0},
height: 1,
status: choices.Processing,
}
tx1 := &snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
}
tx1.Ins.Add(utxos[0])
vtx1 := &Vtx{
dependencies: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx1},
height: 1,
status: choices.Processing,
}
ta.Add(vtx0)
ta.Add(vtx1)
sm := make(ids.UniqueBag)
sm.Add(0, vtx0.id)
sm.Add(1, vtx1.id)
// Add Illegal Vote cast by Response 2
sm.Add(2, vtx0.id)
sm.Add(2, vtx1.id)
ta.RecordPoll(sm)
if ta.Finalized() {
t.Fatalf("An avalanche instance finalized too early")
}
}
func TestAvalancheTransitiveVoting(t *testing.T) { func TestAvalancheTransitiveVoting(t *testing.T) {
params := Parameters{ params := Parameters{
Parameters: snowball.Parameters{ Parameters: snowball.Parameters{

View File

@ -27,11 +27,13 @@ func (sb *unarySnowball) Extend(beta int, choice int) BinarySnowball {
bs := &binarySnowball{ bs := &binarySnowball{
binarySnowflake: binarySnowflake{ binarySnowflake: binarySnowflake{
binarySlush: binarySlush{preference: choice}, binarySlush: binarySlush{preference: choice},
confidence: sb.confidence,
beta: beta, beta: beta,
finalized: sb.Finalized(), finalized: sb.Finalized(),
}, },
preference: choice, preference: choice,
} }
bs.numSuccessfulPolls[choice] = sb.numSuccessfulPolls
return bs return bs
} }

View File

@ -42,11 +42,32 @@ func TestUnarySnowball(t *testing.T) {
binarySnowball := sbClone.Extend(beta, 0) binarySnowball := sbClone.Extend(beta, 0)
expected := "SB(Preference = 0, NumSuccessfulPolls[0] = 2, NumSuccessfulPolls[1] = 0, SF(Confidence = 1, Finalized = false, SL(Preference = 0)))"
if result := binarySnowball.String(); result != expected {
t.Fatalf("Expected:\n%s\nReturned:\n%s", expected, result)
}
binarySnowball.RecordUnsuccessfulPoll() binarySnowball.RecordUnsuccessfulPoll()
for i := 0; i < 3; i++ {
if binarySnowball.Preference() != 0 {
t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized")
}
binarySnowball.RecordSuccessfulPoll(1)
binarySnowball.RecordUnsuccessfulPoll()
}
if binarySnowball.Preference() != 1 {
t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized")
}
binarySnowball.RecordSuccessfulPoll(1) binarySnowball.RecordSuccessfulPoll(1)
if binarySnowball.Preference() != 1 {
if binarySnowball.Finalized() { t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized") t.Fatalf("Should not have finalized")
} }
@ -57,4 +78,9 @@ func TestUnarySnowball(t *testing.T) {
} else if !binarySnowball.Finalized() { } else if !binarySnowball.Finalized() {
t.Fatalf("Should have finalized") t.Fatalf("Should have finalized")
} }
expected = "SB(NumSuccessfulPolls = 2, SF(Confidence = 1, Finalized = false))"
if str := sb.String(); str != expected {
t.Fatalf("Wrong state. Expected:\n%s\nGot:\n%s", expected, str)
}
} }

View File

@ -78,9 +78,12 @@ func (i *issuer) Update() {
vdrSet.Add(vdr.ID()) vdrSet.Add(vdr.ID())
} }
toSample := ids.ShortSet{} // Copy to a new variable because we may remove an element in sender.Sender
toSample.Union(vdrSet) // and we don't want that to affect the set of validators we wait for [ie vdrSet]
i.t.RequestID++ i.t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) { if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet) {
i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes()) i.t.Config.Sender.PushQuery(toSample, i.t.RequestID, vtxID, i.vtx.Bytes())
} else if numVdrs < p.K { } else if numVdrs < p.K {
i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID) i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID)
} }

View File

@ -38,10 +38,10 @@ type polls struct {
// Add to the current set of polls // Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample // Returns true if the poll was registered correctly and the network sample
// should be made. // should be made.
func (p *polls) Add(requestID uint32, numPolled int) bool { func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID] poll, exists := p.m[requestID]
if !exists { if !exists {
poll.numPending = numPolled poll.polled = vdrs
p.m[requestID] = poll p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -59,7 +59,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.Uni
return nil, false return nil, false
} }
poll.Vote(votes) poll.Vote(votes, vdr)
if poll.Finished() { if poll.Finished() {
p.log.Verbo("Poll is finished") p.log.Verbo("Poll is finished")
delete(p.m, requestID) delete(p.m, requestID)
@ -83,19 +83,19 @@ func (p *polls) String() string {
// poll represents the current state of a network poll for a vertex // poll represents the current state of a network poll for a vertex
type poll struct { type poll struct {
votes ids.UniqueBag votes ids.UniqueBag
numPending int polled ids.ShortSet
} }
// Vote registers a vote for this poll // Vote registers a vote for this poll
func (p *poll) Vote(votes []ids.ID) { func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) {
if p.numPending > 0 { if p.polled.Contains(vdr) {
p.numPending-- p.polled.Remove(vdr)
p.votes.Add(uint(p.numPending), votes...) p.votes.Add(uint(p.polled.Len()), votes...)
} }
} }
// Finished returns true if the poll has completed, with no more required // Finished returns true if the poll has completed, with no more required
// responses // responses
func (p poll) Finished() bool { return p.numPending <= 0 } func (p poll) Finished() bool { return p.polled.Len() == 0 }
func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.numPending) } func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) }

View File

@ -471,8 +471,11 @@ func (t *Transitive) issueRepoll() {
vdrSet.Add(vdr.ID()) vdrSet.Add(vdr.ID())
} }
vdrCopy := ids.ShortSet{}
vdrCopy.Union((vdrSet))
t.RequestID++ t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrCopy) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID) t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID)
} else if numVdrs < p.K { } else if numVdrs < p.K {
t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID) t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID)

View File

@ -3085,3 +3085,120 @@ func TestEngineDuplicatedIssuance(t *testing.T) {
te.Notify(common.PendingTxs) te.Notify(common.PendingTxs)
} }
func TestEngineDoubleChit(t *testing.T) {
config := DefaultConfig()
config.Params.Alpha = 2
config.Params.K = 2
vdr0 := validators.GenerateRandomValidator(1)
vdr1 := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
vals.Add(vdr0)
vals.Add(vdr1)
config.Validators = vals
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
sender.CantGetAcceptedFrontier = false
st := &stateTest{t: t}
config.State = st
st.Default(true)
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
mVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
vts := []avalanche.Vertex{gVtx, mVtx}
utxos := []ids.ID{GenerateID()}
tx := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx.Ins.Add(utxos[0])
vtx := &Vtx{
parents: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx},
height: 1,
status: choices.Processing,
bytes: []byte{1, 1, 2, 3},
}
st.edge = func() []ids.ID { return []ids.ID{vts[0].ID(), vts[1].ID()} }
st.getVertex = func(id ids.ID) (avalanche.Vertex, error) {
switch {
case id.Equals(gVtx.ID()):
return gVtx, nil
case id.Equals(mVtx.ID()):
return mVtx, nil
}
t.Fatalf("Unknown vertex")
panic("Should have errored")
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
reqID := new(uint32)
sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, vtxID ids.ID, _ []byte) {
*reqID = requestID
if inVdrs.Len() != 2 {
t.Fatalf("Wrong number of validators")
}
if !vtxID.Equals(vtx.ID()) {
t.Fatalf("Wrong vertex requested")
}
}
st.getVertex = func(id ids.ID) (avalanche.Vertex, error) {
switch {
case id.Equals(vtx.ID()):
return vtx, nil
}
t.Fatalf("Unknown vertex")
panic("Should have errored")
}
te.insert(vtx)
votes := ids.Set{}
votes.Add(vtx.ID())
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr1.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Accepted {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Accepted)
}
}

View File

@ -22,11 +22,11 @@ type polls struct {
// Add to the current set of polls // Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample // Returns true if the poll was registered correctly and the network sample
// should be made. // should be made.
func (p *polls) Add(requestID uint32, numPolled int) bool { func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID] poll, exists := p.m[requestID]
if !exists { if !exists {
poll.alpha = p.alpha poll.alpha = p.alpha
poll.numPolled = numPolled poll.polled = vdrs
p.m[requestID] = poll p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -42,7 +42,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, b
if !exists { if !exists {
return ids.Bag{}, false return ids.Bag{}, false
} }
poll.Vote(vote) poll.Vote(vote, vdr)
if poll.Finished() { if poll.Finished() {
delete(p.m, requestID) delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -60,7 +60,7 @@ func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) {
return ids.Bag{}, false return ids.Bag{}, false
} }
poll.CancelVote() poll.CancelVote(vdr)
if poll.Finished() { if poll.Finished() {
delete(p.m, requestID) delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -83,22 +83,18 @@ func (p *polls) String() string {
// poll represents the current state of a network poll for a block // poll represents the current state of a network poll for a block
type poll struct { type poll struct {
alpha int alpha int
votes ids.Bag votes ids.Bag
numPolled int polled ids.ShortSet
} }
// Vote registers a vote for this poll // Vote registers a vote for this poll
func (p *poll) CancelVote() { func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) }
if p.numPolled > 0 {
p.numPolled--
}
}
// Vote registers a vote for this poll // Vote registers a vote for this poll
func (p *poll) Vote(vote ids.ID) { func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) {
if p.numPolled > 0 { if p.polled.Contains(vdr) {
p.numPolled-- p.polled.Remove(vdr)
p.votes.Add(vote) p.votes.Add(vote)
} }
} }
@ -106,13 +102,14 @@ func (p *poll) Vote(vote ids.ID) {
// Finished returns true if the poll has completed, with no more required // Finished returns true if the poll has completed, with no more required
// responses // responses
func (p poll) Finished() bool { func (p poll) Finished() bool {
remaining := p.polled.Len()
received := p.votes.Len() received := p.votes.Len()
_, freq := p.votes.Mode() _, freq := p.votes.Mode()
return p.numPolled == 0 || // All k nodes responded return remaining == 0 || // All k nodes responded
freq >= p.alpha || // An alpha majority has returned freq >= p.alpha || // An alpha majority has returned
received+p.numPolled < p.alpha // An alpha majority can never return received+remaining < p.alpha // An alpha majority can never return
} }
func (p poll) String() string { func (p poll) String() string {
return fmt.Sprintf("Waiting on %d chits", p.numPolled) return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled)
} }

View File

@ -542,18 +542,15 @@ func (t *Transitive) pullSample(blkID ids.ID) {
vdrSet.Add(vdr.ID()) vdrSet.Add(vdr.ID())
} }
if numVdrs := len(vdrs); numVdrs != p.K { toSample := ids.ShortSet{}
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID) toSample.Union(vdrSet)
return
}
t.RequestID++ t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) { if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) {
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID) t.Config.Sender.PullQuery(toSample, t.RequestID, blkID)
return } else if numVdrs < p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
} }
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
} }
// send a push request for this block // send a push request for this block
@ -566,20 +563,15 @@ func (t *Transitive) pushSample(blk snowman.Block) {
vdrSet.Add(vdr.ID()) vdrSet.Add(vdr.ID())
} }
blkID := blk.ID() toSample := ids.ShortSet{}
if numVdrs := len(vdrs); numVdrs != p.K { toSample.Union(vdrSet)
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
return
}
t.RequestID++ t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) { if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) {
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID) t.Config.Sender.PushQuery(toSample, t.RequestID, blk.ID(), blk.Bytes())
return } else if numVdrs < p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blk.ID())
} }
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes())
return
} }
func (t *Transitive) deliver(blk snowman.Block) error { func (t *Transitive) deliver(blk snowman.Block) error {

View File

@ -1522,3 +1522,124 @@ func TestEngineAggressivePolling(t *testing.T) {
t.Fatalf("Should have sent an additional pull query") t.Fatalf("Should have sent an additional pull query")
} }
} }
func TestEngineDoubleChit(t *testing.T) {
config := DefaultConfig()
config.Params = snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
}
vdr0 := validators.GenerateRandomValidator(1)
vdr1 := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr0)
vals.Add(vdr1)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
vm := &VMTest{}
vm.T = t
config.VM = vm
vm.Default(true)
vm.CantSetPreference = false
gBlk := &Blk{
id: GenerateID(),
status: choices.Accepted,
}
vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
sender.CantGetAcceptedFrontier = false
vm.GetBlockF = func(id ids.ID) (snowman.Block, error) {
switch {
case id.Equals(gBlk.ID()):
return gBlk, nil
}
t.Fatalf("Unknown block")
panic("Should have errored")
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
vm.LastAcceptedF = nil
sender.CantGetAcceptedFrontier = true
blk := &Blk{
parent: gBlk,
id: GenerateID(),
status: choices.Processing,
bytes: []byte{1},
}
queried := new(bool)
queryRequestID := new(uint32)
sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, blkID ids.ID, blkBytes []byte) {
if *queried {
t.Fatalf("Asked multiple times")
}
*queried = true
*queryRequestID = requestID
vdrSet := ids.ShortSet{}
vdrSet.Add(vdr0.ID(), vdr1.ID())
if !inVdrs.Equals(vdrSet) {
t.Fatalf("Asking wrong validator for preference")
}
if !blk.ID().Equals(blkID) {
t.Fatalf("Asking for wrong block")
}
}
te.insert(blk)
vm.GetBlockF = func(id ids.ID) (snowman.Block, error) {
switch {
case id.Equals(gBlk.ID()):
return gBlk, nil
case id.Equals(blk.ID()):
return blk, nil
}
t.Fatalf("Unknown block")
panic("Should have errored")
}
blkSet := ids.Set{}
blkSet.Add(blk.ID())
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr1.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Accepted {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Accepted)
}
}