diff --git a/snow/engine/avalanche/metrics.go b/snow/engine/avalanche/metrics.go index 021fe38..2cd3671 100644 --- a/snow/engine/avalanche/metrics.go +++ b/snow/engine/avalanche/metrics.go @@ -14,7 +14,7 @@ type metrics struct { numBSVtx, numBSDroppedVtx, numBSTx, numBSDroppedTx prometheus.Counter - numPolls, numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge + numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge } // Initialize implements the Engine interface @@ -61,12 +61,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr Name: "av_bs_dropped_txs", Help: "Number of dropped txs", }) - m.numPolls = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "av_polls", - Help: "Number of pending network polls", - }) m.numVtxRequests = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: namespace, @@ -107,9 +101,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr if err := registerer.Register(m.numBSDroppedTx); err != nil { log.Error("Failed to register av_bs_dropped_txs statistics due to %s", err) } - if err := registerer.Register(m.numPolls); err != nil { - log.Error("Failed to register av_polls statistics due to %s", err) - } if err := registerer.Register(m.numVtxRequests); err != nil { log.Error("Failed to register av_vtx_requests statistics due to %s", err) } diff --git a/snow/engine/avalanche/poll/early_term_no_traversal.go b/snow/engine/avalanche/poll/early_term_no_traversal.go new file mode 100644 index 0000000..52fdae3 --- /dev/null +++ b/snow/engine/avalanche/poll/early_term_no_traversal.go @@ -0,0 +1,85 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +type earlyTermNoTraversalFactory struct { + alpha int +} + +// NewEarlyTermNoTraversalFactory returns a factory that returns polls with +// early termination, without doing DAG traversals +func NewEarlyTermNoTraversalFactory(alpha int) Factory { + return &earlyTermNoTraversalFactory{alpha: alpha} +} + +func (f *earlyTermNoTraversalFactory) New(vdrs ids.ShortSet) Poll { + return &earlyTermNoTraversalPoll{ + polled: vdrs, + alpha: f.alpha, + } +} + +// earlyTermNoTraversalPoll finishes when any remaining validators can't change +// the result of the poll. However, it does not terminate tightly with this bound. +// It terminates as quickly as it can without performing any DAG traversals. +type earlyTermNoTraversalPoll struct { + votes ids.UniqueBag + polled ids.ShortSet + alpha int +} + +// Vote registers a response for this poll +func (p *earlyTermNoTraversalPoll) Vote(vdr ids.ShortID, votes []ids.ID) { + if !p.polled.Contains(vdr) { + // if the validator wasn't polled or already responded to this poll, we + // should just drop the vote + return + } + + // make sure that a validator can't respond multiple times + p.polled.Remove(vdr) + + // track the votes the validator responded with + p.votes.Add(uint(p.polled.Len()), votes...) +} + +// Finished returns true when all validators have voted or the poll can terminate early +func (p *earlyTermNoTraversalPoll) Finished() bool { + // If there are no outstanding queries, the poll is finished + numPending := p.polled.Len() + if numPending == 0 { + return true + } + // If there are still enough pending responses to include another vertex, + // then the poll must wait for more responses + if numPending > p.alpha { + return false + } + + // Ignore any vertex that has already received alpha votes. To safely skip + // DAG traversal, assume that all votes for vertices with less than alpha + // votes will be applied to a single shared ancestor.
In this case, the poll + // can terminate early, iff there are not enough pending votes for this + // ancestor to receive alpha votes. + partialVotes := ids.BitSet(0) + for _, vote := range p.votes.List() { + if voters := p.votes.GetSet(vote); voters.Len() < p.alpha { + partialVotes.Union(voters) + } + } + return partialVotes.Len()+numPending < p.alpha +} + +// Result returns the result of this poll +func (p *earlyTermNoTraversalPoll) Result() ids.UniqueBag { return p.votes } + +func (p *earlyTermNoTraversalPoll) String() string { + return fmt.Sprintf("waiting on %s", p.polled) +} diff --git a/snow/engine/avalanche/poll/early_term_no_traversal_test.go b/snow/engine/avalanche/poll/early_term_no_traversal_test.go new file mode 100644 index 0000000..ba2d81a --- /dev/null +++ b/snow/engine/avalanche/poll/early_term_no_traversal_test.go @@ -0,0 +1,207 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func TestEarlyTermNoTraversalResults(t *testing.T) { + alpha := 1 + + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k votes") + } + + result := poll.Result() + if list := result.List(); len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID := list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if set := result.GetSet(vtxID); set.Len() != 1 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestEarlyTermNoTraversalString(t *testing.T) { + alpha := 2 + + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) 
// k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + + expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}" + if result := poll.String(); expected != result { + t.Fatalf("Poll should have returned %s but returned %s", expected, result) + } +} + +func TestEarlyTermNoTraversalDropsDuplicatedVotes(t *testing.T) { + alpha := 2 + + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr1, votes) + if poll.Finished() { + t.Fatalf("Poll finished after getting a duplicated vote") + } + poll.Vote(vdr2, votes) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k votes") + } +} + +func TestEarlyTermNoTraversalTerminatesEarly(t *testing.T) { + alpha := 3 + + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + vdr5 := ids.NewShortID([20]byte{5}) // k = 5 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + vdr3, + vdr4, + vdr5, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr2, votes) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr3, votes) + if !poll.Finished() { + t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices") + } +} + +func 
TestEarlyTermNoTraversalForSharedAncestor(t *testing.T) { + alpha := 4 + + vtxA := ids.NewID([32]byte{1}) + vtxB := ids.NewID([32]byte{2}) + vtxC := ids.NewID([32]byte{3}) + vtxD := ids.NewID([32]byte{4}) + + // If validators 1-3 vote for frontier vertices + // B, C, and D respectively, which all share the common ancestor + // A, then we cannot terminate early with alpha = k = 4 + // If the final vote is cast for any of A, B, C, or D, then + // vertex A will have transitively received alpha = 4 votes + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + vdrs.Add(vdr2) + vdrs.Add(vdr3) + vdrs.Add(vdr4) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, []ids.ID{vtxB}) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving one vote") + } + poll.Vote(vdr2, []ids.ID{vtxC}) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving two votes") + } + poll.Vote(vdr3, []ids.ID{vtxD}) + if poll.Finished() { + t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes") + } + poll.Vote(vdr4, []ids.ID{vtxA}) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving all outstanding votes") + } +} + +func TestEarlyTermNoTraversalWithFastDrops(t *testing.T) { + alpha := 2 + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) // k = 3 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + vdr3, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, nil) + if poll.Finished() { + t.Fatalf("Poll finished early after dropping one vote") + } + poll.Vote(vdr2, nil) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after dropping two votes") + } +} diff --git a/snow/engine/avalanche/poll/interfaces.go 
b/snow/engine/avalanche/poll/interfaces.go new file mode 100644 index 0000000..05234a3 --- /dev/null +++ b/snow/engine/avalanche/poll/interfaces.go @@ -0,0 +1,33 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +// Set is a collection of polls +type Set interface { + fmt.Stringer + + Add(requestID uint32, vdrs ids.ShortSet) bool + Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.UniqueBag, bool) + Len() int +} + +// Poll is an outstanding poll +type Poll interface { + fmt.Stringer + + Vote(vdr ids.ShortID, votes []ids.ID) + Finished() bool + Result() ids.UniqueBag +} + +// Factory creates a new Poll +type Factory interface { + New(vdrs ids.ShortSet) Poll +} diff --git a/snow/engine/avalanche/poll/no_early_term.go b/snow/engine/avalanche/poll/no_early_term.go new file mode 100644 index 0000000..9a06649 --- /dev/null +++ b/snow/engine/avalanche/poll/no_early_term.go @@ -0,0 +1,52 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +type noEarlyTermFactory struct{} + +// NewNoEarlyTermFactory returns a factory that returns polls with no early +// termination +func NewNoEarlyTermFactory() Factory { return noEarlyTermFactory{} } + +func (noEarlyTermFactory) New(vdrs ids.ShortSet) Poll { + return &noEarlyTermPoll{polled: vdrs} +} + +// noEarlyTermPoll finishes when all polled validators either respond to the +// query or a timeout occurs +type noEarlyTermPoll struct { + votes ids.UniqueBag + polled ids.ShortSet +} + +// Vote registers a response for this poll +func (p *noEarlyTermPoll) Vote(vdr ids.ShortID, votes []ids.ID) { + if !p.polled.Contains(vdr) { + // if the validator wasn't polled or already responded to this poll, we + // should just drop the vote + return + } + + // make sure that a validator can't respond multiple times + p.polled.Remove(vdr) + + // track the votes the validator responded with + p.votes.Add(uint(p.polled.Len()), votes...) +} + +// Finished returns true when all validators have voted +func (p *noEarlyTermPoll) Finished() bool { return p.polled.Len() == 0 } + +// Result returns the result of this poll +func (p *noEarlyTermPoll) Result() ids.UniqueBag { return p.votes } + +func (p *noEarlyTermPoll) String() string { + return fmt.Sprintf("waiting on %s", p.polled) +} diff --git a/snow/engine/avalanche/poll/no_early_term_test.go b/snow/engine/avalanche/poll/no_early_term_test.go new file mode 100644 index 0000000..f877416 --- /dev/null +++ b/snow/engine/avalanche/poll/no_early_term_test.go @@ -0,0 +1,91 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func TestNoEarlyTermResults(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k votes") + } + + result := poll.Result() + if list := result.List(); len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID := list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if set := result.GetSet(vtxID); set.Len() != 1 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestNoEarlyTermString(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + + expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}" + if result := poll.String(); expected != result { + t.Fatalf("Poll should have returned %s but returned %s", expected, result) + } +} + +func TestNoEarlyTermDropsDuplicatedVotes(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, votes) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr1, votes) + if poll.Finished() { + t.Fatalf("Poll finished after getting a duplicated vote") + } + poll.Vote(vdr2, votes) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k 
votes") + } +} diff --git a/snow/engine/avalanche/poll/set.go b/snow/engine/avalanche/poll/set.go new file mode 100644 index 0000000..24c93fb --- /dev/null +++ b/snow/engine/avalanche/poll/set.go @@ -0,0 +1,130 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/logging" + "github.com/ava-labs/gecko/utils/timer" +) + +type poll struct { + Poll + start time.Time +} + +type set struct { + log logging.Logger + numPolls prometheus.Gauge + durPolls prometheus.Histogram + factory Factory + polls map[uint32]poll +} + +// NewSet returns a new empty set of polls +func NewSet( + factory Factory, + log logging.Logger, + namespace string, + registerer prometheus.Registerer, +) Set { + numPolls := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "polls", + Help: "Number of pending network polls", + }) + if err := registerer.Register(numPolls); err != nil { + log.Error("failed to register polls statistics due to %s", err) + } + + durPolls := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "poll_duration", + Help: "Length of time the poll existed in milliseconds", + Buckets: timer.MillisecondsBuckets, + }) + if err := registerer.Register(durPolls); err != nil { + log.Error("failed to register poll_duration statistics due to %s", err) + } + + return &set{ + log: log, + numPolls: numPolls, + durPolls: durPolls, + factory: factory, + polls: make(map[uint32]poll), + } +} + +// Add to the current set of polls +// Returns true if the poll was registered correctly and the network sample +// should be made. 
+func (s *set) Add(requestID uint32, vdrs ids.ShortSet) bool { + if _, exists := s.polls[requestID]; exists { + s.log.Debug("dropping poll due to duplicated requestID: %d", requestID) + return false + } + + s.log.Verbo("creating poll with requestID %d and validators %s", + requestID, + vdrs) + + s.polls[requestID] = poll{ + Poll: s.factory.New(vdrs), // create the new poll + start: time.Now(), + } + s.numPolls.Inc() // increase the metrics + return true +} + +// Vote registers the connection's response to a query for [requestID]. If there +// was no query, or the response has already been registered, nothing is performed. +func (s *set) Vote( + requestID uint32, + vdr ids.ShortID, + votes []ids.ID, +) (ids.UniqueBag, bool) { + poll, exists := s.polls[requestID] + if !exists { + s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d", + vdr, + requestID) + return nil, false + } + + s.log.Verbo("processing vote from %s in the poll with requestID: %d with the votes %v", + vdr, + requestID, + votes) + + poll.Vote(vdr, votes) + if !poll.Finished() { + return nil, false + } + + s.log.Verbo("poll with requestID %d finished as %s", requestID, poll) + + delete(s.polls, requestID) // remove the poll from the current set + s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds())) + s.numPolls.Dec() // decrease the metrics + return poll.Result(), true +} + +// Len returns the number of outstanding polls +func (s *set) Len() int { return len(s.polls) } + +func (s *set) String() string { + sb := strings.Builder{} + sb.WriteString(fmt.Sprintf("current polls: (Size = %d)", len(s.polls))) + for requestID, poll := range s.polls { + sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll)) + } + return sb.String() +} diff --git a/snow/engine/avalanche/poll/set_test.go b/snow/engine/avalanche/poll/set_test.go new file mode 100644 index 0000000..0f0e38e --- /dev/null +++ b/snow/engine/avalanche/poll/set_test.go @@ -0,0 +1,97 @@ +// (c) 2019-2020, Ava Labs, Inc.
All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/logging" + "github.com/prometheus/client_golang/prometheus" +) + +func TestNewSetErrorOnMetrics(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + + registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "polls", + })) + registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "poll_duration", + })) + + _ = NewSet(factory, log, namespace, registerer) +} + +func TestCreateAndFinishPoll(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + s := NewSet(factory, log, namespace, registerer) + + vtxID := ids.NewID([32]byte{1}) + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + if s.Len() != 0 { + t.Fatalf("Shouldn't have any active polls yet") + } else if !s.Add(0, vdrs) { + t.Fatalf("Should have been able to add a new poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if s.Add(0, vdrs) { + t.Fatalf("Shouldn't have been able to add a duplicated poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if _, finished := s.Vote(1, vdr1, votes); finished { + t.Fatalf("Shouldn't have been able to finish a non-existant poll") + } else if _, finished := s.Vote(0, vdr1, votes); finished { + t.Fatalf("Shouldn't have been able to finish an ongoing poll") + } else if _, finished := s.Vote(0, vdr1, votes); finished { + t.Fatalf("Should have dropped a duplicated poll") + } else if result, finished := s.Vote(0, vdr2, votes); !finished { + t.Fatalf("Should have finished the") + } else if list := result.List(); 
len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID := list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if set := result.GetSet(vtxID); set.Len() != 2 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestSetString(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + s := NewSet(factory, log, namespace, registerer) + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + expected := "current polls: (Size = 1)\n" + + " 0: waiting on {6HgC8KRBEhXYbF4riJyJFLSHt37UNuRt}" + if !s.Add(0, vdrs) { + t.Fatalf("Should have been able to add a new poll") + } else if str := s.String(); expected != str { + t.Fatalf("Set return wrong string, Expected:\n%s\nReturned:\n%s", + expected, + str) + } +} diff --git a/snow/engine/avalanche/polls.go b/snow/engine/avalanche/polls.go deleted file mode 100644 index ac3fe5c..0000000 --- a/snow/engine/avalanche/polls.go +++ /dev/null @@ -1,137 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avalanche - -import ( - "fmt" - "strings" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/ava-labs/gecko/ids" - "github.com/ava-labs/gecko/utils/logging" -) - -// TODO: There is a conservative early termination case that doesn't require dag -// traversals we may want to implement. The algorithm would go as follows: -// Keep track of the number of response that reference an ID. If an ID gets >= -// alpha responses, then remove it from all responses and place it into a chit -// list. Remove all empty responses. If the number of responses + the number of -// pending responses is less than alpha, terminate the poll. -// In the synchronous + virtuous case, when everyone returns the same hash, the -// poll now terminates after receiving alpha responses. 
-// In the rogue case, it is possible that the poll doesn't terminate as quickly -// as possible, because IDs may have the alpha threshold but only when counting -// transitive votes. In this case, we may wait even if it is no longer possible -// for another ID to earn alpha votes. -// Because alpha is typically set close to k, this may not be performance -// critical. However, early termination may be performance critical with crashed -// nodes. - -type polls struct { - log logging.Logger - numPolls prometheus.Gauge - alpha int - m map[uint32]poll -} - -func newPolls(alpha int, log logging.Logger, numPolls prometheus.Gauge) polls { - return polls{ - log: log, - numPolls: numPolls, - alpha: alpha, - m: make(map[uint32]poll), - } -} - -// Add to the current set of polls -// Returns true if the poll was registered correctly and the network sample -// should be made. -func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { - poll, exists := p.m[requestID] - if !exists { - poll.polled = vdrs - poll.alpha = p.alpha - p.m[requestID] = poll - - p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics - } - return !exists -} - -// Vote registers the connections response to a query for [id]. If there was no -// query, or the response has already be registered, nothing is performed. -func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.UniqueBag, bool) { - p.log.Verbo("Vote. requestID: %d. 
validatorID: %s.", requestID, vdr) - poll, exists := p.m[requestID] - p.log.Verbo("Poll: %+v", poll) - if !exists { - return nil, false - } - - poll.Vote(votes, vdr) - if poll.Finished() { - p.log.Verbo("Poll is finished") - delete(p.m, requestID) - p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics - return poll.votes, true - } - p.m[requestID] = poll - return nil, false -} - -func (p *polls) String() string { - sb := strings.Builder{} - - sb.WriteString(fmt.Sprintf("Current polls: (Size = %d)", len(p.m))) - for requestID, poll := range p.m { - sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll)) - } - - return sb.String() -} - -// poll represents the current state of a network poll for a vertex -type poll struct { - votes ids.UniqueBag - polled ids.ShortSet - alpha int -} - -// Vote registers a vote for this poll -func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) { - if p.polled.Contains(vdr) { - p.polled.Remove(vdr) - p.votes.Add(uint(p.polled.Len()), votes...) - } -} - -// Finished returns true if the poll has completed, with no more required -// responses -func (p poll) Finished() bool { - // If there are no outstanding queries, the poll is finished - numPending := p.polled.Len() - if numPending == 0 { - return true - } - // If there are still enough pending responses to include another vertex, - // then the poll must wait for more responses - if numPending > p.alpha { - return false - } - - // Ignore any vertex that has already received alpha votes. To safely skip - // DAG traversal, assume that all votes for vertices with less than alpha - // votes will be applied to a single shared ancestor. In this case, the poll - // can terminate early, iff there are not enough pending votes for this - // ancestor to receive alpha votes. 
- partialVotes := ids.BitSet(0) - for _, vote := range p.votes.List() { - if voters := p.votes.GetSet(vote); voters.Len() < p.alpha { - partialVotes.Union(voters) - } - } - return partialVotes.Len()+numPending < p.alpha -} -func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) } diff --git a/snow/engine/avalanche/polls_test.go b/snow/engine/avalanche/polls_test.go deleted file mode 100644 index cbb1ea4..0000000 --- a/snow/engine/avalanche/polls_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package avalanche - -import ( - "testing" - - "github.com/ava-labs/gecko/ids" -) - -func TestPollTerminatesEarlyVirtuousCase(t *testing.T) { - alpha := 3 - - vtxID := GenerateID() - votes := []ids.ID{vtxID} - - vdr1 := ids.NewShortID([20]byte{1}) - vdr2 := ids.NewShortID([20]byte{2}) - vdr3 := ids.NewShortID([20]byte{3}) - vdr4 := ids.NewShortID([20]byte{4}) - vdr5 := ids.NewShortID([20]byte{5}) // k = 5 - - vdrs := ids.ShortSet{} - vdrs.Add(vdr1) - vdrs.Add(vdr2) - vdrs.Add(vdr3) - vdrs.Add(vdr4) - vdrs.Add(vdr5) - - poll := poll{ - votes: make(ids.UniqueBag), - polled: vdrs, - alpha: alpha, - } - - poll.Vote(votes, vdr1) - if poll.Finished() { - t.Fatalf("Poll finished after less than alpha votes") - } - poll.Vote(votes, vdr2) - if poll.Finished() { - t.Fatalf("Poll finished after less than alpha votes") - } - poll.Vote(votes, vdr3) - if !poll.Finished() { - t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices") - } -} - -func TestPollAccountsForSharedAncestor(t *testing.T) { - alpha := 4 - - vtxA := GenerateID() - vtxB := GenerateID() - vtxC := GenerateID() - vtxD := GenerateID() - - // If validators 1-3 vote for frontier vertices - // B, C, and D respectively, which all share the common ancestor - // A, then we cannot terminate early with alpha = k = 4 - // If the final vote is cast for any of A, B, C, or D, then - // vertex A will have transitively received alpha = 4 votes - vdr1 := 
ids.NewShortID([20]byte{1}) - vdr2 := ids.NewShortID([20]byte{2}) - vdr3 := ids.NewShortID([20]byte{3}) - vdr4 := ids.NewShortID([20]byte{4}) - - vdrs := ids.ShortSet{} - vdrs.Add(vdr1) - vdrs.Add(vdr2) - vdrs.Add(vdr3) - vdrs.Add(vdr4) - - poll := poll{ - votes: make(ids.UniqueBag), - polled: vdrs, - alpha: alpha, - } - - votes1 := []ids.ID{vtxB} - poll.Vote(votes1, vdr1) - if poll.Finished() { - t.Fatalf("Poll finished early after receiving one vote") - } - votes2 := []ids.ID{vtxC} - poll.Vote(votes2, vdr2) - if poll.Finished() { - t.Fatalf("Poll finished early after receiving two votes") - } - votes3 := []ids.ID{vtxD} - poll.Vote(votes3, vdr3) - if poll.Finished() { - t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes") - } - - votes4 := []ids.ID{vtxA} - poll.Vote(votes4, vdr4) - if !poll.Finished() { - t.Fatalf("Poll did not terminate after receiving all outstanding votes") - } -} diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index 565267b..7412276 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/gecko/snow/choices" "github.com/ava-labs/gecko/snow/consensus/avalanche" "github.com/ava-labs/gecko/snow/consensus/snowstorm" + "github.com/ava-labs/gecko/snow/engine/avalanche/poll" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/snow/events" "github.com/ava-labs/gecko/utils/formatting" @@ -31,7 +32,7 @@ type Transitive struct { Config bootstrapper - polls polls // track people I have asked for their preference + polls poll.Set // track people I have asked for their preference // vtxReqs prevents asking validators for the same vertex vtxReqs common.Requests @@ -57,7 +58,12 @@ func (t *Transitive) Initialize(config Config) error { t.onFinished = t.finishBootstrapping - t.polls = newPolls(int(config.Params.Alpha), config.Context.Log, t.numPolls) + factory := 
poll.NewEarlyTermNoTraversalFactory(int(config.Params.Alpha)) + t.polls = poll.NewSet(factory, + config.Context.Log, + config.Params.Namespace, + config.Params.Metrics, + ) return t.bootstrapper.Initialize(config.BootstrapConfig) } @@ -309,7 +315,7 @@ func (t *Transitive) Notify(msg common.Message) error { } func (t *Transitive) repoll() error { - if len(t.polls.m) >= t.Params.ConcurrentRepolls || t.errs.Errored() { + if t.polls.Len() >= t.Params.ConcurrentRepolls || t.errs.Errored() { return nil } @@ -318,7 +324,7 @@ func (t *Transitive) repoll() error { return err } - for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ { + for i := t.polls.Len(); i < t.Params.ConcurrentRepolls; i++ { if err := t.batch(nil, false /*=force*/, true /*=empty*/); err != nil { return err } diff --git a/snow/engine/snowman/metrics.go b/snow/engine/snowman/metrics.go index f17d360..d71697a 100644 --- a/snow/engine/snowman/metrics.go +++ b/snow/engine/snowman/metrics.go @@ -13,7 +13,7 @@ type metrics struct { numPendingRequests, numBlocked prometheus.Gauge numBootstrapped, numDropped prometheus.Counter - numPolls, numBlkRequests, numBlockedBlk prometheus.Gauge + numBlkRequests, numBlockedBlk prometheus.Gauge } // Initialize implements the Engine interface @@ -42,12 +42,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr Name: "sm_bs_dropped", Help: "Number of dropped bootstrap blocks", }) - m.numPolls = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "sm_polls", - Help: "Number of pending network polls", - }) m.numBlkRequests = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: namespace, @@ -73,9 +67,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr if err := registerer.Register(m.numDropped); err != nil { log.Error("Failed to register sm_bs_dropped statistics due to %s", err) } - if err := registerer.Register(m.numPolls); err != nil { - log.Error("Failed to register sm_polls 
statistics due to %s", err) - } if err := registerer.Register(m.numBlkRequests); err != nil { log.Error("Failed to register sm_blk_requests statistics due to %s", err) } diff --git a/snow/engine/snowman/poll/early_term_no_traversal.go b/snow/engine/snowman/poll/early_term_no_traversal.go new file mode 100644 index 0000000..8042b27 --- /dev/null +++ b/snow/engine/snowman/poll/early_term_no_traversal.go @@ -0,0 +1,73 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +type earlyTermNoTraversalFactory struct { + alpha int +} + +// NewEarlyTermNoTraversalFactory returns a factory that returns polls with +// early termination, without doing DAG traversals +func NewEarlyTermNoTraversalFactory(alpha int) Factory { + return &earlyTermNoTraversalFactory{alpha: alpha} +} + +func (f *earlyTermNoTraversalFactory) New(vdrs ids.ShortSet) Poll { + return &earlyTermNoTraversalPoll{ + polled: vdrs, + alpha: f.alpha, + } +} + +// earlyTermNoTraversalPoll finishes when any remaining validators can't change +// the result of the poll. However, it does not terminate tightly with this bound. +// It terminates as quickly as it can without performing any DAG traversals.
+type earlyTermNoTraversalPoll struct { + votes ids.Bag + polled ids.ShortSet + alpha int +} + +// Vote registers a response for this poll +func (p *earlyTermNoTraversalPoll) Vote(vdr ids.ShortID, vote ids.ID) { + if !p.polled.Contains(vdr) { + // if the validator wasn't polled or already responded to this poll, we + // should just drop the vote + return + } + + // make sure that a validator can't respond multiple times + p.polled.Remove(vdr) + + // track the votes the validator responded with + p.votes.Add(vote) +} + +// Drop any future response for this poll +func (p *earlyTermNoTraversalPoll) Drop(vdr ids.ShortID) { + p.polled.Remove(vdr) +} + +// Finished returns true when all validators have voted +func (p *earlyTermNoTraversalPoll) Finished() bool { + remaining := p.polled.Len() + received := p.votes.Len() + _, freq := p.votes.Mode() + return remaining == 0 || // All k nodes responded + freq >= p.alpha || // An alpha majority has returned + received+remaining < p.alpha // An alpha majority can never return +} + +// Result returns the result of this poll +func (p *earlyTermNoTraversalPoll) Result() ids.Bag { return p.votes } + +func (p *earlyTermNoTraversalPoll) String() string { + return fmt.Sprintf("waiting on %s", p.polled) +} diff --git a/snow/engine/snowman/poll/early_term_no_traversal_test.go b/snow/engine/snowman/poll/early_term_no_traversal_test.go new file mode 100644 index 0000000..dd444e9 --- /dev/null +++ b/snow/engine/snowman/poll/early_term_no_traversal_test.go @@ -0,0 +1,205 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func TestEarlyTermNoTraversalResults(t *testing.T) { + alpha := 1 + + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k votes") + } + + result := poll.Result() + if list := result.List(); len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID := list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if result.Count(vtxID) != 1 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestEarlyTermNoTraversalString(t *testing.T) { + alpha := 2 + + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + + expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}" + if result := poll.String(); expected != result { + t.Fatalf("Poll should have returned %s but returned %s", expected, result) + } +} + +func TestEarlyTermNoTraversalDropsDuplicatedVotes(t *testing.T) { + alpha := 2 + + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr1, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after getting a duplicated vote") + } + poll.Vote(vdr2, vtxID) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after 
receiving k votes") + } +} + +func TestEarlyTermNoTraversalTerminatesEarly(t *testing.T) { + alpha := 3 + + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + vdr5 := ids.NewShortID([20]byte{5}) // k = 5 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + vdr3, + vdr4, + vdr5, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr2, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr3, vtxID) + if !poll.Finished() { + t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices") + } +} + +func TestEarlyTermNoTraversalForSharedAncestor(t *testing.T) { + alpha := 4 + + vtxA := ids.NewID([32]byte{1}) + vtxB := ids.NewID([32]byte{2}) + vtxC := ids.NewID([32]byte{3}) + vtxD := ids.NewID([32]byte{4}) + + // If validators 1-3 vote for frontier vertices + // B, C, and D respectively, which all share the common ancestor + // A, then we cannot terminate early with alpha = k = 4 + // If the final vote is cast for any of A, B, C, or D, then + // vertex A will have transitively received alpha = 4 votes + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + vdr3, + vdr4, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxB) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving one vote") + } + poll.Vote(vdr2, vtxC) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving two votes") + } + poll.Vote(vdr3, vtxD) + if poll.Finished() { + 
t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes") + } + poll.Vote(vdr4, vtxA) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving all outstanding votes") + } +} + +func TestEarlyTermNoTraversalWithFastDrops(t *testing.T) { + alpha := 2 + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) // k = 3 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + vdr3, + ) + + factory := NewEarlyTermNoTraversalFactory(alpha) + poll := factory.New(vdrs) + + poll.Drop(vdr1) + if poll.Finished() { + t.Fatalf("Poll finished early after dropping one vote") + } + poll.Drop(vdr2) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after dropping two votes") + } +} diff --git a/snow/engine/snowman/poll/interfaces.go b/snow/engine/snowman/poll/interfaces.go new file mode 100644 index 0000000..33731ad --- /dev/null +++ b/snow/engine/snowman/poll/interfaces.go @@ -0,0 +1,35 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +// Set is a collection of polls +type Set interface { + fmt.Stringer + + Add(requestID uint32, vdrs ids.ShortSet) bool + Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, bool) + Drop(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) + Len() int +} + +// Poll is an outstanding poll +type Poll interface { + fmt.Stringer + + Vote(vdr ids.ShortID, vote ids.ID) + Drop(vdr ids.ShortID) + Finished() bool + Result() ids.Bag +} + +// Factory creates a new Poll +type Factory interface { + New(vdrs ids.ShortSet) Poll +} diff --git a/snow/engine/snowman/poll/no_early_term.go b/snow/engine/snowman/poll/no_early_term.go new file mode 100644 index 0000000..3bcaf38 --- /dev/null +++ b/snow/engine/snowman/poll/no_early_term.go @@ -0,0 +1,55 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. 
+// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + + "github.com/ava-labs/gecko/ids" +) + +type noEarlyTermFactory struct{} + +// NewNoEarlyTermFactory returns a factory that returns polls with no early +// termination +func NewNoEarlyTermFactory() Factory { return noEarlyTermFactory{} } + +func (noEarlyTermFactory) New(vdrs ids.ShortSet) Poll { + return &noEarlyTermPoll{polled: vdrs} +} + +// noEarlyTermPoll finishes when all polled validators either respond to the +// query or a timeout occurs +type noEarlyTermPoll struct { + votes ids.Bag + polled ids.ShortSet +} + +// Vote registers a response for this poll +func (p *noEarlyTermPoll) Vote(vdr ids.ShortID, vote ids.ID) { + if !p.polled.Contains(vdr) { + // if the validator wasn't polled or already responded to this poll, we + // should just drop the vote + return + } + + // make sure that a validator can't respond multiple times + p.polled.Remove(vdr) + + // track the votes the validator responded with + p.votes.Add(vote) +} + +// Drop any future response for this poll +func (p *noEarlyTermPoll) Drop(vdr ids.ShortID) { p.polled.Remove(vdr) } + +// Finished returns true when all validators have voted +func (p *noEarlyTermPoll) Finished() bool { return p.polled.Len() == 0 } + +// Result returns the result of this poll +func (p *noEarlyTermPoll) Result() ids.Bag { return p.votes } + +func (p *noEarlyTermPoll) String() string { + return fmt.Sprintf("waiting on %s", p.polled) +} diff --git a/snow/engine/snowman/poll/no_early_term_test.go b/snow/engine/snowman/poll/no_early_term_test.go new file mode 100644 index 0000000..a366b5e --- /dev/null +++ b/snow/engine/snowman/poll/no_early_term_test.go @@ -0,0 +1,92 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func TestNoEarlyTermResults(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k votes") + } + + result := poll.Result() + if list := result.List(); len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID := list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if result.Count(vtxID) != 1 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestNoEarlyTermString(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + + expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}" + if result := poll.String(); expected != result { + t.Fatalf("Poll should have returned %s but returned %s", expected, result) + } +} + +func TestNoEarlyTermDropsDuplicatedVotes(t *testing.T) { + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + factory := NewNoEarlyTermFactory() + poll := factory.New(vdrs) + + poll.Vote(vdr1, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(vdr1, vtxID) + if poll.Finished() { + t.Fatalf("Poll finished after getting a duplicated vote") + } + poll.Drop(vdr1) + if poll.Finished() { + t.Fatalf("Poll finished after getting a duplicated vote") + } + poll.Vote(vdr2, vtxID) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving k 
votes") + } +} diff --git a/snow/engine/snowman/poll/set.go b/snow/engine/snowman/poll/set.go new file mode 100644 index 0000000..25e0e68 --- /dev/null +++ b/snow/engine/snowman/poll/set.go @@ -0,0 +1,158 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package poll + +import ( + "fmt" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/logging" + "github.com/ava-labs/gecko/utils/timer" +) + +type poll struct { + Poll + start time.Time +} + +type set struct { + log logging.Logger + numPolls prometheus.Gauge + durPolls prometheus.Histogram + factory Factory + polls map[uint32]poll +} + +// NewSet returns a new empty set of polls +func NewSet( + factory Factory, + log logging.Logger, + namespace string, + registerer prometheus.Registerer, +) Set { + numPolls := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "polls", + Help: "Number of pending network polls", + }) + if err := registerer.Register(numPolls); err != nil { + log.Error("failed to register polls statistics due to %s", err) + } + + durPolls := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "poll_duration", + Help: "Length of time the poll existed in milliseconds", + Buckets: timer.MillisecondsBuckets, + }) + if err := registerer.Register(durPolls); err != nil { + log.Error("failed to register poll_duration statistics due to %s", err) + } + + return &set{ + log: log, + numPolls: numPolls, + durPolls: durPolls, + factory: factory, + polls: make(map[uint32]poll), + } +} + +// Add to the current set of polls +// Returns true if the poll was registered correctly and the network sample +// should be made. 
+func (s *set) Add(requestID uint32, vdrs ids.ShortSet) bool { + if _, exists := s.polls[requestID]; exists { + s.log.Debug("dropping poll due to duplicated requestID: %d", requestID) + return false + } + + s.log.Verbo("creating poll with requestID %d and validators %s", + requestID, + vdrs) + + s.polls[requestID] = poll{ + Poll: s.factory.New(vdrs), // create the new poll + start: time.Now(), + } + s.numPolls.Inc() // increase the metrics + return true +} + +// Vote registers the connections response to a query for [id]. If there was no +// query, or the response has already be registered, nothing is performed. +func (s *set) Vote( + requestID uint32, + vdr ids.ShortID, + vote ids.ID, +) (ids.Bag, bool) { + poll, exists := s.polls[requestID] + if !exists { + s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d", + vdr, + requestID) + return ids.Bag{}, false + } + + s.log.Verbo("processing vote from %s in the poll with requestID: %d with the vote %s", + vdr, + requestID, + vote) + + poll.Vote(vdr, vote) + if !poll.Finished() { + return ids.Bag{}, false + } + + s.log.Verbo("poll with requestID %d finished as %s", requestID, poll) + + delete(s.polls, requestID) // remove the poll from the current set + s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds())) + s.numPolls.Dec() // decrease the metrics + return poll.Result(), true +} + +// Drop registers the connections response to a query for [id]. If there was no +// query, or the response has already be registered, nothing is performed. 
+func (s *set) Drop(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) { + poll, exists := s.polls[requestID] + if !exists { + s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d", + vdr, + requestID) + return ids.Bag{}, false + } + + s.log.Verbo("processing dropped vote from %s in the poll with requestID: %d", + vdr, + requestID) + + poll.Drop(vdr) + if !poll.Finished() { + return ids.Bag{}, false + } + + s.log.Verbo("poll with requestID %d finished as %s", requestID, poll) + + delete(s.polls, requestID) // remove the poll from the current set + s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds())) + s.numPolls.Dec() // decrease the metrics + return poll.Result(), true +} + +// Len returns the number of outstanding polls +func (s *set) Len() int { return len(s.polls) } + +func (s *set) String() string { + sb := strings.Builder{} + sb.WriteString(fmt.Sprintf("current polls: (Size = %d)", len(s.polls))) + for requestID, poll := range s.polls { + sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll)) + } + return sb.String() +} diff --git a/snow/engine/snowman/poll/set_test.go b/snow/engine/snowman/poll/set_test.go new file mode 100644 index 0000000..2ccf0f3 --- /dev/null +++ b/snow/engine/snowman/poll/set_test.go @@ -0,0 +1,135 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package poll + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/utils/logging" + "github.com/prometheus/client_golang/prometheus" +) + +func TestNewSetErrorOnMetrics(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + + registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "polls", + })) + registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "poll_duration", + })) + + _ = NewSet(factory, log, namespace, registerer) +} + +func TestCreateAndFinishSuccessfulPoll(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + s := NewSet(factory, log, namespace, registerer) + + vtxID := ids.NewID([32]byte{1}) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + if s.Len() != 0 { + t.Fatalf("Shouldn't have any active polls yet") + } else if !s.Add(0, vdrs) { + t.Fatalf("Should have been able to add a new poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if s.Add(0, vdrs) { + t.Fatalf("Shouldn't have been able to add a duplicated poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if _, finished := s.Vote(1, vdr1, vtxID); finished { + t.Fatalf("Shouldn't have been able to finish a non-existant poll") + } else if _, finished := s.Vote(0, vdr1, vtxID); finished { + t.Fatalf("Shouldn't have been able to finish an ongoing poll") + } else if _, finished := s.Vote(0, vdr1, vtxID); finished { + t.Fatalf("Should have dropped a duplicated poll") + } else if result, finished := s.Vote(0, vdr2, vtxID); !finished { + t.Fatalf("Should have finished the") + } else if list := result.List(); len(list) != 1 { + t.Fatalf("Wrong number of vertices returned") + } else if retVtxID 
:= list[0]; !retVtxID.Equals(vtxID) { + t.Fatalf("Wrong vertex returned") + } else if result.Count(vtxID) != 2 { + t.Fatalf("Wrong number of votes returned") + } +} + +func TestCreateAndFinishFailedPoll(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + s := NewSet(factory, log, namespace, registerer) + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) // k = 2 + + vdrs := ids.ShortSet{} + vdrs.Add( + vdr1, + vdr2, + ) + + if s.Len() != 0 { + t.Fatalf("Shouldn't have any active polls yet") + } else if !s.Add(0, vdrs) { + t.Fatalf("Should have been able to add a new poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if s.Add(0, vdrs) { + t.Fatalf("Shouldn't have been able to add a duplicated poll") + } else if s.Len() != 1 { + t.Fatalf("Should only have one active poll") + } else if _, finished := s.Drop(1, vdr1); finished { + t.Fatalf("Shouldn't have been able to finish a non-existant poll") + } else if _, finished := s.Drop(0, vdr1); finished { + t.Fatalf("Shouldn't have been able to finish an ongoing poll") + } else if _, finished := s.Drop(0, vdr1); finished { + t.Fatalf("Should have dropped a duplicated poll") + } else if result, finished := s.Drop(0, vdr2); !finished { + t.Fatalf("Should have finished the") + } else if list := result.List(); len(list) != 0 { + t.Fatalf("Wrong number of vertices returned") + } +} + +func TestSetString(t *testing.T) { + factory := NewNoEarlyTermFactory() + log := logging.NoLog{} + namespace := "" + registerer := prometheus.NewRegistry() + s := NewSet(factory, log, namespace, registerer) + + vdr1 := ids.NewShortID([20]byte{1}) // k = 1 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + + expected := "current polls: (Size = 1)\n" + + " 0: waiting on {6HgC8KRBEhXYbF4riJyJFLSHt37UNuRt}" + if !s.Add(0, vdrs) { + t.Fatalf("Should have been able to add a new poll") + } else if str := 
s.String(); expected != str { + t.Fatalf("Set return wrong string, Expected:\n%s\nReturned:\n%s", + expected, + str) + } +} diff --git a/snow/engine/snowman/polls.go b/snow/engine/snowman/polls.go deleted file mode 100644 index 6765ff7..0000000 --- a/snow/engine/snowman/polls.go +++ /dev/null @@ -1,115 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package snowman - -import ( - "fmt" - "strings" - - "github.com/ava-labs/gecko/ids" - "github.com/ava-labs/gecko/utils/logging" - "github.com/prometheus/client_golang/prometheus" -) - -type polls struct { - log logging.Logger - numPolls prometheus.Gauge - alpha int - m map[uint32]poll -} - -// Add to the current set of polls -// Returns true if the poll was registered correctly and the network sample -// should be made. -func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { - poll, exists := p.m[requestID] - if !exists { - poll.alpha = p.alpha - poll.polled = vdrs - p.m[requestID] = poll - - p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics - } - return !exists -} - -// Vote registers the connections response to a query for [id]. If there was no -// query, or the response has already be registered, nothing is performed. -func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, bool) { - p.log.Verbo("[polls.Vote] Vote: requestID: %d. validatorID: %s. Vote: %s", requestID, vdr, vote) - poll, exists := p.m[requestID] - if !exists { - return ids.Bag{}, false - } - poll.Vote(vote, vdr) - if poll.Finished() { - delete(p.m, requestID) - p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics - return poll.votes, true - } - p.m[requestID] = poll - return ids.Bag{}, false -} - -// CancelVote registers the connections failure to respond to a query for [id]. -func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) { - p.log.Verbo("CancelVote received. requestID: %d. validatorID: %s. 
Vote: %s", requestID, vdr) - poll, exists := p.m[requestID] - if !exists { - return ids.Bag{}, false - } - - poll.CancelVote(vdr) - if poll.Finished() { - delete(p.m, requestID) - p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics - return poll.votes, true - } - p.m[requestID] = poll - return ids.Bag{}, false -} - -func (p *polls) String() string { - sb := strings.Builder{} - - sb.WriteString(fmt.Sprintf("Current polls: (Size = %d)", len(p.m))) - for requestID, poll := range p.m { - sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll)) - } - - return sb.String() -} - -// poll represents the current state of a network poll for a block -type poll struct { - alpha int - votes ids.Bag - polled ids.ShortSet -} - -// Vote registers a vote for this poll -func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) } - -// Vote registers a vote for this poll -func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) { - if p.polled.Contains(vdr) { - p.polled.Remove(vdr) - p.votes.Add(vote) - } -} - -// Finished returns true if the poll has completed, with no more required -// responses -func (p poll) Finished() bool { - remaining := p.polled.Len() - received := p.votes.Len() - _, freq := p.votes.Mode() - return remaining == 0 || // All k nodes responded - freq >= p.alpha || // An alpha majority has returned - received+remaining < p.alpha // An alpha majority can never return -} - -func (p poll) String() string { - return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled) -} diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 0a89dc4..ab4a881 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/gecko/snow/choices" "github.com/ava-labs/gecko/snow/consensus/snowman" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/ava-labs/gecko/snow/engine/snowman/poll" "github.com/ava-labs/gecko/snow/events" 
"github.com/ava-labs/gecko/utils/formatting" "github.com/ava-labs/gecko/utils/wrappers" @@ -30,7 +31,7 @@ type Transitive struct { bootstrapper // track outstanding preference requests - polls polls + polls poll.Set // blocks that have outstanding get requests blkReqs common.Requests @@ -64,10 +65,12 @@ func (t *Transitive) Initialize(config Config) error { t.onFinished = t.finishBootstrapping - t.polls.log = config.Context.Log - t.polls.numPolls = t.numPolls - t.polls.alpha = t.Params.Alpha - t.polls.m = make(map[uint32]poll) + factory := poll.NewEarlyTermNoTraversalFactory(int(config.Params.Alpha)) + t.polls = poll.NewSet(factory, + config.Context.Log, + config.Params.Namespace, + config.Params.Metrics, + ) return t.bootstrapper.Initialize(config.BootstrapConfig) } @@ -409,7 +412,7 @@ func (t *Transitive) repoll() { // propagate the most likely branch as quickly as possible prefID := t.Consensus.Preference() - for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ { + for i := t.polls.Len(); i < t.Params.ConcurrentRepolls; i++ { t.pullSample(prefID) } } diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 8c5f9d1..e9571af 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -810,13 +810,13 @@ func TestVoteCanceling(t *testing.T) { te.insert(blk) - if len(te.polls.m) != 1 { + if te.polls.Len() != 1 { t.Fatalf("Shouldn't have finished blocking issue") } te.QueryFailed(vdr0.ID(), *queryRequestID) - if len(te.polls.m) != 1 { + if te.polls.Len() != 1 { t.Fatalf("Shouldn't have finished blocking issue") } diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index bd15831..5a3ca87 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -32,7 +32,7 @@ func (v *voter) Update() { results := ids.Bag{} finished := false if v.response.IsZero() { - results, finished = v.t.polls.CancelVote(v.requestID, v.vdr) + results, finished = 
v.t.polls.Drop(v.requestID, v.vdr) } else { results, finished = v.t.polls.Vote(v.requestID, v.vdr, v.response) }