
355 lines
13 KiB

// Copyright © 2020 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package standard
import (
eth2client "github.com/attestantio/go-eth2-client"
zerologger "github.com/rs/zerolog/log"
e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2"
// Service is the co-ordination system for vouch.
// It runs purely against clock events, setting up jobs for the validator's processes of block proposal, attestation
// creation and attestation aggregation.
type Service struct {
monitor metrics.ControllerMonitor
slotDuration time.Duration
slotsPerEpoch uint64
epochsPerSyncCommitteePeriod uint64
chainTimeService chaintime.Service
proposerDutiesProvider eth2client.ProposerDutiesProvider
attesterDutiesProvider eth2client.AttesterDutiesProvider
syncCommitteeDutiesProvider eth2client.SyncCommitteeDutiesProvider
validatingAccountsProvider accountmanager.ValidatingAccountsProvider
scheduler scheduler.Service
attester attester.Service
syncCommitteeMessenger synccommitteemessenger.Service
syncCommitteeAggregator synccommitteeaggregator.Service
syncCommitteesSubscriber synccommitteesubscriber.Service
beaconBlockProposer beaconblockproposer.Service
attestationAggregator attestationaggregator.Service
beaconCommitteeSubscriber beaconcommitteesubscriber.Service
activeValidators int
subscriptionInfos map[phase0.Epoch]map[phase0.Slot]map[phase0.CommitteeIndex]*beaconcommitteesubscriber.Subscription
subscriptionInfosMutex sync.Mutex
accountsRefresher accountmanager.Refresher
maxSyncCommitteeMessageDelay time.Duration
reorgs bool
// Hard fork control
handlingAltair bool
// Tracking for reorgs.
lastBlockRoot phase0.Root
lastBlockEpoch phase0.Epoch
currentDutyDependentRoot phase0.Root
previousDutyDependentRoot phase0.Root
// module-wide log.
var log zerolog.Logger
// New creates a new controller.
func New(ctx context.Context, params ...Parameter) (*Service, error) {
parameters, err := parseAndCheckParameters(params...)
if err != nil {
return nil, errors.Wrap(err, "problem with parameters")
// Set logging.
log = zerologger.With().Str("service", "controller").Str("impl", "standard").Logger()
if parameters.logLevel != log.GetLevel() {
log = log.Level(parameters.logLevel)
spec, err := parameters.specProvider.Spec(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
tmp, exists := spec["SECONDS_PER_SLOT"]
if !exists {
return nil, errors.New("SECONDS_PER_SLOT not found in spec")
slotDuration, ok := tmp.(time.Duration)
if !ok {
return nil, errors.New("SECONDS_PER_SLOT of unexpected type")
tmp, exists = spec["SLOTS_PER_EPOCH"]
if !exists {
return nil, errors.New("SLOTS_PER_EPOCH not found in spec")
slotsPerEpoch, ok := tmp.(uint64)
if !ok {
return nil, errors.New("SLOTS_PER_EPOCH of unexpected type")
var epochsPerSyncCommitteePeriod uint64
if tmp, exists := spec["EPOCHS_PER_SYNC_COMMITTEE_PERIOD"]; exists {
tmp2, ok := tmp.(uint64)
if !ok {
return nil, errors.New("EPOCHS_PER_SYNC_COMMITTEE_PERIOD of unexpected type")
epochsPerSyncCommitteePeriod = tmp2
// Handling altair if we have the service and spec to do so.
handlingAltair := parameters.syncCommitteeAggregator != nil && epochsPerSyncCommitteePeriod != 0
s := &Service{
monitor: parameters.monitor,
slotDuration: slotDuration,
slotsPerEpoch: slotsPerEpoch,
epochsPerSyncCommitteePeriod: epochsPerSyncCommitteePeriod,
chainTimeService: parameters.chainTimeService,
proposerDutiesProvider: parameters.proposerDutiesProvider,
attesterDutiesProvider: parameters.attesterDutiesProvider,
syncCommitteeDutiesProvider: parameters.syncCommitteeDutiesProvider,
syncCommitteesSubscriber: parameters.syncCommitteesSubscriber,
validatingAccountsProvider: parameters.validatingAccountsProvider,
scheduler: parameters.scheduler,
attester: parameters.attester,
syncCommitteeMessenger: parameters.syncCommitteeMessenger,
syncCommitteeAggregator: parameters.syncCommitteeAggregator,
beaconBlockProposer: parameters.beaconBlockProposer,
attestationAggregator: parameters.attestationAggregator,
beaconCommitteeSubscriber: parameters.beaconCommitteeSubscriber,
accountsRefresher: parameters.accountsRefresher,
maxSyncCommitteeMessageDelay: parameters.maxSyncCommitteeMessageDelay,
reorgs: parameters.reorgs,
subscriptionInfos: make(map[phase0.Epoch]map[phase0.Slot]map[phase0.CommitteeIndex]*beaconcommitteesubscriber.Subscription),
handlingAltair: handlingAltair,
// Subscribe to head events. This allows us to go early for attestations if a block arrives, as well as
// re-request duties if there is a change in beacon block.
// This also allows us to re-request duties if the dependent roots change.
if err := parameters.eventsProvider.Events(ctx, []string{"head"}, s.HandleHeadEvent); err != nil {
return nil, errors.Wrap(err, "failed to add head event handler")
if err := s.startTickers(ctx); err != nil {
return nil, errors.Wrap(err, "failed to start controller tickers")
// Run specific actions now so we can carry out duties for the remainder of this epoch.
epoch := s.chainTimeService.CurrentEpoch()
accounts, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, epoch)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain active validator indices for the current epoch")
if len(validatorIndices) != s.activeValidators {
log.Info().Int("old_validators", s.activeValidators).Int("new_validators", len(validatorIndices)).Msg("Change in number of active validators")
s.activeValidators = len(validatorIndices)
nextEpochAccounts, nextEpochValidatorIndices, err := s.accountsAndIndicesForEpoch(ctx, epoch+1)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain active validator indices for the next epoch")
go s.scheduleProposals(ctx, epoch, validatorIndices, true /* notCurrentSlot */)
go s.scheduleAttestations(ctx, epoch, validatorIndices, true /* notCurrentSlot */)
if handlingAltair {
go s.scheduleSyncCommitteeMessages(ctx, epoch, validatorIndices)
nextSyncCommitteePeriodStartEpoch := phase0.Epoch((uint64(epoch)/s.epochsPerSyncCommitteePeriod + 1) * s.epochsPerSyncCommitteePeriod)
go s.scheduleSyncCommitteeMessages(ctx, nextSyncCommitteePeriodStartEpoch, validatorIndices)
go s.scheduleAttestations(ctx, epoch+1, nextEpochValidatorIndices, true /* notCurrentSlot */)
// Update beacon committee subscriptions this and the next epoch.
go func() {
subscriptionInfo, err := s.beaconCommitteeSubscriber.Subscribe(ctx, epoch, accounts)
if err != nil {
log.Warn().Err(err).Msg("Failed to subscribe to beacon committees")
s.subscriptionInfos[epoch] = subscriptionInfo
go func() {
subscriptionInfo, err := s.beaconCommitteeSubscriber.Subscribe(ctx, epoch+1, nextEpochAccounts)
if err != nil {
log.Warn().Err(err).Msg("Failed to subscribe to beacon committees")
s.subscriptionInfos[epoch+1] = subscriptionInfo
return s, nil
// startTickers starts the various tickers for the controller's operations.
func (s *Service) startTickers(ctx context.Context) error {
genesisTime := s.chainTimeService.GenesisTime()
now := time.Now()
waitedForGenesis := false
if now.Before(genesisTime) {
waitedForGenesis = true
// Wait for genesis.
log.Info().Str("genesis", fmt.Sprintf("%v", genesisTime)).Msg("Waiting for genesis")
// Start epoch tickers.
log.Trace().Msg("Starting epoch tickers")
if err := s.startEpochTicker(ctx, waitedForGenesis); err != nil {
return errors.Wrap(err, "failed to start epoch ticker")
// Start account refresher.
log.Trace().Msg("Starting accounts refresher")
if err := s.startAccountsRefresher(ctx); err != nil {
return errors.Wrap(err, "failed to start accounts refresher")
return nil
type epochTickerData struct {
mutex sync.Mutex
latestEpochRan int64
atGenesis bool
// startEpochTicker starts a ticker that ticks at the beginning of each epoch.
func (s *Service) startEpochTicker(ctx context.Context, waitedForGenesis bool) error {
runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) {
// Schedule for the beginning of the next epoch.
return s.chainTimeService.StartOfEpoch(s.chainTimeService.CurrentEpoch() + 1), nil
data := &epochTickerData{
latestEpochRan: -1,
atGenesis: waitedForGenesis,
if err := s.scheduler.SchedulePeriodicJob(ctx,
"Epoch ticker",
); err != nil {
return errors.Wrap(err, "Failed to schedule epoch ticker")
return nil
// epochTicker sets up the jobs for proposal, attestation and aggregation.
func (s *Service) epochTicker(ctx context.Context, data interface{}) {
// Ensure we don't run for the same epoch twice.
epochTickerData := data.(*epochTickerData)
currentEpoch := s.chainTimeService.CurrentEpoch()
log.Trace().Uint64("epoch", uint64(currentEpoch)).Msg("Starting per-epoch job")
if epochTickerData.latestEpochRan >= int64(currentEpoch) {
log.Trace().Uint64("epoch", uint64(currentEpoch)).Msg("Already ran for this epoch; skipping")
epochTickerData.latestEpochRan = int64(currentEpoch)
// We wait for the beacon node to update, but keep ourselves busy in the meantime.
waitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
_, validatorIndices, err := s.accountsAndIndicesForEpoch(ctx, currentEpoch)
if err != nil {
log.Error().Err(err).Uint64("epoch", uint64(currentEpoch)).Msg("Failed to obtain active validators for epoch")
nextEpochAccounts, nextEpochValidatorIndices, err := s.accountsAndIndicesForEpoch(ctx, currentEpoch+1)
if err != nil {
log.Error().Err(err).Uint64("epoch", uint64(currentEpoch)).Msg("Failed to obtain active validators for next epoch")
// Expect at least one validator.
if len(validatorIndices) == 0 && len(nextEpochValidatorIndices) == 0 {
log.Warn().Msg("No active validators; not validating")
// Done the preparation work available to us; wait for the end of the timer.
go s.scheduleProposals(ctx, currentEpoch, validatorIndices, false /* notCurrentSlot */)
go s.scheduleAttestations(ctx, currentEpoch+1, nextEpochValidatorIndices, false /* notCurrentSlot */)
if s.handlingAltair {
// Only update if we are on an EPOCHS_PER_SYNC_COMMITTEE_PERIOD boundary.
if uint64(currentEpoch)%s.epochsPerSyncCommitteePeriod == 0 {
go s.scheduleSyncCommitteeMessages(ctx, currentEpoch, validatorIndices)
go func() {
// Update beacon committee subscriptions for the next epoch.
subscriptionInfo, err := s.beaconCommitteeSubscriber.Subscribe(ctx, currentEpoch+1, nextEpochAccounts)
if err != nil {
log.Warn().Err(err).Msg("Failed to subscribe to beacon committees")
s.subscriptionInfos[currentEpoch+1] = subscriptionInfo
epochTickerData.atGenesis = false
// accountsAndIndicesForEpoch obtains the accounts and validator indices for the specified epoch.
func (s *Service) accountsAndIndicesForEpoch(ctx context.Context,
epoch phase0.Epoch,
) (
) {
accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpoch(ctx, epoch)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to obtain accounts")
validatorIndices := make([]phase0.ValidatorIndex, 0, len(accounts))
for index := range accounts {
validatorIndices = append(validatorIndices, index)
return accounts, validatorIndices, nil