Merge branch 'proposal-slot-scaling'

This commit is contained in:
Jim McDonald 2020-11-14 12:26:53 +00:00
commit 9087add881
No known key found for this signature in database
GPG Key ID: 89CEB61B2AD2A5E7
12 changed files with 271 additions and 27 deletions

View File

@ -1,4 +1,5 @@
Development
- beacon block proposal strategy now scales per-node scores based on the distance between the slot and its parent
- add a default process concurrency for strategies
- fix race condition in "first" beacon block proposal strategy
- tidy up trace logging for scheduler

2
go.mod
View File

@ -46,3 +46,5 @@ require (
google.golang.org/grpc v1.33.2
gotest.tools v2.2.0+incompatible
)
replace github.com/attestantio/go-eth2-client => ../go-eth2-client

1
go.sum
View File

@ -68,6 +68,7 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/attestantio/dirk v0.9.1 h1:oof1Xm0uI4a2T9vhQB+f3Wjlngd2rnfsKi8aj1wqNh0=
github.com/attestantio/dirk v0.9.1/go.mod h1:oWsyIb/OXdx9pvDQqS3hdFBB1eFaYnrNjuvLtVwo69w=
github.com/attestantio/dirk v0.9.3 h1:hJj/X63n7UV+DseKlR8Kjs+zLYtf+2Alqk9A6nI8mUg=
github.com/attestantio/dirk v0.9.3/go.mod h1:EfppeT+VjQXnE9Ti5/vxa6ptZJAN2vMXO6KZojvSOXA=
github.com/attestantio/go-eth2-client v0.6.9/go.mod h1:ODAZ4yS1YYYew/EsgGsVb/siNEoa505CrGsvlVFdkfo=
github.com/attestantio/go-eth2-client v0.6.10 h1:PMNBMLk6xfMEUqhaUnsI0/HZRrstZF18Gt6Dm5GelW4=

View File

@ -602,6 +602,7 @@ func selectBeaconBlockProposalProvider(ctx context.Context,
bestbeaconblockproposalstrategy.WithProcessConcurrency(viper.GetInt64("process-concurrency")),
bestbeaconblockproposalstrategy.WithLogLevel(logLevel(viper.GetString("strategies.beaconblockproposal.log-level"))),
bestbeaconblockproposalstrategy.WithBeaconBlockProposalProviders(beaconBlockProposalProviders),
bestbeaconblockproposalstrategy.WithSignedBeaconBlockProvider(eth2Client.(eth2client.SignedBeaconBlockProvider)),
)
if err != nil {
return nil, errors.Wrap(err, "failed to start best beacon block proposal strategy")

View File

@ -75,7 +75,7 @@ func (s *Service) Prepare(ctx context.Context, data interface{}) error {
if !ok {
return errors.New("passed invalid data structure")
}
log := log.With().Uint64("slot", uint64(duty.Slot())).Uint64("validator_index", uint64(duty.ValidatorIndex())).Logger()
log := log.With().Uint64("proposing_slot", uint64(duty.Slot())).Uint64("validator_index", uint64(duty.ValidatorIndex())).Logger()
log.Trace().Msg("Preparing")
// Fetch the validating account.
@ -114,7 +114,7 @@ func (s *Service) Propose(ctx context.Context, data interface{}) {
s.monitor.BeaconBlockProposalCompleted(started, "failed")
return
}
log := log.With().Uint64("slot", uint64(duty.Slot())).Uint64("validator_index", uint64(duty.ValidatorIndex())).Logger()
log := log.With().Uint64("proposing_slot", uint64(duty.Slot())).Uint64("validator_index", uint64(duty.ValidatorIndex())).Logger()
log.Trace().Msg("Proposing")
var graffiti []byte

View File

@ -16,6 +16,7 @@ package standard
import (
"context"
"fmt"
"time"
api "github.com/attestantio/go-eth2-client/api/v1"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
@ -98,6 +99,7 @@ func (s *Service) createAttesterJobs(ctx context.Context,
// AttestAndScheduleAggregate attests, then schedules aggregation jobs as required.
func (s *Service) AttestAndScheduleAggregate(ctx context.Context, data interface{}) {
started := time.Now()
duty, ok := data.(*attester.Duty)
if !ok {
log.Error().Msg("Passed invalid data")
@ -109,6 +111,7 @@ func (s *Service) AttestAndScheduleAggregate(ctx context.Context, data interface
log.Warn().Err(err).Msg("Failed to attest")
return
}
log.Trace().Dur("elapsed", time.Since(started)).Msg("Attested")
if len(attestations) == 0 || attestations[0].Data == nil {
log.Debug().Msg("No attestations; nothing to aggregate")
@ -125,7 +128,7 @@ func (s *Service) AttestAndScheduleAggregate(ctx context.Context, data interface
}
for _, attestation := range attestations {
log := log.With().Uint64("attestation_slot", uint64(attestation.Data.Slot)).Logger()
log := log.With().Uint64("attestation_slot", uint64(attestation.Data.Slot)).Uint64("committee_index", uint64(attestation.Data.Index)).Logger()
slotInfoMap, exists := subscriptionInfoMap[attestation.Data.Slot]
if !exists {
log.Debug().Msg("No slot info; not aggregating")

View File

@ -65,8 +65,12 @@ func (s *Service) createProposerJobs(ctx context.Context,
currentSlot := s.chainTimeService.CurrentSlot()
for _, duty := range duties {
// Do not schedule proposals for past slots (or the current slot if we've just started).
if duty.Slot() < currentSlot || firstRun && duty.Slot() == currentSlot {
log.Debug().Uint64("proposal_slot", uint64(duty.Slot())).Uint64("current_slot", uint64(currentSlot)).Msg("Proposal in the past; not scheduling")
if duty.Slot() < currentSlot {
log.Debug().Uint64("proposal_slot", uint64(duty.Slot())).Uint64("current_slot", uint64(currentSlot)).Msg("Proposal for a past slot; not scheduling")
continue
}
if firstRun && duty.Slot() == currentSlot {
log.Debug().Uint64("proposal_slot", uint64(duty.Slot())).Uint64("current_slot", uint64(currentSlot)).Msg("Proposal for the current slot and this is our first run; not scheduling")
continue
}
go func(duty *beaconblockproposer.Duty) {

View File

@ -15,6 +15,7 @@ package best
import (
"context"
"fmt"
"sync"
"time"
@ -56,8 +57,23 @@ func (s *Service) BeaconBlockProposal(ctx context.Context, slot spec.Slot, randa
log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained beacon block proposal")
cancel()
// Obtain the slot of the block to which the proposal refers.
// We use this to allow the scorer to score blocks with earlier parents lower.
var parentSlot spec.Slot
parentBlock, err := s.signedBeaconBlockProvider.SignedBeaconBlock(ctx, fmt.Sprintf("%#x", proposal.ParentRoot[:]))
switch {
case err != nil:
log.Warn().Err(err).Msg("Failed to obtain parent block")
parentSlot = proposal.Slot - 1
case parentBlock == nil:
log.Warn().Err(err).Msg("Failed to obtain parent block")
parentSlot = proposal.Slot - 1
default:
parentSlot = parentBlock.Message.Slot
}
mu.Lock()
score := scoreBeaconBlockProposal(ctx, name, proposal)
score := scoreBeaconBlockProposal(ctx, name, parentSlot, proposal)
if score > bestScore || bestProposal == nil {
bestScore = score
bestProposal = proposal

View File

@ -32,6 +32,7 @@ type parameters struct {
clientMonitor metrics.ClientMonitor
processConcurrency int64
beaconBlockProposalProviders map[string]eth2client.BeaconBlockProposalProvider
signedBeaconBlockProvider eth2client.SignedBeaconBlockProvider
timeout time.Duration
}
@ -53,6 +54,13 @@ func WithLogLevel(logLevel zerolog.Level) Parameter {
})
}
// WithTimeout sets the timeout for beacon block proposal requests.
func WithTimeout(timeout time.Duration) Parameter {
return parameterFunc(func(p *parameters) {
p.timeout = timeout
})
}
// WithClientMonitor sets the client monitor for the service.
func WithClientMonitor(monitor metrics.ClientMonitor) Parameter {
return parameterFunc(func(p *parameters) {
@ -74,10 +82,10 @@ func WithBeaconBlockProposalProviders(providers map[string]eth2client.BeaconBloc
})
}
// WithTimeout sets the timeout for beacon block proposal requests.
func WithTimeout(timeout time.Duration) Parameter {
// WithSignedBeaconBlockProvider sets the signed beacon block provider.
func WithSignedBeaconBlockProvider(provider eth2client.SignedBeaconBlockProvider) Parameter {
return parameterFunc(func(p *parameters) {
p.timeout = timeout
p.signedBeaconBlockProvider = provider
})
}
@ -107,6 +115,9 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
if len(parameters.beaconBlockProposalProviders) == 0 {
return nil, errors.New("no beacon block proposal providers specified")
}
if parameters.signedBeaconBlockProvider == nil {
return nil, errors.New("no signed beacon block provider specified")
}
return &parameters, nil
}

View File

@ -22,7 +22,7 @@ import (
// scoreBeaconBlockPropsal generates a score for a beacon block.
// The score is relative to the reward expected by proposing the block.
func scoreBeaconBlockProposal(ctx context.Context, name string, blockProposal *spec.BeaconBlock) float64 {
func scoreBeaconBlockProposal(ctx context.Context, name string, parentSlot spec.Slot, blockProposal *spec.BeaconBlock) float64 {
if blockProposal == nil {
return 0
}
@ -83,17 +83,28 @@ func scoreBeaconBlockProposal(ctx context.Context, name string, blockProposal *s
}
attesterSlashingScore := slashingWeight * float64(indicesSlashed)
// Scale scores by the distance between the proposal and parent slots.
scale := uint64(1)
if blockProposal.Slot <= parentSlot {
log.Warn().Uint64("slot", uint64(blockProposal.Slot)).Uint64("parent_slot", uint64(parentSlot)).Msg("Invalid parent slot for proposal")
scale = 32
} else {
scale = uint64(blockProposal.Slot - parentSlot)
}
log.Trace().
Uint64("slot", uint64(blockProposal.Slot)).
Uint64("parent_slot", uint64(parentSlot)).
Str("provider", name).
Float64("immediate_attestations", immediateAttestationScore).
Float64("attestations", attestationScore).
Float64("proposer_slashings", proposerSlashingScore).
Float64("attester_slashings", attesterSlashingScore).
Float64("total", attestationScore+proposerSlashingScore+attesterSlashingScore).
Uint64("scale", scale).
Float64("total", (attestationScore+proposerSlashingScore+attesterSlashingScore)/float64(scale)).
Msg("Scored block")
return attestationScore + proposerSlashingScore + attesterSlashingScore
return (attestationScore + proposerSlashingScore + attesterSlashingScore) / float64(scale)
}
// intersection returns a list of items common between the two sets.

View File

@ -15,6 +15,8 @@ package best
import (
"context"
"encoding/hex"
"strings"
"testing"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
@ -22,6 +24,14 @@ import (
"github.com/stretchr/testify/assert"
)
func _bytes(input string) []byte {
res, err := hex.DecodeString(strings.TrimPrefix(input, "0x"))
if err != nil {
panic(err)
}
return res
}
func aggregationBits(set uint64, total uint64) bitfield.Bitlist {
bits := bitfield.NewBitlist(total)
for i := uint64(0); i < set; i++ {
@ -40,19 +50,22 @@ func specificAggregationBits(set []uint64, total uint64) bitfield.Bitlist {
func TestScore(t *testing.T) {
tests := []struct {
name string
block *spec.BeaconBlock
score float64
err string
name string
block *spec.BeaconBlock
parentSlot uint64
score float64
err string
}{
{
name: "Nil",
score: 0,
name: "Nil",
parentSlot: 1,
score: 0,
},
{
name: "Empty",
block: &spec.BeaconBlock{},
score: 0,
name: "Empty",
block: &spec.BeaconBlock{},
parentSlot: 1,
score: 0,
},
{
name: "SingleAttestation",
@ -69,7 +82,26 @@ func TestScore(t *testing.T) {
},
},
},
score: 1,
parentSlot: 12344,
score: 1,
},
{
name: "SingleAttestationParentRootDistance2",
block: &spec.BeaconBlock{
Slot: 12345,
Body: &spec.BeaconBlockBody{
Attestations: []*spec.Attestation{
{
AggregationBits: aggregationBits(1, 128),
Data: &spec.AttestationData{
Slot: 12344,
},
},
},
},
},
parentSlot: 12343,
score: 0.5,
},
{
name: "SingleAttestationDistance2",
@ -86,7 +118,8 @@ func TestScore(t *testing.T) {
},
},
},
score: 0.875,
parentSlot: 12344,
score: 0.875,
},
{
name: "TwoAttestations",
@ -109,7 +142,8 @@ func TestScore(t *testing.T) {
},
},
},
score: 2.8125,
parentSlot: 12344,
score: 2.8125,
},
{
name: "AttesterSlashing",
@ -136,7 +170,8 @@ func TestScore(t *testing.T) {
},
},
},
score: 1450,
parentSlot: 12344,
score: 1450,
},
{
name: "DuplicateAttestations",
@ -159,13 +194,170 @@ func TestScore(t *testing.T) {
},
},
},
score: 4,
parentSlot: 12344,
score: 4,
},
{
name: "Full",
block: &spec.BeaconBlock{
Slot: 12345,
Body: &spec.BeaconBlockBody{
Attestations: []*spec.Attestation{
{
AggregationBits: aggregationBits(50, 128),
Data: &spec.AttestationData{
Slot: 12344,
},
},
},
AttesterSlashings: []*spec.AttesterSlashing{
{
Attestation1: &spec.IndexedAttestation{
AttestingIndices: []uint64{1, 2, 3},
},
Attestation2: &spec.IndexedAttestation{
AttestingIndices: []uint64{2, 3, 4},
},
},
},
ProposerSlashings: []*spec.ProposerSlashing{
{
Header1: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0101010101010101010101010101010101010101010101010101010101010101"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
Header2: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0404040404040404040404040404040404040404040404040404040404040404"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
},
},
},
},
parentSlot: 12344,
score: 2150,
},
{
name: "FullParentRootDistance2",
block: &spec.BeaconBlock{
Slot: 12345,
Body: &spec.BeaconBlockBody{
Attestations: []*spec.Attestation{
{
AggregationBits: aggregationBits(50, 128),
Data: &spec.AttestationData{
Slot: 12344,
},
},
},
AttesterSlashings: []*spec.AttesterSlashing{
{
Attestation1: &spec.IndexedAttestation{
AttestingIndices: []uint64{1, 2, 3},
},
Attestation2: &spec.IndexedAttestation{
AttestingIndices: []uint64{2, 3, 4},
},
},
},
ProposerSlashings: []*spec.ProposerSlashing{
{
Header1: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0101010101010101010101010101010101010101010101010101010101010101"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
Header2: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0404040404040404040404040404040404040404040404040404040404040404"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
},
},
},
},
parentSlot: 12343,
score: 1075,
},
{
name: "FullParentRootDistance4",
block: &spec.BeaconBlock{
Slot: 12345,
Body: &spec.BeaconBlockBody{
Attestations: []*spec.Attestation{
{
AggregationBits: aggregationBits(50, 128),
Data: &spec.AttestationData{
Slot: 12344,
},
},
},
AttesterSlashings: []*spec.AttesterSlashing{
{
Attestation1: &spec.IndexedAttestation{
AttestingIndices: []uint64{1, 2, 3},
},
Attestation2: &spec.IndexedAttestation{
AttestingIndices: []uint64{2, 3, 4},
},
},
},
ProposerSlashings: []*spec.ProposerSlashing{
{
Header1: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0101010101010101010101010101010101010101010101010101010101010101"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
Header2: &spec.SignedBeaconBlockHeader{
Message: &spec.BeaconBlockHeader{
Slot: 10,
ProposerIndex: 1,
ParentRoot: _bytes("0x0404040404040404040404040404040404040404040404040404040404040404"),
StateRoot: _bytes("0x0202020202020202020202020202020202020202020202020202020202020202"),
BodyRoot: _bytes("0x0303030303030303030303030303030303030303030303030303030303030303"),
},
Signature: _bytes("0x040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404040404"),
},
},
},
},
},
parentSlot: 12341,
score: 537.5,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
score := scoreBeaconBlockProposal(context.Background(), test.name, test.block)
score := scoreBeaconBlockProposal(context.Background(), test.name, test.parentSlot, test.block)
assert.Equal(t, test.score, score)
})
}

View File

@ -29,6 +29,7 @@ type Service struct {
clientMonitor metrics.ClientMonitor
processConcurrency int64
beaconBlockProposalProviders map[string]eth2client.BeaconBlockProposalProvider
signedBeaconBlockProvider eth2client.SignedBeaconBlockProvider
timeout time.Duration
}
@ -51,6 +52,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
s := &Service{
processConcurrency: parameters.processConcurrency,
beaconBlockProposalProviders: parameters.beaconBlockProposalProviders,
signedBeaconBlockProvider: parameters.signedBeaconBlockProvider,
timeout: parameters.timeout,
clientMonitor: parameters.clientMonitor,
}