Ensure genesis attestes are scheduled appropriately.

This commit is contained in:
Jim McDonald 2020-10-04 08:12:34 +01:00
parent 58d38d0b90
commit e19fae7b4e
No known key found for this signature in database
GPG Key ID: 89CEB61B2AD2A5E7
2 changed files with 14 additions and 7 deletions

View File

@ -1,4 +1,5 @@
Development
- ensure genesis attesters are scheduled appropriately
- do not continue if attempt to acquire a semaphore fails
- fetch validators without balances, for (much) faster response from Prysm
- do not fetch validator status twice on startup

View File

@ -50,7 +50,7 @@ type Service struct {
attestationAggregator attestationaggregator.Service
beaconCommitteeSubscriber beaconcommitteesubscriber.Service
activeAccounts int
// Epoch => slot => committee => subscription info
// Epoch => slot => committee => subscription info cache.
subscriptionInfos map[uint64]map[uint64]map[uint64]*beaconcommitteesubscriber.Subscription
subscriptionInfosMutex sync.Mutex
}
@ -147,7 +147,9 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
func (s *Service) startTickers(ctx context.Context) error {
genesisTime := s.chainTimeService.GenesisTime()
now := time.Now()
waitedForGenesis := false
if now.Before(genesisTime) {
waitedForGenesis = true
// Wait for genesis.
log.Info().Str("genesis", fmt.Sprintf("%v", genesisTime)).Msg("Waiting for genesis")
time.Sleep(time.Until(genesisTime))
@ -157,7 +159,7 @@ func (s *Service) startTickers(ctx context.Context) error {
// Start epoch ticker.
log.Trace().Msg("Starting epoch ticker")
if err := s.startEpochTicker(ctx); err != nil {
if err := s.startEpochTicker(ctx, waitedForGenesis); err != nil {
return errors.Wrap(err, "failed to start epoch ticker")
}
@ -167,16 +169,18 @@ func (s *Service) startTickers(ctx context.Context) error {
type epochTickerData struct {
mutex sync.Mutex
latestEpochRan int64
atGenesis bool
}
// startEpochTicker starts a ticker that ticks at the beginning of each epoch.
func (s *Service) startEpochTicker(ctx context.Context) error {
func (s *Service) startEpochTicker(ctx context.Context, waitedForGenesis bool) error {
runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) {
// Schedule for the beginning of the next epoch.
return s.chainTimeService.StartOfEpoch(s.chainTimeService.CurrentEpoch() + 1), nil
}
data := &epochTickerData{
latestEpochRan: -1,
atGenesis: waitedForGenesis,
}
if err := s.scheduler.SchedulePeriodicJob(ctx,
"Epoch ticker",
@ -201,9 +205,9 @@ func (s *Service) epochTicker(ctx context.Context, data interface{}) {
// Ensure we don't run for the same epoch twice.
epochTickerData := data.(*epochTickerData)
currentEpoch := s.chainTimeService.CurrentEpoch()
log.Trace().Uint64("epoch", currentEpoch).Msg("Starting per-epoch duties")
epochTickerData.mutex.Lock()
firstRun := epochTickerData.latestEpochRan == -1
log.Trace().Uint64("epoch", currentEpoch).Bool("first_run", firstRun).Msg("Starting per-epoch duties")
epochTickerData.mutex.Lock()
if epochTickerData.latestEpochRan >= int64(currentEpoch) {
log.Trace().Uint64("epoch", currentEpoch).Msg("Already ran for this epoch; skipping")
epochTickerData.mutex.Unlock()
@ -242,8 +246,8 @@ func (s *Service) epochTicker(ctx context.Context, data interface{}) {
}
// Create the jobs for our individual functions.
go s.createProposerJobs(ctx, currentEpoch, accounts, firstRun)
go s.createAttesterJobs(ctx, currentEpoch, accounts, firstRun)
go s.createProposerJobs(ctx, currentEpoch, accounts, firstRun && !epochTickerData.atGenesis)
go s.createAttesterJobs(ctx, currentEpoch, accounts, firstRun && !epochTickerData.atGenesis)
go func() {
// Update beacon committee subscriptions for the next epoch.
subscriptionInfo, err := s.beaconCommitteeSubscriber.Subscribe(ctx, currentEpoch+1, accounts)
@ -255,6 +259,8 @@ func (s *Service) epochTicker(ctx context.Context, data interface{}) {
s.subscriptionInfos[currentEpoch+1] = subscriptionInfo
s.subscriptionInfosMutex.Unlock()
}()
epochTickerData.atGenesis = false
}
// OnBeaconChainHeadUpdated runs attestations for a slot immediately, if the update is for the current slot.