diff --git a/snow/consensus/avalanche/topological.go b/snow/consensus/avalanche/topological.go index 3fa7552..f99cff1 100644 --- a/snow/consensus/avalanche/topological.go +++ b/snow/consensus/avalanche/topological.go @@ -231,6 +231,7 @@ func (ta *Topological) pushVotes( kahnNodes map[[32]byte]kahnNode, leaves []ids.ID) ids.Bag { votes := make(ids.UniqueBag) + txConflicts := make(map[[32]byte]ids.Set) for len(leaves) > 0 { newLeavesSize := len(leaves) - 1 @@ -245,6 +246,12 @@ func (ta *Topological) pushVotes( // Give the votes to the consumer txID := tx.ID() votes.UnionSet(txID, kahn.votes) + + // Map txID to set of Conflicts + txKey := txID.Key() + if _, exists := txConflicts[txKey]; !exists { + txConflicts[txKey] = ta.cg.Conflicts(tx) + } } for _, dep := range vtx.Parents() { @@ -265,6 +272,18 @@ func (ta *Topological) pushVotes( } } + // Create bag of votes for conflicting transactions + conflictingVotes := make(ids.UniqueBag) + for txHash, conflicts := range txConflicts { + txID := ids.NewID(txHash) + for conflictTxHash := range conflicts { + conflictTxID := ids.NewID(conflictTxHash) + conflictingVotes.UnionSet(txID, votes.GetSet(conflictTxID)) + } + } + + votes.Difference(&conflictingVotes) + return votes.Bag(ta.params.Alpha) } diff --git a/snow/consensus/avalanche/topological_test.go b/snow/consensus/avalanche/topological_test.go index 05046bc..3ed648a 100644 --- a/snow/consensus/avalanche/topological_test.go +++ b/snow/consensus/avalanche/topological_test.go @@ -104,6 +104,78 @@ func TestAvalancheVoting(t *testing.T) { } } +func TestAvalancheIgnoreInvalidVoting(t *testing.T) { + params := Parameters{ + Parameters: snowball.Parameters{ + Metrics: prometheus.NewRegistry(), + K: 3, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 1, + }, + Parents: 2, + BatchSize: 1, + } + + vts := []Vertex{&Vtx{ + id: GenerateID(), + status: choices.Accepted, + }, &Vtx{ + id: GenerateID(), + status: choices.Accepted, + }} + utxos := []ids.ID{GenerateID()} + + ta := Topological{} + ta.Initialize(snow.DefaultContextTest(), params, vts) + + tx0 := &snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + } + tx0.Ins.Add(utxos[0]) + + vtx0 := &Vtx{ + dependencies: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx0}, + height: 1, + status: choices.Processing, + } + + tx1 := &snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + } + tx1.Ins.Add(utxos[0]) + + vtx1 := &Vtx{ + dependencies: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx1}, + height: 1, + status: choices.Processing, + } + + ta.Add(vtx0) + ta.Add(vtx1) + + sm := make(ids.UniqueBag) + + sm.Add(0, vtx0.id) + sm.Add(1, vtx1.id) + + // Add Illegal Vote cast by Response 2 + sm.Add(2, vtx0.id) + sm.Add(2, vtx1.id) + + ta.RecordPoll(sm) + + if ta.Finalized() { + t.Fatalf("An avalanche instance finalized too early") + } +} + func TestAvalancheTransitiveVoting(t *testing.T) { params := Parameters{ Parameters: snowball.Parameters{ diff --git a/snow/consensus/snowball/unary_snowball.go b/snow/consensus/snowball/unary_snowball.go index 26ea35d..44eea46 100644 --- a/snow/consensus/snowball/unary_snowball.go +++ b/snow/consensus/snowball/unary_snowball.go @@ -27,11 +27,13 @@ func (sb *unarySnowball) Extend(beta int, choice int) BinarySnowball { bs := &binarySnowball{ binarySnowflake: binarySnowflake{ binarySlush: binarySlush{preference: choice}, + confidence: sb.confidence, beta: beta, finalized: sb.Finalized(), }, preference: choice, } + bs.numSuccessfulPolls[choice] = sb.numSuccessfulPolls return bs } diff --git a/snow/consensus/snowball/unary_snowball_test.go b/snow/consensus/snowball/unary_snowball_test.go index 3f4efe5..4ddc6c6 100644 --- a/snow/consensus/snowball/unary_snowball_test.go +++ b/snow/consensus/snowball/unary_snowball_test.go @@ -42,11 +42,32 @@ func TestUnarySnowball(t *testing.T) { binarySnowball := sbClone.Extend(beta, 0) + expected := "SB(Preference = 0, NumSuccessfulPolls[0] = 2, NumSuccessfulPolls[1] = 0, SF(Confidence = 1, Finalized = false, SL(Preference = 0)))" + if result := binarySnowball.String(); result != expected { + t.Fatalf("Expected:\n%s\nReturned:\n%s", expected, result) + } + binarySnowball.RecordUnsuccessfulPoll() + for i := 0; i < 3; i++ { + if binarySnowball.Preference() != 0 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { + t.Fatalf("Should not have finalized") + } + binarySnowball.RecordSuccessfulPoll(1) + binarySnowball.RecordUnsuccessfulPoll() + } + + if binarySnowball.Preference() != 1 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { + t.Fatalf("Should not have finalized") + } binarySnowball.RecordSuccessfulPoll(1) - - if binarySnowball.Finalized() { + if binarySnowball.Preference() != 1 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { t.Fatalf("Should not have finalized") } @@ -57,4 +78,9 @@ func TestUnarySnowball(t *testing.T) { } else if !binarySnowball.Finalized() { t.Fatalf("Should have finalized") } + + expected = "SB(NumSuccessfulPolls = 2, SF(Confidence = 1, Finalized = false))" + if str := sb.String(); str != expected { + t.Fatalf("Wrong state. Expected:\n%s\nGot:\n%s", expected, str) + } } diff --git a/snow/engine/avalanche/issuer.go b/snow/engine/avalanche/issuer.go index 4cbc408..1807fa1 100644 --- a/snow/engine/avalanche/issuer.go +++ b/snow/engine/avalanche/issuer.go @@ -78,9 +78,12 @@ func (i *issuer) Update() { vdrSet.Add(vdr.ID()) } + toSample := ids.ShortSet{} // Copy to a new variable because we may remove an element in sender.Sender + toSample.Union(vdrSet) // and we don't want that to affect the set of validators we wait for [ie vdrSet] + i.t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) { - i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes()) + if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet) { + i.t.Config.Sender.PushQuery(toSample, i.t.RequestID, vtxID, i.vtx.Bytes()) } else if numVdrs < p.K { i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID) } diff --git a/snow/engine/avalanche/polls.go b/snow/engine/avalanche/polls.go index 282fe6a..fa1e7df 100644 --- a/snow/engine/avalanche/polls.go +++ b/snow/engine/avalanche/polls.go @@ -38,10 +38,10 @@ type polls struct { // Add to the current set of polls // Returns true if the poll was registered correctly and the network sample // should be made. -func (p *polls) Add(requestID uint32, numPolled int) bool { +func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { poll, exists := p.m[requestID] if !exists { - poll.numPending = numPolled + poll.polled = vdrs p.m[requestID] = poll p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -59,7 +59,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.Uni return nil, false } - poll.Vote(votes) + poll.Vote(votes, vdr) if poll.Finished() { p.log.Verbo("Poll is finished") delete(p.m, requestID) @@ -83,19 +83,19 @@ func (p *polls) String() string { // poll represents the current state of a network poll for a vertex type poll struct { - votes ids.UniqueBag - numPending int + votes ids.UniqueBag + polled ids.ShortSet } // Vote registers a vote for this poll -func (p *poll) Vote(votes []ids.ID) { - if p.numPending > 0 { - p.numPending-- - p.votes.Add(uint(p.numPending), votes...) +func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) { + if p.polled.Contains(vdr) { + p.polled.Remove(vdr) + p.votes.Add(uint(p.polled.Len()), votes...) } } // Finished returns true if the poll has completed, with no more required // responses -func (p poll) Finished() bool { return p.numPending <= 0 } -func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.numPending) } +func (p poll) Finished() bool { return p.polled.Len() == 0 } +func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) } diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index e48b167..71fbbe0 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -471,8 +471,11 @@ func (t *Transitive) issueRepoll() { vdrSet.Add(vdr.ID()) } + vdrCopy := ids.ShortSet{} + vdrCopy.Union((vdrSet)) + t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { + if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrCopy) { t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID) } else if numVdrs < p.K { t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID) diff --git a/snow/engine/avalanche/transitive_test.go b/snow/engine/avalanche/transitive_test.go index cc078ca..96b904c 100644 --- a/snow/engine/avalanche/transitive_test.go +++ b/snow/engine/avalanche/transitive_test.go @@ -3085,3 +3085,120 @@ func TestEngineDuplicatedIssuance(t *testing.T) { te.Notify(common.PendingTxs) } + +func TestEngineDoubleChit(t *testing.T) { + config := DefaultConfig() + + config.Params.Alpha = 2 + config.Params.K = 2 + + vdr0 := validators.GenerateRandomValidator(1) + vdr1 := validators.GenerateRandomValidator(1) + vals := validators.NewSet() + vals.Add(vdr0) + vals.Add(vdr1) + config.Validators = vals + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + sender.CantGetAcceptedFrontier = false + + st := &stateTest{t: t} + config.State = st + + st.Default(true) + + gVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + } + mVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + } + + vts := []avalanche.Vertex{gVtx, mVtx} + utxos := []ids.ID{GenerateID()} + + tx := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx.Ins.Add(utxos[0]) + + vtx := &Vtx{ + parents: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx}, + height: 1, + status: choices.Processing, + bytes: []byte{1, 1, 2, 3}, + } + + st.edge = func() []ids.ID { return []ids.ID{vts[0].ID(), vts[1].ID()} } + st.getVertex = func(id ids.ID) (avalanche.Vertex, error) { + switch { + case id.Equals(gVtx.ID()): + return gVtx, nil + case id.Equals(mVtx.ID()): + return mVtx, nil + } + t.Fatalf("Unknown vertex") + panic("Should have errored") + } + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + reqID := new(uint32) + sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, vtxID ids.ID, _ []byte) { + *reqID = requestID + if inVdrs.Len() != 2 { + t.Fatalf("Wrong number of validators") + } + if !vtxID.Equals(vtx.ID()) { + t.Fatalf("Wrong vertex requested") + } + } + st.getVertex = func(id ids.ID) (avalanche.Vertex, error) { + switch { + case id.Equals(vtx.ID()): + return vtx, nil + } + t.Fatalf("Unknown vertex") + panic("Should have errored") + } + + te.insert(vtx) + + votes := ids.Set{} + votes.Add(vtx.ID()) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr1.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Accepted { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Accepted) + } +} diff --git a/snow/engine/snowman/polls.go b/snow/engine/snowman/polls.go index 6e666dc..6765ff7 100644 --- a/snow/engine/snowman/polls.go +++ b/snow/engine/snowman/polls.go @@ -22,11 +22,11 @@ type polls struct { // Add to the current set of polls // Returns true if the poll was registered correctly and the network sample // should be made. -func (p *polls) Add(requestID uint32, numPolled int) bool { +func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { poll, exists := p.m[requestID] if !exists { poll.alpha = p.alpha - poll.numPolled = numPolled + poll.polled = vdrs p.m[requestID] = poll p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -42,7 +42,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, b if !exists { return ids.Bag{}, false } - poll.Vote(vote) + poll.Vote(vote, vdr) if poll.Finished() { delete(p.m, requestID) p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -60,7 +60,7 @@ func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) { return ids.Bag{}, false } - poll.CancelVote() + poll.CancelVote(vdr) if poll.Finished() { delete(p.m, requestID) p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -83,22 +83,18 @@ func (p *polls) String() string { // poll represents the current state of a network poll for a block type poll struct { - alpha int - votes ids.Bag - numPolled int + alpha int + votes ids.Bag + polled ids.ShortSet } // Vote registers a vote for this poll -func (p *poll) CancelVote() { - if p.numPolled > 0 { - p.numPolled-- - } -} +func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) } // Vote registers a vote for this poll -func (p *poll) Vote(vote ids.ID) { - if p.numPolled > 0 { - p.numPolled-- +func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) { + if p.polled.Contains(vdr) { + p.polled.Remove(vdr) p.votes.Add(vote) } } @@ -106,13 +102,14 @@ func (p *poll) Vote(vote ids.ID) { // Finished returns true if the poll has completed, with no more required // responses func (p poll) Finished() bool { + remaining := p.polled.Len() received := p.votes.Len() _, freq := p.votes.Mode() - return p.numPolled == 0 || // All k nodes responded + return remaining == 0 || // All k nodes responded freq >= p.alpha || // An alpha majority has returned - received+p.numPolled < p.alpha // An alpha majority can never return + received+remaining < p.alpha // An alpha majority can never return } func (p poll) String() string { - return fmt.Sprintf("Waiting on %d chits", p.numPolled) + return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index f155f5e..2897262 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -542,18 +542,15 @@ func (t *Transitive) pullSample(blkID ids.ID) { vdrSet.Add(vdr.ID()) } - if numVdrs := len(vdrs); numVdrs != p.K { - t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID) - return - } + toSample := ids.ShortSet{} + toSample.Union(vdrSet) t.RequestID++ - if !t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID) - return + if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) { + t.Config.Sender.PullQuery(toSample, t.RequestID, blkID) + } else if numVdrs < p.K { + t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID) } - - t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID) } // send a push request for this block @@ -566,20 +563,15 @@ func (t *Transitive) pushSample(blk snowman.Block) { vdrSet.Add(vdr.ID()) } - blkID := blk.ID() - if numVdrs := len(vdrs); numVdrs != p.K { - t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID) - return - } + toSample := ids.ShortSet{} + toSample.Union(vdrSet) t.RequestID++ - if !t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID) - return + if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) { + t.Config.Sender.PushQuery(toSample, t.RequestID, blk.ID(), blk.Bytes()) + } else if numVdrs < p.K { + t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blk.ID()) } - - t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes()) - return } func (t *Transitive) deliver(blk snowman.Block) error { diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index a3db1d6..8c5f9d1 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -1522,3 +1522,124 @@ func TestEngineAggressivePolling(t *testing.T) { t.Fatalf("Should have sent an additional pull query") } } + +func TestEngineDoubleChit(t *testing.T) { + config := DefaultConfig() + + config.Params = snowball.Parameters{ + Metrics: prometheus.NewRegistry(), + K: 2, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 2, + } + + vdr0 := validators.GenerateRandomValidator(1) + vdr1 := validators.GenerateRandomValidator(1) + + vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr0) + vals.Add(vdr1) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + + vm := &VMTest{} + vm.T = t + config.VM = vm + + vm.Default(true) + vm.CantSetPreference = false + + gBlk := &Blk{ + id: GenerateID(), + status: choices.Accepted, + } + + vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } + sender.CantGetAcceptedFrontier = false + + vm.GetBlockF = func(id ids.ID) (snowman.Block, error) { + switch { + case id.Equals(gBlk.ID()): + return gBlk, nil + } + t.Fatalf("Unknown block") + panic("Should have errored") + } + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + vm.LastAcceptedF = nil + sender.CantGetAcceptedFrontier = true + + blk := &Blk{ + parent: gBlk, + id: GenerateID(), + status: choices.Processing, + bytes: []byte{1}, + } + + queried := new(bool) + queryRequestID := new(uint32) + sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, blkID ids.ID, blkBytes []byte) { + if *queried { + t.Fatalf("Asked multiple times") + } + *queried = true + *queryRequestID = requestID + vdrSet := ids.ShortSet{} + vdrSet.Add(vdr0.ID(), vdr1.ID()) + if !inVdrs.Equals(vdrSet) { + t.Fatalf("Asking wrong validator for preference") + } + if !blk.ID().Equals(blkID) { + t.Fatalf("Asking for wrong block") + } + } + + te.insert(blk) + + vm.GetBlockF = func(id ids.ID) (snowman.Block, error) { + switch { + case id.Equals(gBlk.ID()): + return gBlk, nil + case id.Equals(blk.ID()): + return blk, nil + } + t.Fatalf("Unknown block") + panic("Should have errored") + } + + blkSet := ids.Set{} + blkSet.Add(blk.ID()) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr1.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Accepted { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Accepted) + } +}