From 2a17229b39286de643040f9e077055aa0ba3e086 Mon Sep 17 00:00:00 2001 From: Jim McDonald Date: Sun, 1 Aug 2021 15:26:22 +0100 Subject: [PATCH] Tidy ups for sync committee scheduling. --- main.go | 1 + mock/eth2client.go | 24 ++++++ services/controller/standard/events.go | 39 ++++++---- services/controller/standard/parameters.go | 11 +++ services/controller/standard/service.go | 74 +++++++++++++++++-- services/controller/standard/service_test.go | 41 ++++++++++ .../standard/synccommitteemessenger.go | 44 +++++++---- .../standard/service.go | 2 +- .../standard/service.go | 35 +-------- 9 files changed, 204 insertions(+), 67 deletions(-) diff --git a/main.go b/main.go index a1a441c..14d17bb 100644 --- a/main.go +++ b/main.go @@ -464,6 +464,7 @@ func startServices(ctx context.Context, majordomo majordomo.Service) error { standardcontroller.WithLogLevel(logLevel(viper.GetString("controller.log-level"))), standardcontroller.WithMonitor(monitor.(metrics.ControllerMonitor)), standardcontroller.WithSpecProvider(eth2Client.(eth2client.SpecProvider)), + standardcontroller.WithForkScheduleProvider(eth2Client.(eth2client.ForkScheduleProvider)), standardcontroller.WithChainTimeService(chainTime), standardcontroller.WithProposerDutiesProvider(eth2Client.(eth2client.ProposerDutiesProvider)), standardcontroller.WithAttesterDutiesProvider(eth2Client.(eth2client.AttesterDutiesProvider)), diff --git a/mock/eth2client.go b/mock/eth2client.go index 11cc00a..6838650 100644 --- a/mock/eth2client.go +++ b/mock/eth2client.go @@ -693,6 +693,30 @@ func (m *SpecProvider) Spec(ctx context.Context) (map[string]interface{}, error) }, nil } +// ForkScheduleProvider is a mock for eth2client.ForkScheduleProvider. +type ForkScheduleProvider struct{} + +// NewForkScheduleProvider returns a mock fork schedule provider. +func NewForkScheduleProvider() eth2client.ForkScheduleProvider { + return &ForkScheduleProvider{} +} + +// ForkSchedule is a mock. +func (m *ForkScheduleProvider) ForkSchedule(ctx context.Context) ([]*phase0.Fork, error) { + return []*phase0.Fork{ + { + PreviousVersion: phase0.Version{0x00, 0x01, 0x02, 0x03}, + CurrentVersion: phase0.Version{0x00, 0x01, 0x02, 0x03}, + Epoch: 0, + }, + { + PreviousVersion: phase0.Version{0x00, 0x01, 0x02, 0x03}, + CurrentVersion: phase0.Version{0x01, 0x02, 0x03, 0x04}, + Epoch: 10, + }, + }, nil +} + // DomainProvider is a mock for eth2client.DomainProvider. type DomainProvider struct{} diff --git a/services/controller/standard/events.go b/services/controller/standard/events.go index 39a9881..8f80974 100644 --- a/services/controller/standard/events.go +++ b/services/controller/standard/events.go @@ -131,13 +131,10 @@ func (s *Service) handleCurrentDependentRootChanged(ctx context.Context) { // We need to refresh the proposer duties for this epoch. go s.refreshProposerDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch()) - // We need to refresh the sync committee duties for this epoch if we are + // We need to refresh the sync committee duties for the next period if we are // at the appropriate boundary. if uint64(s.chainTimeService.CurrentEpoch())%s.epochsPerSyncCommitteePeriod == 0 { - // TODO is this correct? - // Check if this is the correct sync committee period (should it be the next one?) - // Check if this should only be recalculated on the sync committee period boundary. - go s.refreshSyncCommitteeDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch()) + go s.refreshSyncCommitteeDutiesForEpochPeriod(ctx, s.chainTimeService.CurrentEpoch()+phase0.Epoch(s.epochsPerSyncCommitteePeriod)) } // We need to refresh the attester duties for the next epoch. go s.refreshAttesterDutiesForEpoch(ctx, s.chainTimeService.CurrentEpoch()+1) @@ -199,26 +196,40 @@ func (s *Service) refreshAttesterDutiesForEpoch(ctx context.Context, epoch phase s.subscriptionInfosMutex.Unlock() } -// TODO this should refresh for the entire period. -func (s *Service) refreshSyncCommitteeDutiesForEpoch(ctx context.Context, epoch phase0.Epoch) { +// refreshSyncCommitteeDutiesForEpochPeriod refreshes sync committee duties for all epochs in the +// given sync period. +func (s *Service) refreshSyncCommitteeDutiesForEpochPeriod(ctx context.Context, epoch phase0.Epoch) { if !s.handlingAltair { // Not handling Altair, nothing to do. return } + // Work out start and end epoch for the period. + period := uint64(epoch) / s.epochsPerSyncCommitteePeriod + firstEpoch := s.firstEpochOfSyncPeriod(period) + firstSlot := s.chainTimeService.FirstSlotOfEpoch(firstEpoch) + lastEpoch := s.firstEpochOfSyncPeriod(period+1) - 1 + lastSlot := s.chainTimeService.FirstSlotOfEpoch(lastEpoch+1) - 1 + // First thing we do is cancel all scheduled sync committee message jobs. - firstSlot := s.chainTimeService.FirstSlotOfEpoch(epoch) - syncCommitteePeriod := uint64(s.chainTimeService.SlotToEpoch(firstSlot)) / s.epochsPerSyncCommitteePeriod - lastSlot := s.chainTimeService.FirstSlotOfEpoch(phase0.Epoch((syncCommitteePeriod+1)*s.epochsPerSyncCommitteePeriod)) - 1 for slot := firstSlot; slot <= lastSlot; slot++ { - if err := s.scheduler.CancelJob(ctx, fmt.Sprintf("Sync committee messages for slot %d", slot)); err != nil { - log.Debug().Err(err).Msg("Failed to cancel sync committee message job") + prepareJobName := fmt.Sprintf("Prepare sync committee messages for slot %d", slot) + if err := s.scheduler.CancelJob(ctx, prepareJobName); err != nil { + log.Debug().Str("job_name", prepareJobName).Err(err).Msg("Failed to cancel prepare sync committee message job") + } + messageJobName := fmt.Sprintf("Sync committee messages for slot %d", slot) + if err := s.scheduler.CancelJob(ctx, messageJobName); err != nil { + log.Debug().Str("job_name", messageJobName).Err(err).Msg("Failed to cancel sync committee message job") + } + aggregateJobName := fmt.Sprintf("Sync committee aggregation for slot %d", slot) + if err := s.scheduler.CancelJob(ctx, aggregateJobName); err != nil { + log.Debug().Str("job_name", aggregateJobName).Err(err).Msg("Failed to cancel sync committee aggregate job") } } - _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, epoch) + _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, firstEpoch) if err != nil { - log.Error().Err(err).Uint64("epoch", uint64(epoch)).Msg("Failed to obtain active validators for epoch") + log.Error().Err(err).Uint64("epoch", uint64(firstEpoch)).Msg("Failed to obtain active validators for epoch") return } diff --git a/services/controller/standard/parameters.go b/services/controller/standard/parameters.go index 0859150..ccc2a0f 100644 --- a/services/controller/standard/parameters.go +++ b/services/controller/standard/parameters.go @@ -37,6 +37,7 @@ type parameters struct { logLevel zerolog.Level monitor metrics.ControllerMonitor specProvider eth2client.SpecProvider + forkScheduleProvider eth2client.ForkScheduleProvider chainTimeService chaintime.Service proposerDutiesProvider eth2client.ProposerDutiesProvider attesterDutiesProvider eth2client.AttesterDutiesProvider @@ -89,6 +90,13 @@ func WithSpecProvider(provider eth2client.SpecProvider) Parameter { }) } +// WithForkScheduleProvider sets the fork schedule provider. +func WithForkScheduleProvider(provider eth2client.ForkScheduleProvider) Parameter { + return parameterFunc(func(p *parameters) { + p.forkScheduleProvider = provider + }) +} + // WithChainTimeService sets the chain time service. func WithChainTimeService(service chaintime.Service) Parameter { return parameterFunc(func(p *parameters) { @@ -232,6 +240,9 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) { if parameters.specProvider == nil { return nil, errors.New("no spec provider specified") } + if parameters.forkScheduleProvider == nil { + return nil, errors.New("no fork schedule provider specified") + } if parameters.chainTimeService == nil { return nil, errors.New("no chain time service specified") } diff --git a/services/controller/standard/service.go b/services/controller/standard/service.go index d99c1c2..5355bf7 100644 --- a/services/controller/standard/service.go +++ b/services/controller/standard/service.go @@ -1,4 +1,4 @@ -// Copyright © 2020 Attestant Limited. +// Copyright © 2020, 2021 Attestant Limited. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,6 +14,7 @@ package standard import ( + "bytes" "context" "fmt" "sync" @@ -67,7 +68,8 @@ type Service struct { reorgs bool // Hard fork control - handlingAltair bool + handlingAltair bool + altairForkEpoch phase0.Epoch // Tracking for reorgs. lastBlockRoot phase0.Root @@ -126,6 +128,20 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { // Handling altair if we have the service and spec to do so. handlingAltair := parameters.syncCommitteeAggregator != nil && epochsPerSyncCommitteePeriod != 0 + if !handlingAltair { + log.Trace().Msg("Not handling Altair") + } + // Fetch the altair fork epoch from the fork schedule. + var altairForkEpoch phase0.Epoch + if handlingAltair { + altairForkEpoch, err = fetchAltairForkEpoch(ctx, parameters.forkScheduleProvider) + if err != nil { + // Not handling altair after all. + handlingAltair = false + } else { + log.Trace().Uint64("epoch", uint64(altairForkEpoch)).Msg("Obtained Altair fork epoch") + } + } s := &Service{ monitor: parameters.monitor, @@ -150,6 +166,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { reorgs: parameters.reorgs, subscriptionInfos: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.CommitteeIndex]*beaconcommitteesubscriber.Subscription), handlingAltair: handlingAltair, + altairForkEpoch: altairForkEpoch, } // Subscribe to head events. This allows us to go early for attestations if a block arrives, as well as @@ -181,7 +198,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { go s.scheduleAttestations(ctx, epoch, validatorIndices, true /* notCurrentSlot */) if handlingAltair { go s.scheduleSyncCommitteeMessages(ctx, epoch, validatorIndices) - nextSyncCommitteePeriodStartEpoch := phase0.Epoch((uint64(epoch)/s.epochsPerSyncCommitteePeriod + 1) * s.epochsPerSyncCommitteePeriod) + nextSyncCommitteePeriodStartEpoch := s.firstEpochOfSyncPeriod(uint64(epoch)/s.epochsPerSyncCommitteePeriod + 1) go s.scheduleSyncCommitteeMessages(ctx, nextSyncCommitteePeriodStartEpoch, validatorIndices) } go s.scheduleAttestations(ctx, epoch+1, nextEpochValidatorIndices, true /* notCurrentSlot */) @@ -312,9 +329,15 @@ func (s *Service) epochTicker(ctx context.Context, data interface{}) { go s.scheduleProposals(ctx, currentEpoch, validatorIndices, false /* notCurrentSlot */) go s.scheduleAttestations(ctx, currentEpoch+1, nextEpochValidatorIndices, false /* notCurrentSlot */) if s.handlingAltair { - // Only update if we are on an EPOCHS_PER_SYNC_COMMITTEE_PERIOD boundary. + // Handle the Altair hard fork transition epoch. + if currentEpoch == s.altairForkEpoch { + log.Trace().Msg("At Altair fork epoch") + go s.handleAltairForkEpoch(ctx) + } + + // Update the _next_ period if we are on an EPOCHS_PER_SYNC_COMMITTEE_PERIOD boundary. if uint64(currentEpoch)%s.epochsPerSyncCommitteePeriod == 0 { - go s.scheduleSyncCommitteeMessages(ctx, currentEpoch, validatorIndices) + go s.scheduleSyncCommitteeMessages(ctx, currentEpoch+phase0.Epoch(s.epochsPerSyncCommitteePeriod), validatorIndices) } } go func() { @@ -352,3 +375,44 @@ func (s *Service) accountsAndIndicesForEpoch(ctx context.Context, return accounts, validatorIndices, nil } + +func fetchAltairForkEpoch(ctx context.Context, forkScheduleProvider eth2client.ForkScheduleProvider) (phase0.Epoch, error) { + forkSchedule, err := forkScheduleProvider.ForkSchedule(ctx) + if err != nil { + return 0, err + } + for i := range forkSchedule { + if bytes.Equal(forkSchedule[i].CurrentVersion[:], forkSchedule[i].PreviousVersion[:]) { + // This is the genesis fork; ignore it. + continue + } + return forkSchedule[i].Epoch, nil + } + return 0, errors.New("no altair fork obtained") +} + +// handleAltairForkEpoch handles changes that need to take place at the Altair hard fork boundary. +func (s *Service) handleAltairForkEpoch(ctx context.Context) { + if !s.handlingAltair { + return + } + + go func() { + _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, s.altairForkEpoch) + if err != nil { + log.Error().Err(err).Msg("Failed to obtain active validator indices for the Altair fork epoch") + return + } + go s.scheduleSyncCommitteeMessages(ctx, s.altairForkEpoch, validatorIndices) + }() + + go func() { + nextPeriodEpoch := phase0.Epoch((uint64(s.altairForkEpoch)/s.epochsPerSyncCommitteePeriod + 1) * s.epochsPerSyncCommitteePeriod) + _, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, nextPeriodEpoch) + if err != nil { + log.Error().Err(err).Msg("Failed to obtain active validator indices for the period following the Altair fork epoch") + return + } + go s.scheduleSyncCommitteeMessages(ctx, nextPeriodEpoch, validatorIndices) + }() +} diff --git a/services/controller/standard/service_test.go b/services/controller/standard/service_test.go index de0f410..48e7503 100644 --- a/services/controller/standard/service_test.go +++ b/services/controller/standard/service_test.go @@ -45,6 +45,7 @@ func TestService(t *testing.T) { slotDurationProvider := mock.NewSlotDurationProvider(slotDuration) slotsPerEpochProvider := mock.NewSlotsPerEpochProvider(slotsPerEpoch) specProvider := mock.NewSpecProvider() + forkScheduleProvider := mock.NewForkScheduleProvider() proposerDutiesProvider := mock.NewProposerDutiesProvider() attesterDutiesProvider := mock.NewAttesterDutiesProvider() @@ -78,6 +79,7 @@ func TestService(t *testing.T) { params: []standard.Parameter{ standard.WithLogLevel(zerolog.Disabled), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -100,6 +102,7 @@ func TestService(t *testing.T) { name: "SpecProviderMissing", params: []standard.Parameter{ standard.WithLogLevel(zerolog.Disabled), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), @@ -119,12 +122,37 @@ func TestService(t *testing.T) { }, err: "problem with parameters: no spec provider specified", }, + { + name: "ForkScheduleProviderMissing", + params: []standard.Parameter{ + standard.WithLogLevel(zerolog.Disabled), + standard.WithSpecProvider(specProvider), + standard.WithMonitor(nullmetrics.New(ctx)), + standard.WithChainTimeService(chainTime), + standard.WithProposerDutiesProvider(proposerDutiesProvider), + standard.WithAttesterDutiesProvider(attesterDutiesProvider), + standard.WithSyncCommitteeDutiesProvider(syncCommitteeDutiesProvider), + standard.WithEventsProvider(mockEventsProvider), + standard.WithValidatingAccountsProvider(mockValidatingAccountsProvider), + standard.WithScheduler(mockScheduler), + standard.WithAttester(mockAttester), + standard.WithSyncCommitteeMessenger(mockSyncCommitteeMessenger), + standard.WithSyncCommitteeSubscriber(mockSyncCommitteeSubscriber), + standard.WithBeaconBlockProposer(mockBeaconBlockProposer), + standard.WithBeaconCommitteeSubscriber(mockBeaconCommitteeSubscriber), + standard.WithAttestationAggregator(mockAttestationAggregator), + standard.WithAccountsRefresher(mockAccountsRefresher), + standard.WithMaxAttestationDelay(4 * time.Second), + }, + err: "problem with parameters: no fork schedule provider specified", + }, { name: "SpecProviderErrors", params: []standard.Parameter{ standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(mock.NewErroringSpecProvider()), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -149,6 +177,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), standard.WithSyncCommitteeDutiesProvider(syncCommitteeDutiesProvider), @@ -172,6 +201,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithAttesterDutiesProvider(attesterDutiesProvider), standard.WithSyncCommitteeDutiesProvider(syncCommitteeDutiesProvider), @@ -195,6 +225,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithSyncCommitteeDutiesProvider(syncCommitteeDutiesProvider), @@ -218,6 +249,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -241,6 +273,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -264,6 +297,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -287,6 +321,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -310,6 +345,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -333,6 +369,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -356,6 +393,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -379,6 +417,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -402,6 +441,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), @@ -426,6 +466,7 @@ func TestService(t *testing.T) { standard.WithLogLevel(zerolog.Disabled), standard.WithMonitor(nullmetrics.New(ctx)), standard.WithSpecProvider(specProvider), + standard.WithForkScheduleProvider(forkScheduleProvider), standard.WithChainTimeService(chainTime), standard.WithProposerDutiesProvider(proposerDutiesProvider), standard.WithAttesterDutiesProvider(attesterDutiesProvider), diff --git a/services/controller/standard/synccommitteemessenger.go b/services/controller/standard/synccommitteemessenger.go index da07412..ade02a9 100644 --- a/services/controller/standard/synccommitteemessenger.go +++ b/services/controller/standard/synccommitteemessenger.go @@ -24,7 +24,7 @@ import ( e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2" ) -// scheduleSyncCommitteeMessages schedules sync committee messages for the given epoch and validator indices. +// scheduleSyncCommitteeMessages schedules sync committee messages for the given period and validator indices. func (s *Service) scheduleSyncCommitteeMessages(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex, @@ -33,11 +33,21 @@ func (s *Service) scheduleSyncCommitteeMessages(ctx context.Context, // Nothing to do. return } + if s.chainTimeService.CurrentEpoch() < s.altairForkEpoch { + // Not yet at the Altair epoch; don't schedule anything. + return + } + + period := uint64(epoch) / s.epochsPerSyncCommitteePeriod + firstEpoch := s.firstEpochOfSyncPeriod(period) + firstSlot := s.chainTimeService.FirstSlotOfEpoch(firstEpoch) + lastEpoch := s.firstEpochOfSyncPeriod(period+1) - 1 + lastSlot := s.chainTimeService.FirstSlotOfEpoch(lastEpoch+1) - 1 started := time.Now() - log.Trace().Uint64("epoch", uint64(epoch)).Msg("Scheduling sync committee messages") + log.Trace().Uint64("period", period).Uint64("first_epoch", uint64(firstEpoch)).Uint64("last_epoch", uint64(lastEpoch)).Msg("Scheduling sync committee messages") - resp, err := s.syncCommitteeDutiesProvider.SyncCommitteeDuties(ctx, epoch, validatorIndices) + resp, err := s.syncCommitteeDutiesProvider.SyncCommitteeDuties(ctx, firstEpoch, validatorIndices) if err != nil { log.Error().Err(err).Msg("Failed to fetch sync committee message duties") return @@ -49,28 +59,24 @@ func (s *Service) scheduleSyncCommitteeMessages(ctx context.Context, for _, duty := range resp { messageIndices[duty.ValidatorIndex] = duty.ValidatorSyncCommitteeIndices } - // log.Trace().Dur("elapsed", time.Since(started)).Str("duties", duty.String()).Msg("Generated sync committee message indices") // Obtain the accounts for the validator indices. - accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpochByIndex(ctx, epoch, validatorIndices) + accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpochByIndex(ctx, firstEpoch, validatorIndices) if err != nil { log.Error().Err(err).Msg("Failed to obtain validating accounts for epoch") return } // Now we have the messages we can subscribe to the relevant subnets. - - syncCommitteePeriodFirstSlot := s.chainTimeService.FirstSlotOfEpoch(phase0.Epoch((uint64(epoch) / s.epochsPerSyncCommitteePeriod) * s.epochsPerSyncCommitteePeriod)) - syncCommitteePeriodLastSlot := syncCommitteePeriodFirstSlot + phase0.Slot(s.slotsPerEpoch*s.epochsPerSyncCommitteePeriod) - 1 - if syncCommitteePeriodFirstSlot < s.chainTimeService.CurrentSlot() { - syncCommitteePeriodFirstSlot = s.chainTimeService.CurrentSlot() + if firstSlot < s.chainTimeService.CurrentSlot() { + firstSlot = s.chainTimeService.CurrentSlot() } log.Trace(). - Uint64("first_slot", uint64(syncCommitteePeriodFirstSlot)). - Uint64("last_slot", uint64(syncCommitteePeriodLastSlot)). + Uint64("first_slot", uint64(firstSlot)). + Uint64("last_slot", uint64(lastSlot)). Msg("Setting sync committee duties for period") - for slot := syncCommitteePeriodFirstSlot; slot <= syncCommitteePeriodLastSlot; slot++ { + for slot := firstSlot; slot <= lastSlot; slot++ { go func(duty *synccommitteemessenger.Duty, accounts map[phase0.ValidatorIndex]e2wtypes.Account) { for _, validatorIndex := range duty.ValidatorIndices() { account, exists := accounts[validatorIndex] @@ -106,8 +112,7 @@ func (s *Service) scheduleSyncCommitteeMessages(ctx context.Context, } log.Trace().Dur("elapsed", time.Since(started)).Msg("Scheduled sync committee messages") - // TODO Obi-wan? - if err := s.syncCommitteesSubscriber.Subscribe(ctx, s.chainTimeService.SlotToEpoch(syncCommitteePeriodLastSlot), resp); err != nil { + if err := s.syncCommitteesSubscriber.Subscribe(ctx, firstEpoch, resp); err != nil { log.Error().Err(err).Msg("Failed to submit sync committee subscribers") return } @@ -173,3 +178,12 @@ func (s *Service) messageSyncCommittee(ctx context.Context, data interface{}) { } log.Trace().Dur("elapsed", time.Since(started)).Msg("Messaged") } + +// firstEpochOfSyncPeriod calculates the first epoch of the given sync period. +func (s *Service) firstEpochOfSyncPeriod(period uint64) phase0.Epoch { + epoch := phase0.Epoch(period * s.epochsPerSyncCommitteePeriod) + if epoch < s.altairForkEpoch { + epoch = s.altairForkEpoch + } + return epoch +} diff --git a/services/synccommitteeaggregator/standard/service.go b/services/synccommitteeaggregator/standard/service.go index bb54332..0f850ef 100644 --- a/services/synccommitteeaggregator/standard/service.go +++ b/services/synccommitteeaggregator/standard/service.go @@ -169,7 +169,7 @@ func (s *Service) Aggregate(ctx context.Context, data interface{}) { for _, validatorIndex := range duty.ValidatorIndices { for subcommitteeIndex := range duty.SelectionProofs[validatorIndex] { - log.Warn().Uint64("validator_index", uint64(validatorIndex)).Uint64("subcommittee_index", subcommitteeIndex).Str("beacon_block_root", fmt.Sprintf("%#x", *beaconBlockRoot)).Msg("jgm Aggregating") + log.Trace().Uint64("validator_index", uint64(validatorIndex)).Uint64("subcommittee_index", subcommitteeIndex).Str("beacon_block_root", fmt.Sprintf("%#x", *beaconBlockRoot)).Msg("Aggregating") contribution, err := s.syncCommitteeContributionProvider.SyncCommitteeContribution(ctx, duty.Slot, subcommitteeIndex, *beaconBlockRoot) if err != nil { log.Warn().Err(err).Msg("Failed to obtain sync committee contribution") diff --git a/services/synccommitteemessenger/standard/service.go b/services/synccommitteemessenger/standard/service.go index 3f332c1..51ee467 100644 --- a/services/synccommitteemessenger/standard/service.go +++ b/services/synccommitteemessenger/standard/service.go @@ -188,7 +188,7 @@ func (s *Service) Message(ctx context.Context, data interface{}) ([]*altair.Sync log.Error().Err(err).Msg("Failed to sign sync committee message") return } - log.Trace().Str("signature", fmt.Sprintf("%#x", sig)).Msg("Signed sync committee message") + log.Trace().Uint64("slot", uint64(duty.Slot())).Uint64("validator_index", uint64(validatorIndices[i])).Str("signature", fmt.Sprintf("%#x", sig)).Msg("Signed sync committee message") msg := &altair.SyncCommitteeMessage{ Slot: duty.Slot(), @@ -201,39 +201,10 @@ func (s *Service) Message(ctx context.Context, data interface{}) ([]*altair.Sync msgsMu.Unlock() }(ctx, sem, &wg, i) } - - // msgs := make([]*altair.SyncCommitteeMessage, 0, len(duty.ContributionIndices())) - // validatorIndices := make([]phase0.ValidatorIndex, 0, len(duty.ContributionIndices())) - // for validatorIndex := range duty.ContributionIndices() { - // validatorIndices = append(validatorIndices, validatorIndex) - // } - // _, err = util.Scatter(len(duty.ContributionIndices()), func(offset int, entries int, mu *sync.RWMutex) (interface{}, error) { - // for i := offset; i < offset+entries; i++ { - // sig, err := s.contribute(ctx, duty.Account(validatorIndices[i]), s.chainTimeService.SlotToEpoch(duty.Slot()), *beaconBlockRoot) - // if err != nil { - // log.Error().Err(err).Msg("Failed to sign sync committee message") - // continue - // } - // log.Trace().Str("signature", fmt.Sprintf("%#x", sig)).Msg("Signed sync committee message") - // - // msg := &altair.SyncCommitteeMessage{ - // Slot: duty.Slot(), - // BeaconBlockRoot: *beaconBlockRoot, - // ValidatorIndex: validatorIndices[i], - // Signature: sig, - // } - // mu.Lock() - // msgs = append(msgs, msg) - // mu.Unlock() - // } - // return nil, nil - // }) - // if err != nil { - // s.monitor.SyncCommitteeMessagesCompleted(started, len(msgs), "failed") - // log.Error().Err(err).Str("result", "failed").Msg("Failed to obtain committee messages") - // } + wg.Wait() if err := s.syncCommitteeMessagesSubmitter.SubmitSyncCommitteeMessages(ctx, msgs); err != nil { + log.Trace().Dur("elapsed", time.Since(started)).Err(err).Msg("Failed to submit sync committee messages") s.monitor.SyncCommitteeMessagesCompleted(started, len(msgs), "failed") return nil, errors.Wrap(err, "failed to submit sync committee messages") }