Merge pull request #34 from bb-2/concurrent_repolling

Add ConcurrentRepolls to make the polling throughput configurable
This commit is contained in:
Stephen Buttolph 2020-03-30 20:34:54 -04:00 committed by GitHub
commit e0e11b939f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 160 additions and 105 deletions

View File

@ -93,6 +93,7 @@ func init() {
fs.IntVar(&Config.ConsensusParams.BetaRogue, "snow-rogue-commit-threshold", 30, "Beta value to use for rogue transactions")
fs.IntVar(&Config.ConsensusParams.Parents, "snow-avalanche-num-parents", 5, "Number of vertexes for reference from each new vertex")
fs.IntVar(&Config.ConsensusParams.BatchSize, "snow-avalanche-batch-size", 30, "Number of operations to batch in each new vertex")
fs.IntVar(&Config.ConsensusParams.ConcurrentRepolls, "snow-concurrent-repolls", 1, "Minimum number of concurrent polls for finalizing consensus")
// Enable/Disable APIs:
fs.BoolVar(&Config.AdminAPIEnabled, "api-admin-enabled", true, "If true, this node exposes the Admin API")

View File

@ -12,10 +12,11 @@ import (
func TestParametersValid(t *testing.T) {
p := Parameters{
Parameters: snowball.Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -29,10 +30,11 @@ func TestParametersValid(t *testing.T) {
func TestParametersInvalidParents(t *testing.T) {
p := Parameters{
Parameters: snowball.Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
},
Parents: 1,
BatchSize: 1,
@ -46,10 +48,11 @@ func TestParametersInvalidParents(t *testing.T) {
func TestParametersInvalidBatchSize(t *testing.T) {
p := Parameters{
Parameters: snowball.Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 0,

View File

@ -27,11 +27,12 @@ func TestTopologicalTxIssued(t *testing.T) { TxIssuedTest(t, TopologicalFactory{
func TestAvalancheVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -106,11 +107,12 @@ func TestAvalancheVoting(t *testing.T) {
func TestAvalancheTransitiveVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -199,11 +201,12 @@ func TestAvalancheTransitiveVoting(t *testing.T) {
func TestAvalancheSplitVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -262,11 +265,12 @@ func TestAvalancheSplitVoting(t *testing.T) {
func TestAvalancheTransitiveRejection(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -363,11 +367,12 @@ func TestAvalancheTransitiveRejection(t *testing.T) {
func TestAvalancheVirtuous(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -484,11 +489,12 @@ func TestAvalancheVirtuous(t *testing.T) {
func TestAvalancheIsVirtuous(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -567,11 +573,12 @@ func TestAvalancheIsVirtuous(t *testing.T) {
func TestAvalancheQuiesce(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,
@ -660,11 +667,12 @@ func TestAvalancheQuiesce(t *testing.T) {
func TestAvalancheOrphans(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: math.MaxInt32,
BetaRogue: math.MaxInt32,
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: math.MaxInt32,
BetaRogue: math.MaxInt32,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,

View File

@ -22,7 +22,7 @@ func ParamsTest(t *testing.T, factory Factory) {
params := Parameters{
Metrics: prometheus.NewRegistry(),
K: 2, Alpha: 2, BetaVirtuous: 1, BetaRogue: 2,
K: 2, Alpha: 2, BetaVirtuous: 1, BetaRogue: 2, ConcurrentRepolls: 1,
}
sb.Initialize(params, Red)
@ -34,5 +34,7 @@ func ParamsTest(t *testing.T, factory Factory) {
t.Fatalf("Wrong Beta1 parameter")
} else if p.BetaRogue != params.BetaRogue {
t.Fatalf("Wrong Beta2 parameter")
} else if p.ConcurrentRepolls != params.ConcurrentRepolls {
t.Fatalf("Wrong Repoll parameter")
}
}

View File

@ -11,9 +11,9 @@ import (
// Parameters required for snowball consensus
type Parameters struct {
Namespace string
Metrics prometheus.Registerer
K, Alpha, BetaVirtuous, BetaRogue int
Namespace string
Metrics prometheus.Registerer
K, Alpha, BetaVirtuous, BetaRogue, ConcurrentRepolls int
}
// Valid returns nil if the parameters describe a valid initialization.
@ -27,6 +27,10 @@ func (p Parameters) Valid() error {
return fmt.Errorf("BetaVirtuous = %d: Fails the condition that: 0 < BetaVirtuous", p.BetaVirtuous)
case p.BetaRogue < p.BetaVirtuous:
return fmt.Errorf("BetaVirtuous = %d, BetaRogue = %d: Fails the condition that: BetaVirtuous <= BetaRogue", p.BetaVirtuous, p.BetaRogue)
case p.ConcurrentRepolls <= 0:
return fmt.Errorf("ConcurrentRepolls = %d: Fails the condition that: 0 < ConcurrentRepolls", p.ConcurrentRepolls)
case p.ConcurrentRepolls > p.BetaRogue:
return fmt.Errorf("ConcurrentRepolls = %d, BetaRogue = %d: Fails the condition that: ConcurrentRepolls <= BetaRogue", p.ConcurrentRepolls, p.BetaRogue)
default:
return nil
}

View File

@ -9,10 +9,11 @@ import (
func TestParametersValid(t *testing.T) {
p := Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
}
if err := p.Valid(); err != nil {
@ -22,10 +23,11 @@ func TestParametersValid(t *testing.T) {
func TestParametersInvalidK(t *testing.T) {
p := Parameters{
K: 0,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
K: 0,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
}
if err := p.Valid(); err == nil {
@ -35,10 +37,11 @@ func TestParametersInvalidK(t *testing.T) {
func TestParametersInvalidAlpha(t *testing.T) {
p := Parameters{
K: 1,
Alpha: 0,
BetaVirtuous: 1,
BetaRogue: 1,
K: 1,
Alpha: 0,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 1,
}
if err := p.Valid(); err == nil {
@ -48,10 +51,11 @@ func TestParametersInvalidAlpha(t *testing.T) {
func TestParametersInvalidBetaVirtuous(t *testing.T) {
p := Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 0,
BetaRogue: 1,
K: 1,
Alpha: 1,
BetaVirtuous: 0,
BetaRogue: 1,
ConcurrentRepolls: 1,
}
if err := p.Valid(); err == nil {
@ -61,13 +65,29 @@ func TestParametersInvalidBetaVirtuous(t *testing.T) {
func TestParametersInvalidBetaRogue(t *testing.T) {
p := Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 0,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 0,
ConcurrentRepolls: 1,
}
if err := p.Valid(); err == nil {
t.Fatalf("Should have failed due to invalid beta rogue")
}
}
func TestParametersInvalidConcurrentRepolls(t *testing.T) {
p := Parameters{
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 1,
ConcurrentRepolls: 2,
}
if err := p.Valid(); err == nil {
t.Fatalf("Should have failed due to invalid concurrent repolls")
}
}

View File

@ -26,11 +26,12 @@ func DefaultConfig() Config {
},
Params: avalanche.Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,

View File

@ -65,8 +65,10 @@ func (i *issuer) Update() {
}
i.t.RequestID++
polled := false
if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) {
i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes())
polled = true
} else if numVdrs < p.K {
i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID)
}
@ -75,6 +77,10 @@ func (i *issuer) Update() {
for _, tx := range i.vtx.Txs() {
i.t.txBlocked.Fulfill(tx.ID())
}
if polled && len(i.t.polls.m) < i.t.Params.ConcurrentRepolls {
i.t.repoll()
}
}
type vtxIssuer struct{ i *issuer }

View File

@ -340,11 +340,12 @@ func TestEngineMultipleQuery(t *testing.T) {
config.Params = avalanche.Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Parents: 2,
BatchSize: 1,

View File

@ -58,7 +58,7 @@ func (v *voter) Update() {
v.t.Config.Context.Log.Verbo("Avalanche engine can't quiesce")
if len(v.t.polls.m) == 0 {
if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls {
v.t.repoll()
}
}

View File

@ -23,10 +23,11 @@ func DefaultConfig() Config {
},
Params: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 2,
K: 1,
Alpha: 1,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
},
Consensus: &snowman.Topological{},
}

View File

@ -305,7 +305,7 @@ func (t *Transitive) pullSample(blkID ids.ID) {
}
}
func (t *Transitive) pushSample(blk snowman.Block) {
func (t *Transitive) pushSample(blk snowman.Block) bool {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K)
@ -315,11 +315,14 @@ func (t *Transitive) pushSample(blk snowman.Block) {
}
t.RequestID++
queryIssued := false
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blk.ID(), blk.Bytes())
queryIssued = true
} else if numVdrs < p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blk.ID())
}
return queryIssued
}
func (t *Transitive) deliver(blk snowman.Block) {
@ -338,9 +341,8 @@ func (t *Transitive) deliver(blk snowman.Block) {
}
t.Config.Context.Log.Verbo("Adding block to consensus: %s", blkID)
t.Consensus.Add(blk)
t.pushSample(blk)
polled := t.pushSample(blk)
added := []snowman.Block{}
dropped := []snowman.Block{}
@ -373,6 +375,10 @@ func (t *Transitive) deliver(blk snowman.Block) {
t.blocked.Abandon(blkID)
}
if polled && len(t.polls.m) < t.Params.ConcurrentRepolls {
t.repoll()
}
// Tracks performance statistics
t.numBlkRequests.Set(float64(t.blkReqs.Len()))
t.numBlockedBlk.Set(float64(t.pending.Len()))

View File

@ -304,11 +304,12 @@ func TestEngineMultipleQuery(t *testing.T) {
config := DefaultConfig()
config.Params = snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
}
vdr0 := validators.GenerateRandomValidator(1)
@ -672,11 +673,12 @@ func TestVoteCanceling(t *testing.T) {
config := DefaultConfig()
config.Params = snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
ConcurrentRepolls: 1,
}
vdr0 := validators.GenerateRandomValidator(1)

View File

@ -53,7 +53,7 @@ func (v *voter) Update() {
v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce")
if len(v.t.polls.m) == 0 {
if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls {
v.t.repoll()
}
}