From 64a6e671c0475d6d972e179df6bf33dc3ae476e5 Mon Sep 17 00:00:00 2001 From: Jim McDonald Date: Wed, 28 Oct 2020 19:29:37 +0000 Subject: [PATCH] Add framework for best attestation --- .../attestationdata/best/attestationdata.go | 73 ++++++++++++ .../best/attestationdata_test.go | 103 ++++++++++++++++ strategies/attestationdata/best/parameters.go | 112 ++++++++++++++++++ strategies/attestationdata/best/score.go | 41 +++++++ strategies/attestationdata/best/service.go | 59 +++++++++ .../attestationdata/best/service_test.go | 105 ++++++++++++++++ .../attestationdata/first/attestationdata.go | 62 ++++++++++ .../first/attestationdata_test.go | 103 ++++++++++++++++ strategies/attestationdata/first/service.go | 40 ------- .../attestationdata/first/service_test.go | 77 ------------ 10 files changed, 658 insertions(+), 117 deletions(-) create mode 100644 strategies/attestationdata/best/attestationdata.go create mode 100644 strategies/attestationdata/best/attestationdata_test.go create mode 100644 strategies/attestationdata/best/parameters.go create mode 100644 strategies/attestationdata/best/score.go create mode 100644 strategies/attestationdata/best/service.go create mode 100644 strategies/attestationdata/best/service_test.go create mode 100644 strategies/attestationdata/first/attestationdata.go create mode 100644 strategies/attestationdata/first/attestationdata_test.go diff --git a/strategies/attestationdata/best/attestationdata.go b/strategies/attestationdata/best/attestationdata.go new file mode 100644 index 0000000..e965726 --- /dev/null +++ b/strategies/attestationdata/best/attestationdata.go @@ -0,0 +1,73 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package best + +import ( + "context" + "sync" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "golang.org/x/sync/semaphore" +) + +// AttestationData provides the best attestation data from a number of beacon nodes. +func (s *Service) AttestationData(ctx context.Context, slot uint64, committeeIndex uint64) (*spec.AttestationData, error) { + var mu sync.Mutex + bestScore := float64(0) + var bestAttestationData *spec.AttestationData + + sem := semaphore.NewWeighted(s.processConcurrency) + var wg sync.WaitGroup + for name, provider := range s.attestationDataProviders { + wg.Add(1) + go func(ctx context.Context, sem *semaphore.Weighted, wg *sync.WaitGroup, name string, provider eth2client.AttestationDataProvider, mu *sync.Mutex) { + defer wg.Done() + + if err := sem.Acquire(ctx, 1); err != nil { + log.Error().Err(err).Msg("Failed to acquire semaphore") + return + } + defer sem.Release(1) + log := log.With().Str("provider", name).Uint64("slot", slot).Logger() + + opCtx, cancel := context.WithTimeout(ctx, s.timeout) + started := time.Now() + attestationData, err := provider.AttestationData(opCtx, slot, committeeIndex) + s.clientMonitor.ClientOperation(name, "attestation data", err == nil, time.Since(started)) + if err != nil { + log.Warn().Err(err).Msg("Failed to obtain attestation data") + cancel() + return + } + log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained attestation data") + cancel() + if attestationData == nil { + return + } + + mu.Lock() + score := s.scoreAttestationData(ctx, name, attestationData) + if score > bestScore || bestAttestationData == nil { + bestScore = score + bestAttestationData = attestationData + } + mu.Unlock() + }(ctx, sem, &wg, name, provider, &mu) + } + wg.Wait() + + return bestAttestationData, nil +} diff --git a/strategies/attestationdata/best/attestationdata_test.go b/strategies/attestationdata/best/attestationdata_test.go new file mode 100644 index 0000000..5833de7 --- /dev/null +++ b/strategies/attestationdata/best/attestationdata_test.go @@ -0,0 +1,103 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package best_test + +import ( + "context" + "testing" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/vouch/mock" + "github.com/attestantio/vouch/strategies/attestationdata/best" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestAttestationData(t *testing.T) { + tests := []struct { + name string + params []best.Parameter + slot uint64 + committeeIndex uint64 + err string + }{ + { + name: "Good", + params: []best.Parameter{ + best.WithLogLevel(zerolog.Disabled), + best.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "good": mock.NewAttestationDataProvider(), + }), + }, + slot: 12345, + committeeIndex: 3, + }, + { + name: "Timeout", + params: []best.Parameter{ + best.WithLogLevel(zerolog.Disabled), + best.WithTimeout(time.Second), + best.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "sleepy": mock.NewSleepyAttestationDataProvider(5*time.Second, mock.NewAttestationDataProvider()), + }), + }, + slot: 12345, + committeeIndex: 3, + err: "failed to obtain attestation data before timeout", + }, + { + name: "NilResponse", + params: []best.Parameter{ + best.WithLogLevel(zerolog.Disabled), + best.WithTimeout(time.Second), + best.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "nil": mock.NewNilAttestationDataProvider(), + }), + }, + slot: 12345, + committeeIndex: 3, + // Nil response is invalid, so expect a timeout. + err: "failed to obtain attestation data before timeout", + }, + { + name: "GoodMixed", + params: []best.Parameter{ + best.WithLogLevel(zerolog.Disabled), + best.WithTimeout(2 * time.Second), + best.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "error": mock.NewErroringAttestationDataProvider(), + "sleepy": mock.NewSleepyAttestationDataProvider(time.Second, mock.NewAttestationDataProvider()), + }), + }, + slot: 12345, + committeeIndex: 3, + }, + } + + for _, test := range tests { + s, err := best.New(context.Background(), test.params...) + require.NoError(t, err) + + t.Run(test.name, func(t *testing.T) { + attestationData, err := s.AttestationData(context.Background(), test.slot, test.committeeIndex) + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + require.NotNil(t, attestationData) + } + }) + } +} diff --git a/strategies/attestationdata/best/parameters.go b/strategies/attestationdata/best/parameters.go new file mode 100644 index 0000000..44ee842 --- /dev/null +++ b/strategies/attestationdata/best/parameters.go @@ -0,0 +1,112 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package best is a strategy that obtains attestation from multiple +// nodes and selects the best one. +package best + +import ( + "context" + "runtime" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/vouch/services/metrics" + nullmetrics "github.com/attestantio/vouch/services/metrics/null" + "github.com/pkg/errors" + "github.com/rs/zerolog" +) + +type parameters struct { + logLevel zerolog.Level + clientMonitor metrics.ClientMonitor + processConcurrency int64 + attestationDataProviders map[string]eth2client.AttestationDataProvider + timeout time.Duration +} + +// Parameter is the interface for service parameters. +type Parameter interface { + apply(*parameters) +} + +type parameterFunc func(*parameters) + +func (f parameterFunc) apply(p *parameters) { + f(p) +} + +// WithLogLevel sets the log level for the module. +func WithLogLevel(logLevel zerolog.Level) Parameter { + return parameterFunc(func(p *parameters) { + p.logLevel = logLevel + }) +} + +// WithClientMonitor sets the client monitor for the service. +func WithClientMonitor(monitor metrics.ClientMonitor) Parameter { + return parameterFunc(func(p *parameters) { + p.clientMonitor = monitor + }) +} + +// WithProcessConcurrency sets the concurrency for the service. +func WithProcessConcurrency(concurrency int64) Parameter { + return parameterFunc(func(p *parameters) { + p.processConcurrency = concurrency + }) +} + +// WithAttestationDataProviders sets the beacon block proposal providers. +func WithAttestationDataProviders(providers map[string]eth2client.AttestationDataProvider) Parameter { + return parameterFunc(func(p *parameters) { + p.attestationDataProviders = providers + }) +} + +// WithTimeout sets the timeout for beacon block proposal requests. +func WithTimeout(timeout time.Duration) Parameter { + return parameterFunc(func(p *parameters) { + p.timeout = timeout + }) +} + +// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. +func parseAndCheckParameters(params ...Parameter) (*parameters, error) { + parameters := parameters{ + logLevel: zerolog.GlobalLevel(), + timeout: 2 * time.Second, + clientMonitor: nullmetrics.New(context.Background()), + processConcurrency: int64(runtime.GOMAXPROCS(-1)), + } + for _, p := range params { + if params != nil { + p.apply(¶meters) + } + } + + if parameters.timeout == 0 { + return nil, errors.New("no timeout specified") + } + if parameters.clientMonitor == nil { + return nil, errors.New("no client monitor specified") + } + if parameters.processConcurrency == 0 { + return nil, errors.New("no process concurrency specified") + } + if len(parameters.attestationDataProviders) == 0 { + return nil, errors.New("no attestation data providers specified") + } + + return ¶meters, nil +} diff --git a/strategies/attestationdata/best/score.go b/strategies/attestationdata/best/score.go new file mode 100644 index 0000000..49a42d2 --- /dev/null +++ b/strategies/attestationdata/best/score.go @@ -0,0 +1,41 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package best + +import ( + "context" + + spec "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// scoreAttestationData generates a score for attestation data. +// The score is relative to the reward expected by proposing the block. +func (s *Service) scoreAttestationData(ctx context.Context, name string, attestationData *spec.AttestationData) float64 { + if attestationData == nil { + return 0 + } + + // log.Trace(). + // Uint64("slot", blockProposal.Slot). + // Str("provider", name). + // Float64("immediate_attestations", immediateAttestationScore). + // Float64("attestations", attestationScore). + // Float64("proposer_slashings", proposerSlashingScore). + // Float64("attester_slashings", attesterSlashingScore). + // Float64("total", attestationScore+proposerSlashingScore+attesterSlashingScore). + // Msg("Scored block") + // + // return attestationScore + proposerSlashingScore + attesterSlashingScore + return 0 +} diff --git a/strategies/attestationdata/best/service.go b/strategies/attestationdata/best/service.go new file mode 100644 index 0000000..e2ef58a --- /dev/null +++ b/strategies/attestationdata/best/service.go @@ -0,0 +1,59 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package best + +import ( + "context" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/vouch/services/metrics" + "github.com/pkg/errors" + "github.com/rs/zerolog" + zerologger "github.com/rs/zerolog/log" +) + +// Service is the provider for attestation data. +type Service struct { + clientMonitor metrics.ClientMonitor + processConcurrency int64 + attestationDataProviders map[string]eth2client.AttestationDataProvider + timeout time.Duration +} + +// module-wide log. +var log zerolog.Logger + +// New creates a new attestation data strategy. +func New(ctx context.Context, params ...Parameter) (*Service, error) { + parameters, err := parseAndCheckParameters(params...) + if err != nil { + return nil, errors.Wrap(err, "problem with parameters") + } + + // Set logging. + log = zerologger.With().Str("strategy", "attestationdata").Str("impl", "best").Logger() + if parameters.logLevel != log.GetLevel() { + log = log.Level(parameters.logLevel) + } + + s := &Service{ + timeout: parameters.timeout, + clientMonitor: parameters.clientMonitor, + processConcurrency: parameters.processConcurrency, + attestationDataProviders: parameters.attestationDataProviders, + } + + return s, nil +} diff --git a/strategies/attestationdata/best/service_test.go b/strategies/attestationdata/best/service_test.go new file mode 100644 index 0000000..d52c707 --- /dev/null +++ b/strategies/attestationdata/best/service_test.go @@ -0,0 +1,105 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package best_test + +import ( + "context" + "testing" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/vouch/mock" + "github.com/attestantio/vouch/strategies/attestationdata/best" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestService(t *testing.T) { + attestationDataProviders := map[string]eth2client.AttestationDataProvider{ + "localhost:1": mock.NewAttestationDataProvider(), + } + + tests := []struct { + name string + params []best.Parameter + err string + }{ + { + name: "TimeoutZero", + params: []best.Parameter{ + best.WithLogLevel(zerolog.TraceLevel), + best.WithTimeout(0), + best.WithAttestationDataProviders(attestationDataProviders), + }, + err: "problem with parameters: no timeout specified", + }, + { + name: "ClientMonitorMissing", + params: []best.Parameter{ + best.WithLogLevel(zerolog.TraceLevel), + best.WithClientMonitor(nil), + best.WithAttestationDataProviders(attestationDataProviders), + }, + err: "problem with parameters: no client monitor specified", + }, + { + name: "AttestationDataProvidersNil", + params: []best.Parameter{ + best.WithLogLevel(zerolog.TraceLevel), + best.WithAttestationDataProviders(nil), + }, + err: "problem with parameters: no attestation data providers specified", + }, + { + name: "AttestationDataProvidersEmpty", + params: []best.Parameter{ + best.WithLogLevel(zerolog.TraceLevel), + best.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{}), + }, + err: "problem with parameters: no attestation data providers specified", + }, + { + name: "Good", + params: []best.Parameter{ + best.WithLogLevel(zerolog.TraceLevel), + best.WithTimeout(10 * time.Second), + best.WithAttestationDataProviders(attestationDataProviders), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err := best.New(context.Background(), test.params...) + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestInterfaces(t *testing.T) { + attestationDataProviders := map[string]eth2client.AttestationDataProvider{ + "localhost:1": mock.NewAttestationDataProvider(), + } + + s, err := best.New(context.Background(), + best.WithLogLevel(zerolog.Disabled), + best.WithAttestationDataProviders(attestationDataProviders), + ) + require.NoError(t, err) + require.Implements(t, (*eth2client.AttestationDataProvider)(nil), s) +} diff --git a/strategies/attestationdata/first/attestationdata.go b/strategies/attestationdata/first/attestationdata.go new file mode 100644 index 0000000..206c2c7 --- /dev/null +++ b/strategies/attestationdata/first/attestationdata.go @@ -0,0 +1,62 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package first + +import ( + "context" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/pkg/errors" +) + +// AttestationData provides the first attestation data from a number of beacon nodes. +func (s *Service) AttestationData(ctx context.Context, slot uint64, committeeIndex uint64) (*spec.AttestationData, error) { + // We create a cancelable context with a timeout. As soon as the first provider has responded we + // cancel the context to cancel the other requests. + ctx, cancel := context.WithTimeout(ctx, s.timeout) + + attestationDataCh := make(chan *spec.AttestationData, 1) + for name, provider := range s.attestationDataProviders { + go func(ctx context.Context, name string, provider eth2client.AttestationDataProvider, ch chan *spec.AttestationData) { + log := log.With().Str("provider", name).Uint64("slot", slot).Logger() + + started := time.Now() + attestationData, err := provider.AttestationData(ctx, slot, committeeIndex) + s.clientMonitor.ClientOperation(name, "attestation data", err == nil, time.Since(started)) + if err != nil { + log.Warn().Dur("elapsed", time.Since(started)).Err(err).Msg("Failed to obtain attestation data") + return + } + if attestationData == nil { + log.Warn().Dur("elapsed", time.Since(started)).Err(err).Msg("Returned empty attestation data") + return + } + log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained attestation data") + + ch <- attestationData + }(ctx, name, provider, attestationDataCh) + } + + select { + case <-ctx.Done(): + cancel() + log.Warn().Msg("Failed to obtain attestation data before timeout") + return nil, errors.New("failed to obtain attestation data before timeout") + case attestationData := <-attestationDataCh: + cancel() + return attestationData, nil + } +} diff --git a/strategies/attestationdata/first/attestationdata_test.go b/strategies/attestationdata/first/attestationdata_test.go new file mode 100644 index 0000000..558d541 --- /dev/null +++ b/strategies/attestationdata/first/attestationdata_test.go @@ -0,0 +1,103 @@ +// Copyright © 2020 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package first_test + +import ( + "context" + "testing" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/vouch/mock" + "github.com/attestantio/vouch/strategies/attestationdata/first" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestAttestationData(t *testing.T) { + tests := []struct { + name string + params []first.Parameter + slot uint64 + committeeIndex uint64 + err string + }{ + { + name: "Good", + params: []first.Parameter{ + first.WithLogLevel(zerolog.Disabled), + first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "good": mock.NewAttestationDataProvider(), + }), + }, + slot: 12345, + committeeIndex: 3, + }, + { + name: "Timeout", + params: []first.Parameter{ + first.WithLogLevel(zerolog.Disabled), + first.WithTimeout(time.Second), + first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "sleepy": mock.NewSleepyAttestationDataProvider(5*time.Second, mock.NewAttestationDataProvider()), + }), + }, + slot: 12345, + committeeIndex: 3, + err: "failed to obtain attestation data before timeout", + }, + { + name: "NilResponse", + params: []first.Parameter{ + first.WithLogLevel(zerolog.Disabled), + first.WithTimeout(time.Second), + first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "nil": mock.NewNilAttestationDataProvider(), + }), + }, + slot: 12345, + committeeIndex: 3, + // Nil response is invalid, so expect a timeout. + err: "failed to obtain attestation data before timeout", + }, + { + name: "GoodMixed", + params: []first.Parameter{ + first.WithLogLevel(zerolog.Disabled), + first.WithTimeout(2 * time.Second), + first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ + "error": mock.NewErroringAttestationDataProvider(), + "sleepy": mock.NewSleepyAttestationDataProvider(time.Second, mock.NewAttestationDataProvider()), + }), + }, + slot: 12345, + committeeIndex: 3, + }, + } + + for _, test := range tests { + s, err := first.New(context.Background(), test.params...) + require.NoError(t, err) + + t.Run(test.name, func(t *testing.T) { + attestationData, err := s.AttestationData(context.Background(), test.slot, test.committeeIndex) + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + require.NotNil(t, attestationData) + } + }) + } +} diff --git a/strategies/attestationdata/first/service.go b/strategies/attestationdata/first/service.go index 87e84d9..eff2c7e 100644 --- a/strategies/attestationdata/first/service.go +++ b/strategies/attestationdata/first/service.go @@ -18,7 +18,6 @@ import ( "time" eth2client "github.com/attestantio/go-eth2-client" - spec "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/attestantio/vouch/services/metrics" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -56,42 +55,3 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { return s, nil } - -// AttestationData provides the first attestation data from a number of beacon nodes. -func (s *Service) AttestationData(ctx context.Context, slot uint64, committeeIndex uint64) (*spec.AttestationData, error) { - // We create a cancelable context with a timeout. As soon as the first provider has responded we - // cancel the context to cancel the other requests. - ctx, cancel := context.WithTimeout(ctx, s.timeout) - - attestationDataCh := make(chan *spec.AttestationData, 1) - for name, provider := range s.attestationDataProviders { - go func(ctx context.Context, name string, provider eth2client.AttestationDataProvider, ch chan *spec.AttestationData) { - log := log.With().Str("provider", name).Uint64("slot", slot).Logger() - - started := time.Now() - attestationData, err := provider.AttestationData(ctx, slot, committeeIndex) - s.clientMonitor.ClientOperation(name, "attestation data", err == nil, time.Since(started)) - if err != nil { - log.Warn().Dur("elapsed", time.Since(started)).Err(err).Msg("Failed to obtain attestation data") - return - } - if attestationData == nil { - log.Warn().Dur("elapsed", time.Since(started)).Err(err).Msg("Returned empty attestation data") - return - } - log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained attestation data") - - ch <- attestationData - }(ctx, name, provider, attestationDataCh) - } - - select { - case <-ctx.Done(): - cancel() - log.Warn().Msg("Failed to obtain attestation data before timeout") - return nil, errors.New("failed to obtain attestation data before timeout") - case attestationData := <-attestationDataCh: - cancel() - return attestationData, nil - } -} diff --git a/strategies/attestationdata/first/service_test.go b/strategies/attestationdata/first/service_test.go index b04af1f..5c15d83 100644 --- a/strategies/attestationdata/first/service_test.go +++ b/strategies/attestationdata/first/service_test.go @@ -103,80 +103,3 @@ func TestInterfaces(t *testing.T) { require.NoError(t, err) require.Implements(t, (*eth2client.AttestationDataProvider)(nil), s) } - -func TestAttestationData(t *testing.T) { - tests := []struct { - name string - params []first.Parameter - slot uint64 - committeeIndex uint64 - err string - }{ - { - name: "Good", - params: []first.Parameter{ - first.WithLogLevel(zerolog.Disabled), - first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ - "good": mock.NewAttestationDataProvider(), - }), - }, - slot: 12345, - committeeIndex: 3, - }, - { - name: "Timeout", - params: []first.Parameter{ - first.WithLogLevel(zerolog.Disabled), - first.WithTimeout(time.Second), - first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ - "sleepy": mock.NewSleepyAttestationDataProvider(5*time.Second, mock.NewAttestationDataProvider()), - }), - }, - slot: 12345, - committeeIndex: 3, - err: "failed to obtain attestation data before timeout", - }, - { - name: "NilResponse", - params: []first.Parameter{ - first.WithLogLevel(zerolog.Disabled), - first.WithTimeout(time.Second), - first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ - "nil": mock.NewNilAttestationDataProvider(), - }), - }, - slot: 12345, - committeeIndex: 3, - // Nil response is invalid, so expect a timeout. - err: "failed to obtain attestation data before timeout", - }, - { - name: "GoodMixed", - params: []first.Parameter{ - first.WithLogLevel(zerolog.Disabled), - first.WithTimeout(2 * time.Second), - first.WithAttestationDataProviders(map[string]eth2client.AttestationDataProvider{ - "error": mock.NewErroringAttestationDataProvider(), - "sleepy": mock.NewSleepyAttestationDataProvider(time.Second, mock.NewAttestationDataProvider()), - }), - }, - slot: 12345, - committeeIndex: 3, - }, - } - - for _, test := range tests { - s, err := first.New(context.Background(), test.params...) - require.NoError(t, err) - - t.Run(test.name, func(t *testing.T) { - attestationData, err := s.AttestationData(context.Background(), test.slot, test.committeeIndex) - if test.err != "" { - require.EqualError(t, err, test.err) - } else { - require.NoError(t, err) - require.NotNil(t, attestationData) - } - }) - } -}