Merge pull request #101 from ava-labs/moar-metrics

Added poll duration metrics
This commit is contained in:
Stephen Buttolph 2020-06-22 01:22:49 -04:00 committed by GitHub
commit bb6bd861d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1472 additions and 384 deletions

View File

@ -14,7 +14,7 @@ type metrics struct {
numBSVtx, numBSDroppedVtx,
numBSTx, numBSDroppedTx prometheus.Counter
numPolls, numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge
numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge
}
// Initialize implements the Engine interface
@ -61,12 +61,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Name: "av_bs_dropped_txs",
Help: "Number of dropped txs",
})
m.numPolls = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_polls",
Help: "Number of pending network polls",
})
m.numVtxRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
@ -107,9 +101,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
if err := registerer.Register(m.numBSDroppedTx); err != nil {
log.Error("Failed to register av_bs_dropped_txs statistics due to %s", err)
}
if err := registerer.Register(m.numPolls); err != nil {
log.Error("Failed to register av_polls statistics due to %s", err)
}
if err := registerer.Register(m.numVtxRequests); err != nil {
log.Error("Failed to register av_vtx_requests statistics due to %s", err)
}

View File

@ -0,0 +1,85 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
type earlyTermNoTraversalFactory struct {
alpha int
}
// NewEarlyTermNoTraversalFactory returns a factory that returns polls with
// early termination, without doing DAG traversals
func NewEarlyTermNoTraversalFactory(alpha int) Factory {
return &earlyTermNoTraversalFactory{alpha: alpha}
}
func (f *earlyTermNoTraversalFactory) New(vdrs ids.ShortSet) Poll {
return &earlyTermNoTraversalPoll{
polled: vdrs,
alpha: f.alpha,
}
}
// earlyTermNoTraversalPoll finishes when any remaining validators can't change
// the result of the poll. However, does not terminate tightly with this bound.
// It terminates as quickly as it can without performing any DAG traversals.
type earlyTermNoTraversalPoll struct {
votes ids.UniqueBag
polled ids.ShortSet
alpha int
}
// Vote registers a response for this poll
func (p *earlyTermNoTraversalPoll) Vote(vdr ids.ShortID, votes []ids.ID) {
if !p.polled.Contains(vdr) {
// if the validator wasn't polled or already responded to this poll, we
// should just drop the vote
return
}
// make sure that a validator can't respond multiple times
p.polled.Remove(vdr)
// track the votes the validator responded with
p.votes.Add(uint(p.polled.Len()), votes...)
}
// Finished returns true when all validators have voted
func (p *earlyTermNoTraversalPoll) Finished() bool {
// If there are no outstanding queries, the poll is finished
numPending := p.polled.Len()
if numPending == 0 {
return true
}
// If there are still enough pending responses to include another vertex,
// then the poll must wait for more responses
if numPending > p.alpha {
return false
}
// Ignore any vertex that has already received alpha votes. To safely skip
// DAG traversal, assume that all votes for vertices with less than alpha
// votes will be applied to a single shared ancestor. In this case, the poll
// can terminate early, iff there are not enough pending votes for this
// ancestor to receive alpha votes.
partialVotes := ids.BitSet(0)
for _, vote := range p.votes.List() {
if voters := p.votes.GetSet(vote); voters.Len() < p.alpha {
partialVotes.Union(voters)
}
}
return partialVotes.Len()+numPending < p.alpha
}
// Result returns the result of this poll
func (p *earlyTermNoTraversalPoll) Result() ids.UniqueBag { return p.votes }
func (p *earlyTermNoTraversalPoll) String() string {
return fmt.Sprintf("waiting on %s", p.polled)
}

View File

@ -0,0 +1,207 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
)
func TestEarlyTermNoTraversalResults(t *testing.T) {
alpha := 1
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
result := poll.Result()
if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if set := result.GetSet(vtxID); set.Len() != 1 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestEarlyTermNoTraversalString(t *testing.T) {
alpha := 2
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}"
if result := poll.String(); expected != result {
t.Fatalf("Poll should have returned %s but returned %s", expected, result)
}
}
func TestEarlyTermNoTraversalDropsDuplicatedVotes(t *testing.T) {
alpha := 2
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr1, votes)
if poll.Finished() {
t.Fatalf("Poll finished after getting a duplicated vote")
}
poll.Vote(vdr2, votes)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
}
func TestEarlyTermNoTraversalTerminatesEarly(t *testing.T) {
alpha := 3
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdr5 := ids.NewShortID([20]byte{5}) // k = 5
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
vdr3,
vdr4,
vdr5,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr2, votes)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr3, votes)
if !poll.Finished() {
t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices")
}
}
func TestEarlyTermNoTraversalForSharedAncestor(t *testing.T) {
alpha := 4
vtxA := ids.NewID([32]byte{1})
vtxB := ids.NewID([32]byte{2})
vtxC := ids.NewID([32]byte{3})
vtxD := ids.NewID([32]byte{4})
// If validators 1-3 vote for frontier vertices
// B, C, and D respectively, which all share the common ancestor
// A, then we cannot terminate early with alpha = k = 4
// If the final vote is cast for any of A, B, C, or D, then
// vertex A will have transitively received alpha = 4 votes
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
vdrs.Add(vdr2)
vdrs.Add(vdr3)
vdrs.Add(vdr4)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, []ids.ID{vtxB})
if poll.Finished() {
t.Fatalf("Poll finished early after receiving one vote")
}
poll.Vote(vdr2, []ids.ID{vtxC})
if poll.Finished() {
t.Fatalf("Poll finished early after receiving two votes")
}
poll.Vote(vdr3, []ids.ID{vtxD})
if poll.Finished() {
t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes")
}
poll.Vote(vdr4, []ids.ID{vtxA})
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving all outstanding votes")
}
}
func TestEarlyTermNoTraversalWithFastDrops(t *testing.T) {
alpha := 2
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3}) // k = 3
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
vdr3,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, nil)
if poll.Finished() {
t.Fatalf("Poll finished early after dropping one vote")
}
poll.Vote(vdr2, nil)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after dropping two votes")
}
}

View File

@ -0,0 +1,33 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
// Set is a collection of polls
type Set interface {
fmt.Stringer
Add(requestID uint32, vdrs ids.ShortSet) bool
Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.UniqueBag, bool)
Len() int
}
// Poll is an outstanding poll
type Poll interface {
fmt.Stringer
Vote(vdr ids.ShortID, votes []ids.ID)
Finished() bool
Result() ids.UniqueBag
}
// Factory creates a new Poll
type Factory interface {
New(vdrs ids.ShortSet) Poll
}

View File

@ -0,0 +1,52 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
type noEarlyTermFactory struct{}
// NewNoEarlyTermFactory returns a factory that returns polls with no early
// termination
func NewNoEarlyTermFactory() Factory { return noEarlyTermFactory{} }
func (noEarlyTermFactory) New(vdrs ids.ShortSet) Poll {
return &noEarlyTermPoll{polled: vdrs}
}
// noEarlyTermPoll finishes when all polled validators either respond to the
// query or a timeout occurs
type noEarlyTermPoll struct {
votes ids.UniqueBag
polled ids.ShortSet
}
// Vote registers a response for this poll
func (p *noEarlyTermPoll) Vote(vdr ids.ShortID, votes []ids.ID) {
if !p.polled.Contains(vdr) {
// if the validator wasn't polled or already responded to this poll, we
// should just drop the vote
return
}
// make sure that a validator can't respond multiple times
p.polled.Remove(vdr)
// track the votes the validator responded with
p.votes.Add(uint(p.polled.Len()), votes...)
}
// Finished returns true when all validators have voted
func (p *noEarlyTermPoll) Finished() bool { return p.polled.Len() == 0 }
// Result returns the result of this poll
func (p *noEarlyTermPoll) Result() ids.UniqueBag { return p.votes }
func (p *noEarlyTermPoll) String() string {
return fmt.Sprintf("waiting on %s", p.polled)
}

View File

@ -0,0 +1,91 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
)
func TestNoEarlyTermResults(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
result := poll.Result()
if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if set := result.GetSet(vtxID); set.Len() != 1 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestNoEarlyTermString(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}"
if result := poll.String(); expected != result {
t.Fatalf("Poll should have returned %s but returned %s", expected, result)
}
}
func TestNoEarlyTermDropsDuplicatedVotes(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, votes)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr1, votes)
if poll.Finished() {
t.Fatalf("Poll finished after getting a duplicated vote")
}
poll.Vote(vdr2, votes)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
}

View File

@ -0,0 +1,130 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
type poll struct {
Poll
start time.Time
}
type set struct {
log logging.Logger
numPolls prometheus.Gauge
durPolls prometheus.Histogram
factory Factory
polls map[uint32]poll
}
// NewSet returns a new empty set of polls
func NewSet(
factory Factory,
log logging.Logger,
namespace string,
registerer prometheus.Registerer,
) Set {
numPolls := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "polls",
Help: "Number of pending network polls",
})
if err := registerer.Register(numPolls); err != nil {
log.Error("failed to register polls statistics due to %s", err)
}
durPolls := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Name: "poll_duration",
Help: "Length of time the poll existed in milliseconds",
Buckets: timer.MillisecondsBuckets,
})
if err := registerer.Register(durPolls); err != nil {
log.Error("failed to register poll_duration statistics due to %s", err)
}
return &set{
log: log,
numPolls: numPolls,
durPolls: durPolls,
factory: factory,
polls: make(map[uint32]poll),
}
}
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (s *set) Add(requestID uint32, vdrs ids.ShortSet) bool {
if _, exists := s.polls[requestID]; exists {
s.log.Debug("dropping poll due to duplicated requestID: %d", requestID)
return false
}
s.log.Verbo("creating poll with requestID %d and validators %s",
requestID,
vdrs)
s.polls[requestID] = poll{
Poll: s.factory.New(vdrs), // create the new poll
start: time.Now(),
}
s.numPolls.Inc() // increase the metrics
return true
}
// Vote registers the connections response to a query for [id]. If there was no
// query, or the response has already be registered, nothing is performed.
func (s *set) Vote(
requestID uint32,
vdr ids.ShortID,
votes []ids.ID,
) (ids.UniqueBag, bool) {
poll, exists := s.polls[requestID]
if !exists {
s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d",
vdr,
requestID)
return nil, false
}
s.log.Verbo("processing vote from %s in the poll with requestID: %d with the votes %v",
vdr,
requestID,
votes)
poll.Vote(vdr, votes)
if !poll.Finished() {
return nil, false
}
s.log.Verbo("poll with requestID %d finished as %s", requestID, poll)
delete(s.polls, requestID) // remove the poll from the current set
s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds()))
s.numPolls.Dec() // decrease the metrics
return poll.Result(), true
}
// Len returns the number of outstanding polls
func (s *set) Len() int { return len(s.polls) }
func (s *set) String() string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("current polls: (Size = %d)", len(s.polls)))
for requestID, poll := range s.polls {
sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll))
}
return sb.String()
}

View File

@ -0,0 +1,97 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/prometheus/client_golang/prometheus"
)
func TestNewSetErrorOnMetrics(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{
Name: "polls",
}))
registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{
Name: "poll_duration",
}))
_ = NewSet(factory, log, namespace, registerer)
}
func TestCreateAndFinishPoll(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
s := NewSet(factory, log, namespace, registerer)
vtxID := ids.NewID([32]byte{1})
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
if s.Len() != 0 {
t.Fatalf("Shouldn't have any active polls yet")
} else if !s.Add(0, vdrs) {
t.Fatalf("Should have been able to add a new poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if s.Add(0, vdrs) {
t.Fatalf("Shouldn't have been able to add a duplicated poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if _, finished := s.Vote(1, vdr1, votes); finished {
t.Fatalf("Shouldn't have been able to finish a non-existant poll")
} else if _, finished := s.Vote(0, vdr1, votes); finished {
t.Fatalf("Shouldn't have been able to finish an ongoing poll")
} else if _, finished := s.Vote(0, vdr1, votes); finished {
t.Fatalf("Should have dropped a duplicated poll")
} else if result, finished := s.Vote(0, vdr2, votes); !finished {
t.Fatalf("Should have finished the")
} else if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if set := result.GetSet(vtxID); set.Len() != 2 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestSetString(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
s := NewSet(factory, log, namespace, registerer)
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
expected := "current polls: (Size = 1)\n" +
" 0: waiting on {6HgC8KRBEhXYbF4riJyJFLSHt37UNuRt}"
if !s.Add(0, vdrs) {
t.Fatalf("Should have been able to add a new poll")
} else if str := s.String(); expected != str {
t.Fatalf("Set return wrong string, Expected:\n%s\nReturned:\n%s",
expected,
str)
}
}

View File

@ -1,137 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
"fmt"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
)
// TODO: There is a conservative early termination case that doesn't require dag
// traversals we may want to implement. The algorithm would go as follows:
// Keep track of the number of response that reference an ID. If an ID gets >=
// alpha responses, then remove it from all responses and place it into a chit
// list. Remove all empty responses. If the number of responses + the number of
// pending responses is less than alpha, terminate the poll.
// In the synchronous + virtuous case, when everyone returns the same hash, the
// poll now terminates after receiving alpha responses.
// In the rogue case, it is possible that the poll doesn't terminate as quickly
// as possible, because IDs may have the alpha threshold but only when counting
// transitive votes. In this case, we may wait even if it is no longer possible
// for another ID to earn alpha votes.
// Because alpha is typically set close to k, this may not be performance
// critical. However, early termination may be performance critical with crashed
// nodes.
type polls struct {
log logging.Logger
numPolls prometheus.Gauge
alpha int
m map[uint32]poll
}
func newPolls(alpha int, log logging.Logger, numPolls prometheus.Gauge) polls {
return polls{
log: log,
numPolls: numPolls,
alpha: alpha,
m: make(map[uint32]poll),
}
}
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID]
if !exists {
poll.polled = vdrs
poll.alpha = p.alpha
p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
}
return !exists
}
// Vote registers the connections response to a query for [id]. If there was no
// query, or the response has already be registered, nothing is performed.
func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.UniqueBag, bool) {
p.log.Verbo("Vote. requestID: %d. validatorID: %s.", requestID, vdr)
poll, exists := p.m[requestID]
p.log.Verbo("Poll: %+v", poll)
if !exists {
return nil, false
}
poll.Vote(votes, vdr)
if poll.Finished() {
p.log.Verbo("Poll is finished")
delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
return poll.votes, true
}
p.m[requestID] = poll
return nil, false
}
func (p *polls) String() string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("Current polls: (Size = %d)", len(p.m)))
for requestID, poll := range p.m {
sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll))
}
return sb.String()
}
// poll represents the current state of a network poll for a vertex
type poll struct {
votes ids.UniqueBag
polled ids.ShortSet
alpha int
}
// Vote registers a vote for this poll
func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) {
if p.polled.Contains(vdr) {
p.polled.Remove(vdr)
p.votes.Add(uint(p.polled.Len()), votes...)
}
}
// Finished returns true if the poll has completed, with no more required
// responses
func (p poll) Finished() bool {
// If there are no outstanding queries, the poll is finished
numPending := p.polled.Len()
if numPending == 0 {
return true
}
// If there are still enough pending responses to include another vertex,
// then the poll must wait for more responses
if numPending > p.alpha {
return false
}
// Ignore any vertex that has already received alpha votes. To safely skip
// DAG traversal, assume that all votes for vertices with less than alpha
// votes will be applied to a single shared ancestor. In this case, the poll
// can terminate early, iff there are not enough pending votes for this
// ancestor to receive alpha votes.
partialVotes := ids.BitSet(0)
for _, vote := range p.votes.List() {
if voters := p.votes.GetSet(vote); voters.Len() < p.alpha {
partialVotes.Union(voters)
}
}
return partialVotes.Len()+numPending < p.alpha
}
func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) }

View File

@ -1,99 +0,0 @@
package avalanche
import (
"testing"
"github.com/ava-labs/gecko/ids"
)
func TestPollTerminatesEarlyVirtuousCase(t *testing.T) {
alpha := 3
vtxID := GenerateID()
votes := []ids.ID{vtxID}
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdr5 := ids.NewShortID([20]byte{5}) // k = 5
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
vdrs.Add(vdr2)
vdrs.Add(vdr3)
vdrs.Add(vdr4)
vdrs.Add(vdr5)
poll := poll{
votes: make(ids.UniqueBag),
polled: vdrs,
alpha: alpha,
}
poll.Vote(votes, vdr1)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(votes, vdr2)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(votes, vdr3)
if !poll.Finished() {
t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices")
}
}
func TestPollAccountsForSharedAncestor(t *testing.T) {
alpha := 4
vtxA := GenerateID()
vtxB := GenerateID()
vtxC := GenerateID()
vtxD := GenerateID()
// If validators 1-3 vote for frontier vertices
// B, C, and D respectively, which all share the common ancestor
// A, then we cannot terminate early with alpha = k = 4
// If the final vote is cast for any of A, B, C, or D, then
// vertex A will have transitively received alpha = 4 votes
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
vdrs.Add(vdr2)
vdrs.Add(vdr3)
vdrs.Add(vdr4)
poll := poll{
votes: make(ids.UniqueBag),
polled: vdrs,
alpha: alpha,
}
votes1 := []ids.ID{vtxB}
poll.Vote(votes1, vdr1)
if poll.Finished() {
t.Fatalf("Poll finished early after receiving one vote")
}
votes2 := []ids.ID{vtxC}
poll.Vote(votes2, vdr2)
if poll.Finished() {
t.Fatalf("Poll finished early after receiving two votes")
}
votes3 := []ids.ID{vtxD}
poll.Vote(votes3, vdr3)
if poll.Finished() {
t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes")
}
votes4 := []ids.ID{vtxA}
poll.Vote(votes4, vdr4)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving all outstanding votes")
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
"github.com/ava-labs/gecko/snow/engine/avalanche/poll"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/events"
"github.com/ava-labs/gecko/utils/formatting"
@ -31,7 +32,7 @@ type Transitive struct {
Config
bootstrapper
polls polls // track people I have asked for their preference
polls poll.Set // track people I have asked for their preference
// vtxReqs prevents asking validators for the same vertex
vtxReqs common.Requests
@ -57,7 +58,12 @@ func (t *Transitive) Initialize(config Config) error {
t.onFinished = t.finishBootstrapping
t.polls = newPolls(int(config.Params.Alpha), config.Context.Log, t.numPolls)
factory := poll.NewEarlyTermNoTraversalFactory(int(config.Params.Alpha))
t.polls = poll.NewSet(factory,
config.Context.Log,
config.Params.Namespace,
config.Params.Metrics,
)
return t.bootstrapper.Initialize(config.BootstrapConfig)
}
@ -309,7 +315,7 @@ func (t *Transitive) Notify(msg common.Message) error {
}
func (t *Transitive) repoll() error {
if len(t.polls.m) >= t.Params.ConcurrentRepolls || t.errs.Errored() {
if t.polls.Len() >= t.Params.ConcurrentRepolls || t.errs.Errored() {
return nil
}
@ -318,7 +324,7 @@ func (t *Transitive) repoll() error {
return err
}
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
for i := t.polls.Len(); i < t.Params.ConcurrentRepolls; i++ {
if err := t.batch(nil, false /*=force*/, true /*=empty*/); err != nil {
return err
}

View File

@ -13,7 +13,7 @@ type metrics struct {
numPendingRequests, numBlocked prometheus.Gauge
numBootstrapped, numDropped prometheus.Counter
numPolls, numBlkRequests, numBlockedBlk prometheus.Gauge
numBlkRequests, numBlockedBlk prometheus.Gauge
}
// Initialize implements the Engine interface
@ -42,12 +42,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Name: "sm_bs_dropped",
Help: "Number of dropped bootstrap blocks",
})
m.numPolls = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "sm_polls",
Help: "Number of pending network polls",
})
m.numBlkRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
@ -73,9 +67,6 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
if err := registerer.Register(m.numDropped); err != nil {
log.Error("Failed to register sm_bs_dropped statistics due to %s", err)
}
if err := registerer.Register(m.numPolls); err != nil {
log.Error("Failed to register sm_polls statistics due to %s", err)
}
if err := registerer.Register(m.numBlkRequests); err != nil {
log.Error("Failed to register sm_blk_requests statistics due to %s", err)
}

View File

@ -0,0 +1,73 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
type earlyTermNoTraversalFactory struct {
alpha int
}
// NewEarlyTermNoTraversalFactory returns a factory that returns polls with
// early termination, without doing DAG traversals
func NewEarlyTermNoTraversalFactory(alpha int) Factory {
return &earlyTermNoTraversalFactory{alpha: alpha}
}
func (f *earlyTermNoTraversalFactory) New(vdrs ids.ShortSet) Poll {
return &earlyTermNoTraversalPoll{
polled: vdrs,
alpha: f.alpha,
}
}
// earlyTermNoTraversalPoll finishes when any remaining validators can't change
// the result of the poll. However, does not terminate tightly with this bound.
// It terminates as quickly as it can without performing any DAG traversals.
type earlyTermNoTraversalPoll struct {
votes ids.Bag
polled ids.ShortSet
alpha int
}
// Vote registers a response for this poll
func (p *earlyTermNoTraversalPoll) Vote(vdr ids.ShortID, vote ids.ID) {
if !p.polled.Contains(vdr) {
// if the validator wasn't polled or already responded to this poll, we
// should just drop the vote
return
}
// make sure that a validator can't respond multiple times
p.polled.Remove(vdr)
// track the votes the validator responded with
p.votes.Add(vote)
}
// Drop any future response for this poll
func (p *earlyTermNoTraversalPoll) Drop(vdr ids.ShortID) {
p.polled.Remove(vdr)
}
// Finished returns true when all validators have voted
func (p *earlyTermNoTraversalPoll) Finished() bool {
remaining := p.polled.Len()
received := p.votes.Len()
_, freq := p.votes.Mode()
return remaining == 0 || // All k nodes responded
freq >= p.alpha || // An alpha majority has returned
received+remaining < p.alpha // An alpha majority can never return
}
// Result returns the result of this poll
func (p *earlyTermNoTraversalPoll) Result() ids.Bag { return p.votes }
func (p *earlyTermNoTraversalPoll) String() string {
return fmt.Sprintf("waiting on %s", p.polled)
}

View File

@ -0,0 +1,205 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
)
func TestEarlyTermNoTraversalResults(t *testing.T) {
alpha := 1
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
result := poll.Result()
if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if result.Count(vtxID) != 1 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestEarlyTermNoTraversalString(t *testing.T) {
alpha := 2
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}"
if result := poll.String(); expected != result {
t.Fatalf("Poll should have returned %s but returned %s", expected, result)
}
}
func TestEarlyTermNoTraversalDropsDuplicatedVotes(t *testing.T) {
alpha := 2
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr1, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after getting a duplicated vote")
}
poll.Vote(vdr2, vtxID)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
}
func TestEarlyTermNoTraversalTerminatesEarly(t *testing.T) {
alpha := 3
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdr5 := ids.NewShortID([20]byte{5}) // k = 5
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
vdr3,
vdr4,
vdr5,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr2, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr3, vtxID)
if !poll.Finished() {
t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices")
}
}
func TestEarlyTermNoTraversalForSharedAncestor(t *testing.T) {
alpha := 4
vtxA := ids.NewID([32]byte{1})
vtxB := ids.NewID([32]byte{2})
vtxC := ids.NewID([32]byte{3})
vtxD := ids.NewID([32]byte{4})
// If validators 1-3 vote for frontier vertices
// B, C, and D respectively, which all share the common ancestor
// A, then we cannot terminate early with alpha = k = 4
// If the final vote is cast for any of A, B, C, or D, then
// vertex A will have transitively received alpha = 4 votes
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3})
vdr4 := ids.NewShortID([20]byte{4})
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
vdr3,
vdr4,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxB)
if poll.Finished() {
t.Fatalf("Poll finished early after receiving one vote")
}
poll.Vote(vdr2, vtxC)
if poll.Finished() {
t.Fatalf("Poll finished early after receiving two votes")
}
poll.Vote(vdr3, vtxD)
if poll.Finished() {
t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes")
}
poll.Vote(vdr4, vtxA)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving all outstanding votes")
}
}
func TestEarlyTermNoTraversalWithFastDrops(t *testing.T) {
alpha := 2
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2})
vdr3 := ids.NewShortID([20]byte{3}) // k = 3
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
vdr3,
)
factory := NewEarlyTermNoTraversalFactory(alpha)
poll := factory.New(vdrs)
poll.Drop(vdr1)
if poll.Finished() {
t.Fatalf("Poll finished early after dropping one vote")
}
poll.Drop(vdr2)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after dropping two votes")
}
}

View File

@ -0,0 +1,35 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
// Set is a collection of polls
type Set interface {
fmt.Stringer
Add(requestID uint32, vdrs ids.ShortSet) bool
Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, bool)
Drop(requestID uint32, vdr ids.ShortID) (ids.Bag, bool)
Len() int
}
// Poll is an outstanding poll
type Poll interface {
fmt.Stringer
Vote(vdr ids.ShortID, vote ids.ID)
Drop(vdr ids.ShortID)
Finished() bool
Result() ids.Bag
}
// Factory creates a new Poll
type Factory interface {
New(vdrs ids.ShortSet) Poll
}

View File

@ -0,0 +1,55 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"github.com/ava-labs/gecko/ids"
)
type noEarlyTermFactory struct{}
// NewNoEarlyTermFactory returns a factory that returns polls with no early
// termination
func NewNoEarlyTermFactory() Factory { return noEarlyTermFactory{} }
func (noEarlyTermFactory) New(vdrs ids.ShortSet) Poll {
return &noEarlyTermPoll{polled: vdrs}
}
// noEarlyTermPoll finishes when all polled validators either respond to the
// query or a timeout occurs
type noEarlyTermPoll struct {
votes ids.Bag
polled ids.ShortSet
}
// Vote registers a response for this poll
func (p *noEarlyTermPoll) Vote(vdr ids.ShortID, vote ids.ID) {
if !p.polled.Contains(vdr) {
// if the validator wasn't polled or already responded to this poll, we
// should just drop the vote
return
}
// make sure that a validator can't respond multiple times
p.polled.Remove(vdr)
// track the votes the validator responded with
p.votes.Add(vote)
}
// Drop any future response for this poll
func (p *noEarlyTermPoll) Drop(vdr ids.ShortID) { p.polled.Remove(vdr) }
// Finished returns true when all validators have voted
func (p *noEarlyTermPoll) Finished() bool { return p.polled.Len() == 0 }
// Result returns the result of this poll
func (p *noEarlyTermPoll) Result() ids.Bag { return p.votes }
func (p *noEarlyTermPoll) String() string {
return fmt.Sprintf("waiting on %s", p.polled)
}

View File

@ -0,0 +1,92 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
)
func TestNoEarlyTermResults(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
result := poll.Result()
if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if result.Count(vtxID) != 1 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestNoEarlyTermString(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
expected := "waiting on {BaMPFdqMUQ46BV8iRcwbVfsam55kMqcp}"
if result := poll.String(); expected != result {
t.Fatalf("Poll should have returned %s but returned %s", expected, result)
}
}
func TestNoEarlyTermDropsDuplicatedVotes(t *testing.T) {
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
factory := NewNoEarlyTermFactory()
poll := factory.New(vdrs)
poll.Vote(vdr1, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after less than alpha votes")
}
poll.Vote(vdr1, vtxID)
if poll.Finished() {
t.Fatalf("Poll finished after getting a duplicated vote")
}
poll.Drop(vdr1)
if poll.Finished() {
t.Fatalf("Poll finished after getting a duplicated vote")
}
poll.Vote(vdr2, vtxID)
if !poll.Finished() {
t.Fatalf("Poll did not terminate after receiving k votes")
}
}

View File

@ -0,0 +1,158 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"fmt"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
)
type poll struct {
Poll
start time.Time
}
type set struct {
log logging.Logger
numPolls prometheus.Gauge
durPolls prometheus.Histogram
factory Factory
polls map[uint32]poll
}
// NewSet returns a new empty set of polls
func NewSet(
factory Factory,
log logging.Logger,
namespace string,
registerer prometheus.Registerer,
) Set {
numPolls := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "polls",
Help: "Number of pending network polls",
})
if err := registerer.Register(numPolls); err != nil {
log.Error("failed to register polls statistics due to %s", err)
}
durPolls := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Name: "poll_duration",
Help: "Length of time the poll existed in milliseconds",
Buckets: timer.MillisecondsBuckets,
})
if err := registerer.Register(durPolls); err != nil {
log.Error("failed to register poll_duration statistics due to %s", err)
}
return &set{
log: log,
numPolls: numPolls,
durPolls: durPolls,
factory: factory,
polls: make(map[uint32]poll),
}
}
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (s *set) Add(requestID uint32, vdrs ids.ShortSet) bool {
if _, exists := s.polls[requestID]; exists {
s.log.Debug("dropping poll due to duplicated requestID: %d", requestID)
return false
}
s.log.Verbo("creating poll with requestID %d and validators %s",
requestID,
vdrs)
s.polls[requestID] = poll{
Poll: s.factory.New(vdrs), // create the new poll
start: time.Now(),
}
s.numPolls.Inc() // increase the metrics
return true
}
// Vote registers the connections response to a query for [id]. If there was no
// query, or the response has already be registered, nothing is performed.
func (s *set) Vote(
requestID uint32,
vdr ids.ShortID,
vote ids.ID,
) (ids.Bag, bool) {
poll, exists := s.polls[requestID]
if !exists {
s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d",
vdr,
requestID)
return ids.Bag{}, false
}
s.log.Verbo("processing vote from %s in the poll with requestID: %d with the vote %s",
vdr,
requestID,
vote)
poll.Vote(vdr, vote)
if !poll.Finished() {
return ids.Bag{}, false
}
s.log.Verbo("poll with requestID %d finished as %s", requestID, poll)
delete(s.polls, requestID) // remove the poll from the current set
s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds()))
s.numPolls.Dec() // decrease the metrics
return poll.Result(), true
}
// Drop registers the connections response to a query for [id]. If there was no
// query, or the response has already be registered, nothing is performed.
func (s *set) Drop(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) {
poll, exists := s.polls[requestID]
if !exists {
s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d",
vdr,
requestID)
return ids.Bag{}, false
}
s.log.Verbo("processing dropped vote from %s in the poll with requestID: %d",
vdr,
requestID)
poll.Drop(vdr)
if !poll.Finished() {
return ids.Bag{}, false
}
s.log.Verbo("poll with requestID %d finished as %s", requestID, poll)
delete(s.polls, requestID) // remove the poll from the current set
s.durPolls.Observe(float64(time.Now().Sub(poll.start).Milliseconds()))
s.numPolls.Dec() // decrease the metrics
return poll.Result(), true
}
// Len returns the number of outstanding polls
func (s *set) Len() int { return len(s.polls) }
func (s *set) String() string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("current polls: (Size = %d)", len(s.polls)))
for requestID, poll := range s.polls {
sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll))
}
return sb.String()
}

View File

@ -0,0 +1,135 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
"testing"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/prometheus/client_golang/prometheus"
)
func TestNewSetErrorOnMetrics(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{
Name: "polls",
}))
registerer.Register(prometheus.NewCounter(prometheus.CounterOpts{
Name: "poll_duration",
}))
_ = NewSet(factory, log, namespace, registerer)
}
func TestCreateAndFinishSuccessfulPoll(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
s := NewSet(factory, log, namespace, registerer)
vtxID := ids.NewID([32]byte{1})
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
if s.Len() != 0 {
t.Fatalf("Shouldn't have any active polls yet")
} else if !s.Add(0, vdrs) {
t.Fatalf("Should have been able to add a new poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if s.Add(0, vdrs) {
t.Fatalf("Shouldn't have been able to add a duplicated poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if _, finished := s.Vote(1, vdr1, vtxID); finished {
t.Fatalf("Shouldn't have been able to finish a non-existant poll")
} else if _, finished := s.Vote(0, vdr1, vtxID); finished {
t.Fatalf("Shouldn't have been able to finish an ongoing poll")
} else if _, finished := s.Vote(0, vdr1, vtxID); finished {
t.Fatalf("Should have dropped a duplicated poll")
} else if result, finished := s.Vote(0, vdr2, vtxID); !finished {
t.Fatalf("Should have finished the")
} else if list := result.List(); len(list) != 1 {
t.Fatalf("Wrong number of vertices returned")
} else if retVtxID := list[0]; !retVtxID.Equals(vtxID) {
t.Fatalf("Wrong vertex returned")
} else if result.Count(vtxID) != 2 {
t.Fatalf("Wrong number of votes returned")
}
}
func TestCreateAndFinishFailedPoll(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
s := NewSet(factory, log, namespace, registerer)
vdr1 := ids.NewShortID([20]byte{1})
vdr2 := ids.NewShortID([20]byte{2}) // k = 2
vdrs := ids.ShortSet{}
vdrs.Add(
vdr1,
vdr2,
)
if s.Len() != 0 {
t.Fatalf("Shouldn't have any active polls yet")
} else if !s.Add(0, vdrs) {
t.Fatalf("Should have been able to add a new poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if s.Add(0, vdrs) {
t.Fatalf("Shouldn't have been able to add a duplicated poll")
} else if s.Len() != 1 {
t.Fatalf("Should only have one active poll")
} else if _, finished := s.Drop(1, vdr1); finished {
t.Fatalf("Shouldn't have been able to finish a non-existant poll")
} else if _, finished := s.Drop(0, vdr1); finished {
t.Fatalf("Shouldn't have been able to finish an ongoing poll")
} else if _, finished := s.Drop(0, vdr1); finished {
t.Fatalf("Should have dropped a duplicated poll")
} else if result, finished := s.Drop(0, vdr2); !finished {
t.Fatalf("Should have finished the")
} else if list := result.List(); len(list) != 0 {
t.Fatalf("Wrong number of vertices returned")
}
}
func TestSetString(t *testing.T) {
factory := NewNoEarlyTermFactory()
log := logging.NoLog{}
namespace := ""
registerer := prometheus.NewRegistry()
s := NewSet(factory, log, namespace, registerer)
vdr1 := ids.NewShortID([20]byte{1}) // k = 1
vdrs := ids.ShortSet{}
vdrs.Add(vdr1)
expected := "current polls: (Size = 1)\n" +
" 0: waiting on {6HgC8KRBEhXYbF4riJyJFLSHt37UNuRt}"
if !s.Add(0, vdrs) {
t.Fatalf("Should have been able to add a new poll")
} else if str := s.String(); expected != str {
t.Fatalf("Set return wrong string, Expected:\n%s\nReturned:\n%s",
expected,
str)
}
}

View File

@ -1,115 +0,0 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package snowman
import (
"fmt"
"strings"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/logging"
"github.com/prometheus/client_golang/prometheus"
)
type polls struct {
log logging.Logger
numPolls prometheus.Gauge
alpha int
m map[uint32]poll
}
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID]
if !exists {
poll.alpha = p.alpha
poll.polled = vdrs
p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
}
return !exists
}
// Vote registers the connections response to a query for [id]. If there was no
// query, or the response has already be registered, nothing is performed.
func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, bool) {
p.log.Verbo("[polls.Vote] Vote: requestID: %d. validatorID: %s. Vote: %s", requestID, vdr, vote)
poll, exists := p.m[requestID]
if !exists {
return ids.Bag{}, false
}
poll.Vote(vote, vdr)
if poll.Finished() {
delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
return poll.votes, true
}
p.m[requestID] = poll
return ids.Bag{}, false
}
// CancelVote registers the connections failure to respond to a query for [id].
func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) {
p.log.Verbo("CancelVote received. requestID: %d. validatorID: %s. Vote: %s", requestID, vdr)
poll, exists := p.m[requestID]
if !exists {
return ids.Bag{}, false
}
poll.CancelVote(vdr)
if poll.Finished() {
delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
return poll.votes, true
}
p.m[requestID] = poll
return ids.Bag{}, false
}
func (p *polls) String() string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("Current polls: (Size = %d)", len(p.m)))
for requestID, poll := range p.m {
sb.WriteString(fmt.Sprintf("\n %d: %s", requestID, poll))
}
return sb.String()
}
// poll represents the current state of a network poll for a block
type poll struct {
alpha int
votes ids.Bag
polled ids.ShortSet
}
// Vote registers a vote for this poll
func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) }
// Vote registers a vote for this poll
func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) {
if p.polled.Contains(vdr) {
p.polled.Remove(vdr)
p.votes.Add(vote)
}
}
// Finished returns true if the poll has completed, with no more required
// responses
func (p poll) Finished() bool {
remaining := p.polled.Len()
received := p.votes.Len()
_, freq := p.votes.Mode()
return remaining == 0 || // All k nodes responded
freq >= p.alpha || // An alpha majority has returned
received+remaining < p.alpha // An alpha majority can never return
}
func (p poll) String() string {
return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/snowman"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/engine/snowman/poll"
"github.com/ava-labs/gecko/snow/events"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/wrappers"
@ -30,7 +31,7 @@ type Transitive struct {
bootstrapper
// track outstanding preference requests
polls polls
polls poll.Set
// blocks that have outstanding get requests
blkReqs common.Requests
@ -64,10 +65,12 @@ func (t *Transitive) Initialize(config Config) error {
t.onFinished = t.finishBootstrapping
t.polls.log = config.Context.Log
t.polls.numPolls = t.numPolls
t.polls.alpha = t.Params.Alpha
t.polls.m = make(map[uint32]poll)
factory := poll.NewEarlyTermNoTraversalFactory(int(config.Params.Alpha))
t.polls = poll.NewSet(factory,
config.Context.Log,
config.Params.Namespace,
config.Params.Metrics,
)
return t.bootstrapper.Initialize(config.BootstrapConfig)
}
@ -409,7 +412,7 @@ func (t *Transitive) repoll() {
// propagate the most likely branch as quickly as possible
prefID := t.Consensus.Preference()
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
for i := t.polls.Len(); i < t.Params.ConcurrentRepolls; i++ {
t.pullSample(prefID)
}
}

View File

@ -810,13 +810,13 @@ func TestVoteCanceling(t *testing.T) {
te.insert(blk)
if len(te.polls.m) != 1 {
if te.polls.Len() != 1 {
t.Fatalf("Shouldn't have finished blocking issue")
}
te.QueryFailed(vdr0.ID(), *queryRequestID)
if len(te.polls.m) != 1 {
if te.polls.Len() != 1 {
t.Fatalf("Shouldn't have finished blocking issue")
}

View File

@ -32,7 +32,7 @@ func (v *voter) Update() {
results := ids.Bag{}
finished := false
if v.response.IsZero() {
results, finished = v.t.polls.CancelVote(v.requestID, v.vdr)
results, finished = v.t.polls.Drop(v.requestID, v.vdr)
} else {
results, finished = v.t.polls.Vote(v.requestID, v.vdr, v.response)
}