Re-implement accountmanager metrics.

This reintroduces prometheus metrics for the account manager module.
The metrics track the validating state of each account, and are found
under the `vouch_accountmanager_accounts_total` metric.  The metrics are
differentiated using the `state` label.
This commit is contained in:
Jim McDonald 2020-12-16 13:46:00 +00:00
parent fda79aa31a
commit 4071dabf9e
No known key found for this signature in database
GPG Key ID: 89CEB61B2AD2A5E7
8 changed files with 236 additions and 74 deletions

View File

@ -1,3 +1,7 @@
Development:
- re-implement accountmanager metrics
1.0.2:
- avoid crash in "best" attestationdata strategy
1.0.1:
- include source and target epochs when scoring attestation data

View File

@ -36,6 +36,7 @@ import (
standardattester "github.com/attestantio/vouch/services/attester/standard"
standardbeaconblockproposer "github.com/attestantio/vouch/services/beaconblockproposer/standard"
standardbeaconcommitteesubscriber "github.com/attestantio/vouch/services/beaconcommitteesubscriber/standard"
"github.com/attestantio/vouch/services/chaintime"
standardchaintime "github.com/attestantio/vouch/services/chaintime/standard"
standardcontroller "github.com/attestantio/vouch/services/controller/standard"
"github.com/attestantio/vouch/services/graffitiprovider"
@ -281,7 +282,7 @@ func startServices(ctx context.Context, majordomo majordomo.Service) error {
}
log.Trace().Msg("Starting account manager")
accountManager, err := startAccountManager(ctx, monitor, eth2Client, validatorsManager, majordomo)
accountManager, err := startAccountManager(ctx, monitor, eth2Client, validatorsManager, majordomo, chainTime)
if err != nil {
return errors.Wrap(err, "failed to start account manager")
}
@ -573,7 +574,7 @@ func startSigner(ctx context.Context, monitor metrics.Service, eth2Client eth2cl
return signer, nil
}
func startAccountManager(ctx context.Context, monitor metrics.Service, eth2Client eth2client.Service, validatorsManager validatorsmanager.Service, majordomo majordomo.Service) (accountmanager.Service, error) {
func startAccountManager(ctx context.Context, monitor metrics.Service, eth2Client eth2client.Service, validatorsManager validatorsmanager.Service, majordomo majordomo.Service, chainTime chaintime.Service) (accountmanager.Service, error) {
var accountManager accountmanager.Service
if viper.Get("accountmanager.dirk") != nil {
log.Info().Msg("Starting dirk account manager")
@ -604,6 +605,7 @@ func startAccountManager(ctx context.Context, monitor metrics.Service, eth2Clien
dirkaccountmanager.WithCACert(caPEMBlock),
dirkaccountmanager.WithDomainProvider(eth2Client.(eth2client.DomainProvider)),
dirkaccountmanager.WithFarFutureEpochProvider(eth2Client.(eth2client.FarFutureEpochProvider)),
dirkaccountmanager.WithCurrentEpochProvider(chainTime),
)
if err != nil {
return nil, errors.Wrap(err, "failed to start dirk account manager service")

View File

@ -17,6 +17,7 @@ import (
"context"
eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
nullmetrics "github.com/attestantio/vouch/services/metrics/null"
"github.com/attestantio/vouch/services/validatorsmanager"
@ -36,6 +37,7 @@ type parameters struct {
domainProvider eth2client.DomainProvider
validatorsManager validatorsmanager.Service
farFutureEpochProvider eth2client.FarFutureEpochProvider
currentEpochProvider chaintime.Service
}
// Parameter is the interface for service parameters.
@ -126,6 +128,13 @@ func WithFarFutureEpochProvider(provider eth2client.FarFutureEpochProvider) Para
})
}
// WithCurrentEpochProvider sets the current epoch provider.
func WithCurrentEpochProvider(provider chaintime.Service) Parameter {
return parameterFunc(func(p *parameters) {
p.currentEpochProvider = provider
})
}
// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct.
func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
parameters := parameters{
@ -166,6 +175,9 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
if parameters.farFutureEpochProvider == nil {
return nil, errors.New("no far future epoch provider specified")
}
if parameters.currentEpochProvider == nil {
return nil, errors.New("no current epoch provider specified")
}
return &parameters, nil
}

View File

@ -26,6 +26,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
api "github.com/attestantio/go-eth2-client/api/v1"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/validatorsmanager"
"github.com/pkg/errors"
@ -39,18 +40,19 @@ import (
// Service is the manager for dirk accounts.
type Service struct {
mutex sync.RWMutex
monitor metrics.AccountManagerMonitor
clientMonitor metrics.ClientMonitor
endpoints []*dirk.Endpoint
accountPaths []string
credentials credentials.TransportCredentials
accounts map[spec.BLSPubKey]e2wtypes.Account
validatorsManager validatorsmanager.Service
domainProvider eth2client.DomainProvider
farFutureEpoch spec.Epoch
wallets map[string]e2wtypes.Wallet
walletsMutex sync.RWMutex
mutex sync.RWMutex
monitor metrics.AccountManagerMonitor
clientMonitor metrics.ClientMonitor
endpoints []*dirk.Endpoint
accountPaths []string
credentials credentials.TransportCredentials
accounts map[spec.BLSPubKey]e2wtypes.Account
validatorsManager validatorsmanager.Service
domainProvider eth2client.DomainProvider
farFutureEpoch spec.Epoch
currentEpochProvider chaintime.Service
wallets map[string]e2wtypes.Wallet
walletsMutex sync.RWMutex
}
// module-wide log.
@ -102,15 +104,16 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
}
s := &Service{
monitor: parameters.monitor,
clientMonitor: parameters.clientMonitor,
endpoints: endpoints,
accountPaths: parameters.accountPaths,
credentials: credentials,
domainProvider: parameters.domainProvider,
validatorsManager: parameters.validatorsManager,
farFutureEpoch: farFutureEpoch,
wallets: make(map[string]e2wtypes.Wallet),
monitor: parameters.monitor,
clientMonitor: parameters.clientMonitor,
endpoints: endpoints,
accountPaths: parameters.accountPaths,
credentials: credentials,
domainProvider: parameters.domainProvider,
validatorsManager: parameters.validatorsManager,
farFutureEpoch: farFutureEpoch,
currentEpochProvider: parameters.currentEpochProvider,
wallets: make(map[string]e2wtypes.Wallet),
}
if err := s.refreshAccounts(ctx); err != nil {
@ -233,6 +236,20 @@ func credentialsFromCerts(ctx context.Context, clientCert []byte, clientKey []by
// ValidatingAccountsForEpoch obtains the validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch spec.Epoch) (map[spec.ValidatorIndex]e2wtypes.Account, error) {
// stateCount is used to update metrics.
stateCount := map[api.ValidatorState]uint64{
api.ValidatorStateUnknown: 0,
api.ValidatorStatePendingInitialized: 0,
api.ValidatorStatePendingQueued: 0,
api.ValidatorStateActiveOngoing: 0,
api.ValidatorStateActiveExiting: 0,
api.ValidatorStateActiveSlashed: 0,
api.ValidatorStateExitedUnslashed: 0,
api.ValidatorStateExitedSlashed: 0,
api.ValidatorStateWithdrawalPossible: 0,
api.ValidatorStateWithdrawalDone: 0,
}
validatingAccounts := make(map[spec.ValidatorIndex]e2wtypes.Account)
pubKeys := make([]spec.BLSPubKey, 0, len(s.accounts))
for pubKey := range s.accounts {
@ -242,8 +259,24 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch spec.Epo
validators := s.validatorsManager.ValidatorsByPubKey(ctx, pubKeys)
for index, validator := range validators {
state := api.ValidatorToState(validator, epoch, s.farFutureEpoch)
stateCount[state]++
if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting {
validatingAccounts[index] = s.accounts[validator.PublicKey]
account := s.accounts[validator.PublicKey]
log.Trace().
Str("name", account.Name()).
Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())).
Uint64("index", uint64(index)).
Str("state", state.String()).
Msg("Validating account")
validatingAccounts[index] = account
}
}
// Update metrics if this is the current epoch.
if epoch == s.currentEpochProvider.CurrentEpoch() {
stateCount[api.ValidatorStateUnknown] += uint64(len(s.accounts) - len(validators))
for state, count := range stateCount {
s.monitor.Accounts(strings.ToLower(state.String()), count)
}
}

View File

@ -17,9 +17,11 @@ import (
"context"
"regexp"
"testing"
"time"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/mock"
standardchaintime "github.com/attestantio/vouch/services/chaintime/standard"
nullmetrics "github.com/attestantio/vouch/services/metrics/null"
"github.com/attestantio/vouch/testing/logger"
"github.com/attestantio/vouch/testing/resources"
@ -171,6 +173,19 @@ func TestAccounts(t *testing.T) {
}
func setupService(ctx context.Context, t *testing.T, endpoints []string, accountPaths []string) (*Service, error) {
genesisTime := time.Now()
slotDuration := 12 * time.Second
slotsPerEpoch := uint64(32)
mockGenesisTimeProvider := mock.NewGenesisTimeProvider(genesisTime)
mockSlotDurationProvider := mock.NewSlotDurationProvider(slotDuration)
mockSlotsPerEpochProvider := mock.NewSlotsPerEpochProvider(slotsPerEpoch)
chainTime, err := standardchaintime.New(ctx,
standardchaintime.WithGenesisTimeProvider(mockGenesisTimeProvider),
standardchaintime.WithSlotDurationProvider(mockSlotDurationProvider),
standardchaintime.WithSlotsPerEpochProvider(mockSlotsPerEpochProvider),
)
require.NoError(t, err)
return New(ctx,
WithLogLevel(zerolog.TraceLevel),
WithMonitor(nullmetrics.New(context.Background())),
@ -183,6 +198,7 @@ func setupService(ctx context.Context, t *testing.T, endpoints []string, account
WithValidatorsManager(mock.NewValidatorsManager()),
WithDomainProvider(mock.NewDomainProvider()),
WithFarFutureEpochProvider(mock.NewFarFutureEpochProvider(0xffffffffffffffff)),
WithCurrentEpochProvider(chainTime),
)
}

View File

@ -16,9 +16,11 @@ package dirk_test
import (
"context"
"testing"
"time"
"github.com/attestantio/vouch/mock"
"github.com/attestantio/vouch/services/accountmanager/dirk"
standardchaintime "github.com/attestantio/vouch/services/chaintime/standard"
nullmetrics "github.com/attestantio/vouch/services/metrics/null"
"github.com/attestantio/vouch/testing/logger"
"github.com/attestantio/vouch/testing/resources"
@ -27,9 +29,24 @@ import (
)
func TestService(t *testing.T) {
ctx := context.Background()
genesisTime := time.Now()
slotDuration := 12 * time.Second
slotsPerEpoch := uint64(32)
mockGenesisTimeProvider := mock.NewGenesisTimeProvider(genesisTime)
mockSlotDurationProvider := mock.NewSlotDurationProvider(slotDuration)
mockSlotsPerEpochProvider := mock.NewSlotsPerEpochProvider(slotsPerEpoch)
domainProvider := mock.NewDomainProvider()
validatorsManager := mock.NewValidatorsManager()
farFutureEpochProvider := mock.NewFarFutureEpochProvider(0xffffffffffffffff)
chainTime, err := standardchaintime.New(ctx,
standardchaintime.WithGenesisTimeProvider(mockGenesisTimeProvider),
standardchaintime.WithSlotDurationProvider(mockSlotDurationProvider),
standardchaintime.WithSlotsPerEpochProvider(mockSlotsPerEpochProvider),
)
require.NoError(t, err)
tests := []struct {
name string
@ -51,6 +68,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no monitor specified",
},
@ -58,7 +76,7 @@ func TestService(t *testing.T) {
name: "ClientMonitorNil",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nil),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
@ -68,6 +86,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no client monitor specified",
},
@ -75,8 +94,8 @@ func TestService(t *testing.T) {
name: "EndpointsNil",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
dirk.WithClientKey([]byte(resources.ClientTest01Key)),
@ -84,6 +103,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no endpoints specified",
},
@ -91,8 +111,8 @@ func TestService(t *testing.T) {
name: "EndpointsEmpty",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -101,6 +121,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no endpoints specified",
},
@ -108,8 +129,8 @@ func TestService(t *testing.T) {
name: "EndpointsMalformedEndpoint",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{""}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -118,6 +139,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "no valid endpoints specified",
logEntry: "Malformed endpoint",
@ -126,8 +148,8 @@ func TestService(t *testing.T) {
name: "EndpointsMalformedPort",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"host:bad"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -136,6 +158,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "no valid endpoints specified",
logEntry: "Malformed port",
@ -144,8 +167,8 @@ func TestService(t *testing.T) {
name: "EndpointsInvalidPort",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"host:0"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -154,6 +177,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "no valid endpoints specified",
logEntry: "Invalid port",
@ -162,8 +186,8 @@ func TestService(t *testing.T) {
name: "AccountPathsNil",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
dirk.WithClientKey([]byte(resources.ClientTest01Key)),
@ -171,6 +195,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no account paths specified",
},
@ -178,8 +203,8 @@ func TestService(t *testing.T) {
name: "AccountPathsEmpty",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -188,6 +213,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no account paths specified",
},
@ -195,8 +221,8 @@ func TestService(t *testing.T) {
name: "ClientCertMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientKey([]byte(resources.ClientTest01Key)),
@ -204,6 +230,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no client certificate specified",
},
@ -211,8 +238,8 @@ func TestService(t *testing.T) {
name: "ClientKeyMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -220,6 +247,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no client key specified",
},
@ -227,8 +255,8 @@ func TestService(t *testing.T) {
name: "ClientCertKeyMismatch",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.Disabled),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -237,6 +265,7 @@ func TestService(t *testing.T) {
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "failed to build credentials: failed to load client keypair: tls: private key does not match public key",
},
@ -244,8 +273,8 @@ func TestService(t *testing.T) {
name: "ValidatorsManagerMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -253,6 +282,7 @@ func TestService(t *testing.T) {
dirk.WithCACert([]byte(resources.CACrt)),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no validators manager specified",
},
@ -260,8 +290,8 @@ func TestService(t *testing.T) {
name: "DomainProviderMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -269,6 +299,7 @@ func TestService(t *testing.T) {
dirk.WithCACert([]byte(resources.CACrt)),
dirk.WithValidatorsManager(validatorsManager),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no domain provider specified",
},
@ -276,8 +307,8 @@ func TestService(t *testing.T) {
name: "FarFutureEpochProviderMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.TraceLevel),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -285,15 +316,16 @@ func TestService(t *testing.T) {
dirk.WithCACert([]byte(resources.CACrt)),
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
err: "problem with parameters: no far future epoch provider specified",
},
{
name: "Good",
name: "CurrentEpochProviderMissing",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.Disabled),
dirk.WithMonitor(nullmetrics.New(context.Background())),
dirk.WithClientMonitor(nullmetrics.New(context.Background())),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
@ -303,13 +335,31 @@ func TestService(t *testing.T) {
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
},
err: "problem with parameters: no current epoch provider specified",
},
{
name: "Good",
params: []dirk.Parameter{
dirk.WithLogLevel(zerolog.Disabled),
dirk.WithMonitor(nullmetrics.New(ctx)),
dirk.WithClientMonitor(nullmetrics.New(ctx)),
dirk.WithEndpoints([]string{"localhost:12345", "localhost:12346"}),
dirk.WithAccountPaths([]string{"wallet1", "wallet2"}),
dirk.WithClientCert([]byte(resources.ClientTest01Crt)),
dirk.WithClientKey([]byte(resources.ClientTest01Key)),
dirk.WithCACert([]byte(resources.CACrt)),
dirk.WithValidatorsManager(validatorsManager),
dirk.WithDomainProvider(domainProvider),
dirk.WithFarFutureEpochProvider(farFutureEpochProvider),
dirk.WithCurrentEpochProvider(chainTime),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
capture := logger.NewLogCapture()
_, err := dirk.New(context.Background(), test.params...)
_, err := dirk.New(ctx, test.params...)
if test.err != "" {
require.EqualError(t, err, test.err)
if test.logEntry != "" {

View File

@ -15,6 +15,7 @@ package wallet
import (
eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/validatorsmanager"
"github.com/pkg/errors"
@ -31,6 +32,7 @@ type parameters struct {
slotsPerEpochProvider eth2client.SlotsPerEpochProvider
domainProvider eth2client.DomainProvider
farFutureEpochProvider eth2client.FarFutureEpochProvider
currentEpochProvider chaintime.Service
}
// Parameter is the interface for service parameters.
@ -107,6 +109,13 @@ func WithDomainProvider(provider eth2client.DomainProvider) Parameter {
})
}
// WithCurrentEpochProvider sets the current epoch provider.
func WithCurrentEpochProvider(provider chaintime.Service) Parameter {
return parameterFunc(func(p *parameters) {
p.currentEpochProvider = provider
})
}
// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct.
func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
parameters := parameters{
@ -139,6 +148,9 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
if parameters.domainProvider == nil {
return nil, errors.New("no domain provider specified")
}
if parameters.currentEpochProvider == nil {
return nil, errors.New("no current epoch provider specified")
}
return &parameters, nil
}

View File

@ -23,6 +23,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
api "github.com/attestantio/go-eth2-client/api/v1"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/chaintime"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/validatorsmanager"
"github.com/pkg/errors"
@ -36,16 +37,17 @@ import (
// Service is the manager for wallet accounts.
type Service struct {
mutex sync.RWMutex
monitor metrics.AccountManagerMonitor
stores []e2wtypes.Store
accountPaths []string
passphrases [][]byte
accounts map[spec.BLSPubKey]e2wtypes.Account
validatorsManager validatorsmanager.Service
slotsPerEpoch spec.Slot
domainProvider eth2client.DomainProvider
farFutureEpoch spec.Epoch
mutex sync.RWMutex
monitor metrics.AccountManagerMonitor
stores []e2wtypes.Store
accountPaths []string
passphrases [][]byte
accounts map[spec.BLSPubKey]e2wtypes.Account
validatorsManager validatorsmanager.Service
slotsPerEpoch spec.Slot
domainProvider eth2client.DomainProvider
farFutureEpoch spec.Epoch
currentEpochProvider chaintime.Service
}
// module-wide log.
@ -87,14 +89,15 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
}
s := &Service{
monitor: parameters.monitor,
stores: stores,
accountPaths: parameters.accountPaths,
passphrases: parameters.passphrases,
validatorsManager: parameters.validatorsManager,
slotsPerEpoch: spec.Slot(slotsPerEpoch),
domainProvider: parameters.domainProvider,
farFutureEpoch: farFutureEpoch,
monitor: parameters.monitor,
stores: stores,
accountPaths: parameters.accountPaths,
passphrases: parameters.passphrases,
validatorsManager: parameters.validatorsManager,
slotsPerEpoch: spec.Slot(slotsPerEpoch),
domainProvider: parameters.domainProvider,
farFutureEpoch: farFutureEpoch,
currentEpochProvider: parameters.currentEpochProvider,
}
if err := s.refreshAccounts(ctx); err != nil {
@ -181,6 +184,20 @@ func (s *Service) refreshValidators(ctx context.Context) error {
// ValidatingAccountsForEpoch obtains the validating accounts for a given epoch.
func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch spec.Epoch) (map[spec.ValidatorIndex]e2wtypes.Account, error) {
// stateCount is used to update metrics.
stateCount := map[api.ValidatorState]uint64{
api.ValidatorStateUnknown: 0,
api.ValidatorStatePendingInitialized: 0,
api.ValidatorStatePendingQueued: 0,
api.ValidatorStateActiveOngoing: 0,
api.ValidatorStateActiveExiting: 0,
api.ValidatorStateActiveSlashed: 0,
api.ValidatorStateExitedUnslashed: 0,
api.ValidatorStateExitedSlashed: 0,
api.ValidatorStateWithdrawalPossible: 0,
api.ValidatorStateWithdrawalDone: 0,
}
validatingAccounts := make(map[spec.ValidatorIndex]e2wtypes.Account)
pubKeys := make([]spec.BLSPubKey, 0, len(s.accounts))
for pubKey := range s.accounts {
@ -190,8 +207,24 @@ func (s *Service) ValidatingAccountsForEpoch(ctx context.Context, epoch spec.Epo
validators := s.validatorsManager.ValidatorsByPubKey(ctx, pubKeys)
for index, validator := range validators {
state := api.ValidatorToState(validator, epoch, s.farFutureEpoch)
stateCount[state]++
if state == api.ValidatorStateActiveOngoing || state == api.ValidatorStateActiveExiting {
validatingAccounts[index] = s.accounts[validator.PublicKey]
account := s.accounts[validator.PublicKey]
log.Trace().
Str("name", account.Name()).
Str("public_key", fmt.Sprintf("%x", account.PublicKey().Marshal())).
Uint64("index", uint64(index)).
Str("state", state.String()).
Msg("Validating account")
validatingAccounts[index] = account
}
}
// Update metrics if this is the current epoch.
if epoch == s.currentEpochProvider.CurrentEpoch() {
stateCount[api.ValidatorStateUnknown] += uint64(len(s.accounts) - len(validators))
for state, count := range stateCount {
s.monitor.Accounts(strings.ToLower(state.String()), count)
}
}