mirror of https://github.com/certusone/vouch.git
224 lines
8.6 KiB
Go
224 lines
8.6 KiB
Go
// Copyright © 2020 Attestant Limited.
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package standard
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"time"
|
|
|
|
eth2client "github.com/attestantio/go-eth2-client"
|
|
"github.com/attestantio/go-eth2-client/spec/phase0"
|
|
"github.com/attestantio/vouch/services/accountmanager"
|
|
"github.com/attestantio/vouch/services/attestationaggregator"
|
|
"github.com/attestantio/vouch/services/metrics"
|
|
"github.com/attestantio/vouch/services/signer"
|
|
"github.com/attestantio/vouch/services/submitter"
|
|
"github.com/pkg/errors"
|
|
"github.com/rs/zerolog"
|
|
zerologger "github.com/rs/zerolog/log"
|
|
e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2"
|
|
)
|
|
|
|
// Service is an attestation aggregator.
|
|
type Service struct {
|
|
monitor metrics.AttestationAggregationMonitor
|
|
targetAggregatorsPerCommittee uint64
|
|
slotsPerEpoch uint64
|
|
validatingAccountsProvider accountmanager.ValidatingAccountsProvider
|
|
aggregateAttestationProvider eth2client.AggregateAttestationProvider
|
|
prysmAggregateAttestationProvider eth2client.PrysmAggregateAttestationProvider
|
|
aggregateAttestationsSubmitter submitter.AggregateAttestationsSubmitter
|
|
slotSelectionSigner signer.SlotSelectionSigner
|
|
aggregateAndProofSigner signer.AggregateAndProofSigner
|
|
}
|
|
|
|
// module-wide log.
|
|
var log zerolog.Logger
|
|
|
|
// New creates a new attestation aggregator.
|
|
func New(ctx context.Context, params ...Parameter) (*Service, error) {
|
|
parameters, err := parseAndCheckParameters(params...)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "problem with parameters")
|
|
}
|
|
|
|
// Set logging.
|
|
log = zerologger.With().Str("service", "attestationaggregator").Str("impl", "standard").Logger()
|
|
if parameters.logLevel != log.GetLevel() {
|
|
log = log.Level(parameters.logLevel)
|
|
}
|
|
|
|
targetAggregatorsPerCommittee, err := parameters.targetAggregatorsPerCommitteeProvider.TargetAggregatorsPerCommittee(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to obtain target aggregators per committee")
|
|
}
|
|
slotsPerEpoch, err := parameters.slotsPerEpochProvider.SlotsPerEpoch(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to obtain slots per epoch")
|
|
}
|
|
|
|
s := &Service{
|
|
monitor: parameters.monitor,
|
|
targetAggregatorsPerCommittee: targetAggregatorsPerCommittee,
|
|
slotsPerEpoch: slotsPerEpoch,
|
|
validatingAccountsProvider: parameters.validatingAccountsProvider,
|
|
aggregateAttestationProvider: parameters.aggregateAttestationProvider,
|
|
prysmAggregateAttestationProvider: parameters.prysmAggregateAttestationProvider,
|
|
aggregateAttestationsSubmitter: parameters.aggregateAttestationsSubmitter,
|
|
slotSelectionSigner: parameters.slotSelectionSigner,
|
|
aggregateAndProofSigner: parameters.aggregateAndProofSigner,
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Aggregate aggregates the attestations for a given slot/committee combination.
|
|
func (s *Service) Aggregate(ctx context.Context, data interface{}) {
|
|
started := time.Now()
|
|
|
|
duty, ok := data.(*attestationaggregator.Duty)
|
|
if !ok {
|
|
log.Error().Msg("Passed invalid data structure")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
log := log.With().Uint64("slot", uint64(duty.Slot)).Str("attestation_data_root", fmt.Sprintf("%#x", duty.AttestationDataRoot)).Logger()
|
|
log.Trace().Msg("Aggregating")
|
|
|
|
// Obtain the aggregate attestation.
|
|
var aggregateAttestation *phase0.Attestation
|
|
var err error
|
|
if s.aggregateAttestationProvider != nil {
|
|
aggregateAttestation, err = s.aggregateAttestationProvider.AggregateAttestation(ctx, duty.Slot, duty.AttestationDataRoot)
|
|
} else {
|
|
var validatorPubKey phase0.BLSPubKey
|
|
if provider, isProvider := duty.Account.(e2wtypes.AccountCompositePublicKeyProvider); isProvider {
|
|
copy(validatorPubKey[:], provider.CompositePublicKey().Marshal())
|
|
} else {
|
|
copy(validatorPubKey[:], duty.Account.PublicKey().Marshal())
|
|
}
|
|
aggregateAttestation, err = s.prysmAggregateAttestationProvider.PrysmAggregateAttestation(ctx, duty.Attestation, validatorPubKey, duty.SlotSignature)
|
|
}
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to obtain aggregate attestation")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained aggregate attestation")
|
|
if aggregateAttestation == nil {
|
|
log.Debug().Msg("Obtained nil aggregate attestation")
|
|
return
|
|
}
|
|
|
|
// Fetch the validating account.
|
|
epoch := phase0.Epoch(uint64(aggregateAttestation.Data.Slot) / s.slotsPerEpoch)
|
|
accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpochByIndex(ctx, epoch, []phase0.ValidatorIndex{duty.ValidatorIndex})
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to obtain proposing validator account")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
if len(accounts) != 1 {
|
|
log.Error().Err(err).Msg("Unknown proposing validator account")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
account := accounts[duty.ValidatorIndex]
|
|
log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained aggregating account")
|
|
|
|
// Sign the aggregate attestation.
|
|
aggregateAndProof := &phase0.AggregateAndProof{
|
|
AggregatorIndex: duty.ValidatorIndex,
|
|
Aggregate: aggregateAttestation,
|
|
SelectionProof: duty.SlotSignature,
|
|
}
|
|
aggregateAndProofRoot, err := aggregateAndProof.HashTreeRoot()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to generate hash tree root of aggregate and proof")
|
|
}
|
|
sig, err := s.aggregateAndProofSigner.SignAggregateAndProof(ctx, account, duty.Slot, phase0.Root(aggregateAndProofRoot))
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to sign aggregate and proof")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
log.Trace().Dur("elapsed", time.Since(started)).Msg("Signed aggregate attestation")
|
|
|
|
// Submit the signed aggregate and proof.
|
|
signedAggregateAndProofs := []*phase0.SignedAggregateAndProof{
|
|
{
|
|
Message: aggregateAndProof,
|
|
Signature: sig,
|
|
},
|
|
}
|
|
if err := s.aggregateAttestationsSubmitter.SubmitAggregateAttestations(ctx, signedAggregateAndProofs); err != nil {
|
|
log.Error().Err(err).Msg("Failed to submit aggregate and proof")
|
|
s.monitor.AttestationAggregationCompleted(started, "failed")
|
|
return
|
|
}
|
|
log.Trace().Dur("elapsed", time.Since(started)).Msg("Submitted aggregate attestation")
|
|
|
|
frac := float64(aggregateAndProof.Aggregate.AggregationBits.Count()) /
|
|
float64(aggregateAndProof.Aggregate.AggregationBits.Len())
|
|
s.monitor.AttestationAggregationCoverage(frac)
|
|
s.monitor.AttestationAggregationCompleted(started, "succeeded")
|
|
}
|
|
|
|
// IsAggregator reports if we are an attestation aggregator for a given validator/committee/slot combination.
|
|
func (s *Service) IsAggregator(ctx context.Context,
|
|
validatorIndex phase0.ValidatorIndex,
|
|
committeeIndex phase0.CommitteeIndex,
|
|
slot phase0.Slot,
|
|
committeeSize uint64,
|
|
) (bool, phase0.BLSSignature, error) {
|
|
modulo := committeeSize / s.targetAggregatorsPerCommittee
|
|
if modulo == 0 {
|
|
// Modulo must be at least 1.
|
|
modulo = 1
|
|
}
|
|
|
|
// Fetch the validator from the account manager.
|
|
epoch := phase0.Epoch(uint64(slot) / s.slotsPerEpoch)
|
|
accounts, err := s.validatingAccountsProvider.ValidatingAccountsForEpochByIndex(ctx, epoch, []phase0.ValidatorIndex{validatorIndex})
|
|
if err != nil {
|
|
return false, phase0.BLSSignature{}, errors.Wrap(err, "failed to obtain validator")
|
|
}
|
|
if len(accounts) == 0 {
|
|
return false, phase0.BLSSignature{}, errors.New("validator unknown")
|
|
}
|
|
account := accounts[validatorIndex]
|
|
|
|
// Sign the slot.
|
|
signature, err := s.slotSelectionSigner.SignSlotSelection(ctx, account, slot)
|
|
if err != nil {
|
|
return false, phase0.BLSSignature{}, errors.Wrap(err, "failed to sign the slot")
|
|
}
|
|
|
|
// Hash the signature.
|
|
sigHash := sha256.New()
|
|
n, err := sigHash.Write(signature[:])
|
|
if err != nil {
|
|
return false, phase0.BLSSignature{}, errors.Wrap(err, "failed to hash the slot signature")
|
|
}
|
|
if n != len(signature) {
|
|
return false, phase0.BLSSignature{}, errors.New("failed to write all bytes of the slot signature to the hash")
|
|
}
|
|
hash := sigHash.Sum(nil)
|
|
|
|
return binary.LittleEndian.Uint64(hash[:8])%modulo == 0, signature, nil
|
|
}
|