Merge branch 'multisign'

This commit is contained in:
Jim McDonald 2020-11-15 07:29:19 +00:00
commit 4cb1527b87
No known key found for this signature in database
GPG Key ID: 89CEB61B2AD2A5E7
9 changed files with 602 additions and 92 deletions

4
go.sum
View File

@ -70,6 +70,8 @@ github.com/attestantio/dirk v0.9.1 h1:oof1Xm0uI4a2T9vhQB+f3Wjlngd2rnfsKi8aj1wqNh
github.com/attestantio/dirk v0.9.1/go.mod h1:oWsyIb/OXdx9pvDQqS3hdFBB1eFaYnrNjuvLtVwo69w=
github.com/attestantio/dirk v0.9.3 h1:hJj/X63n7UV+DseKlR8Kjs+zLYtf+2Alqk9A6nI8mUg=
github.com/attestantio/dirk v0.9.3/go.mod h1:EfppeT+VjQXnE9Ti5/vxa6ptZJAN2vMXO6KZojvSOXA=
github.com/attestantio/go-eth2-client v0.6.8 h1:Lsjx5P0pB8ruZBfJUbqy5hpevD4Zt8Z0Lg4V5m2s53E=
github.com/attestantio/go-eth2-client v0.6.8/go.mod h1:lYEayGHzZma9HMUJgyxFIzDWRck8n2IedP7KTkIwe0g=
github.com/attestantio/go-eth2-client v0.6.9/go.mod h1:ODAZ4yS1YYYew/EsgGsVb/siNEoa505CrGsvlVFdkfo=
github.com/attestantio/go-eth2-client v0.6.10 h1:PMNBMLk6xfMEUqhaUnsI0/HZRrstZF18Gt6Dm5GelW4=
github.com/attestantio/go-eth2-client v0.6.10/go.mod h1:ODAZ4yS1YYYew/EsgGsVb/siNEoa505CrGsvlVFdkfo=
@ -629,6 +631,7 @@ github.com/wealdtech/go-eth2-wallet-store-scratch v1.6.0 h1:41H6hnVsI/csBx20UHpI
github.com/wealdtech/go-eth2-wallet-store-scratch v1.6.0/go.mod h1:XtXHbl4OV/XenQsvGmXbh+bVXaGS788oa30DB7kDInA=
github.com/wealdtech/go-eth2-wallet-store-scratch v1.6.1 h1:vv9lR8K76FUSNbzUU25MN4HNhZIBBI1kJBNfHq2WjRY=
github.com/wealdtech/go-eth2-wallet-store-scratch v1.6.1/go.mod h1:qnI6/VRpFyKGV+DhzdC1zmx2sA7mRRanCFlk4RYzoYs=
github.com/wealdtech/go-eth2-wallet-types/v2 v2.5.0 h1:J29mbkSCUMl2xdu8Lg6U+JptFGfmli6xl04DAHtq9aM=
github.com/wealdtech/go-eth2-wallet-types/v2 v2.5.0/go.mod h1:X9kYUH/E5YMqFMZ4xL6MJanABUkJGaH/yPZRT2o+yYA=
github.com/wealdtech/go-eth2-wallet-types/v2 v2.6.0/go.mod h1:X9kYUH/E5YMqFMZ4xL6MJanABUkJGaH/yPZRT2o+yYA=
github.com/wealdtech/go-eth2-wallet-types/v2 v2.8.0 h1:30sYrHQBchcOv+N2yIB2APnqf1RjwAbgXK+IzmXS6cw=
@ -929,6 +932,7 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.32.0 h1:Le77IccnTqEa8ryp9wIpX5W3zYm7Gf9LhOp9PHcwFts=
google.golang.org/api v0.32.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/api v0.33.0 h1:+gL0XvACeMIvpwLZ5rQZzLn5cwOsgg8dIcfJ2SYfBVw=
google.golang.org/api v0.33.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=

View File

@ -20,6 +20,8 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
api "github.com/attestantio/go-eth2-client/api/v1"
spec "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/attestantio/vouch/services/accountmanager"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
e2wtypes "github.com/wealdtech/go-eth2-wallet-types/v2"
)
@ -176,6 +178,58 @@ func (d *ValidatingAccount) SignBeaconAttestation(ctx context.Context,
return signature, nil
}
// SignBeaconAttestations signs multiple beacon attestations.
func (d *ValidatingAccount) SignBeaconAttestations(ctx context.Context,
slot spec.Slot,
accounts []accountmanager.ValidatingAccount,
committeeIndices []spec.CommitteeIndex,
blockRoot spec.Root,
sourceEpoch spec.Epoch,
sourceRoot spec.Root,
targetEpoch spec.Epoch,
targetRoot spec.Root) ([]spec.BLSSignature, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "dirk.SignBeaconAttestations")
defer span.Finish()
signatureDomain, err := d.domainProvider.Domain(ctx,
d.accountManager.beaconAttesterDomainType,
spec.Epoch(slot/d.accountManager.slotsPerEpoch))
if err != nil {
return nil, errors.Wrap(err, "failed to obtain signature domain for beacon attestation")
}
e2Accounts := make([]e2wtypes.Account, len(accounts))
for i := range accounts {
e2Accounts[i] = accounts[i].(*ValidatingAccount).account
}
uintCommitteeIndices := make([]uint64, len(committeeIndices))
for i := range committeeIndices {
uintCommitteeIndices[i] = uint64(committeeIndices[i])
}
sigs, err := d.account.(e2wtypes.AccountProtectingMultiSigner).SignBeaconAttestations(ctx,
uint64(slot),
e2Accounts,
uintCommitteeIndices,
blockRoot[:],
uint64(sourceEpoch),
sourceRoot[:],
uint64(targetEpoch),
targetRoot[:],
signatureDomain[:],
)
if err != nil {
return nil, errors.Wrap(err, "failed to sign beacon attestation")
}
res := make([]spec.BLSSignature, len(sigs))
for i := range sigs {
if sigs[i] != nil {
copy(res[i][:], sigs[i].Marshal())
}
}
return res, nil
}
// SignAggregateAndProof signs an aggregate and proof item.
func (d *ValidatingAccount) SignAggregateAndProof(ctx context.Context, slot spec.Slot, aggregateAndProofRoot spec.Root) (spec.BLSSignature, error) {
// Fetch the domain.

View File

@ -116,6 +116,20 @@ type BeaconAttestationSigner interface {
targetRoot spec.Root) (spec.BLSSignature, error)
}
// BeaconAttestationsSigner provides methods to sign multiple beacon attestations.
type BeaconAttestationsSigner interface {
// SignBeaconAttestation signs multiple beacon attestations.
SignBeaconAttestations(ctx context.Context,
slot spec.Slot,
accounts []ValidatingAccount,
committeeIndices []spec.CommitteeIndex,
blockRoot spec.Root,
sourceEpoch spec.Epoch,
sourceRoot spec.Root,
targetEpoch spec.Epoch,
targetRoot spec.Root) ([]spec.BLSSignature, error)
}
// AggregateAndProofSigner provides methods to sign aggregate and proofs.
type AggregateAndProofSigner interface {
// SignAggregateAndProof signs an aggregate attestation for given slot and root.

View File

@ -23,7 +23,7 @@ import (
// MergeDuties merges attester duties given by an Ethereum 2 client into vouch's per-slot structure.
func MergeDuties(ctx context.Context, attesterDuties []*api.AttesterDuty) ([]*Duty, error) {
duties := make([]*Duty, 0, len(attesterDuties))
duties := make([]*Duty, 0)
if len(attesterDuties) == 0 {
return duties, nil
}
@ -68,6 +68,7 @@ func MergeDuties(ctx context.Context, attesterDuties []*api.AttesterDuty) ([]*Du
committeeLengths[duty.Slot] = make(map[spec.CommitteeIndex]uint64)
committeesAtSlots[duty.Slot] = duty.CommitteesAtSlot
}
committeesAtSlots[duty.Slot] = duty.CommitteesAtSlot
validatorIndices[duty.Slot] = append(validatorIndices[duty.Slot], duty.ValidatorIndex)
committeeIndices[duty.Slot] = append(committeeIndices[duty.Slot], duty.CommitteeIndex)
committeeLengths[duty.Slot][duty.CommitteeIndex] = duty.CommitteeLength

View File

@ -20,7 +20,7 @@ import (
spec "github.com/attestantio/go-eth2-client/spec/phase0"
)
// Duty contains information about a beacon block attester duty.
// Duty contains information about beacon block attester duties for a given slot.
type Duty struct {
slot spec.Slot
committeesAtSlot uint64

View File

@ -14,6 +14,7 @@
package standard
import (
"bytes"
"context"
"fmt"
"sync"
@ -25,11 +26,11 @@ import (
"github.com/attestantio/vouch/services/attester"
"github.com/attestantio/vouch/services/metrics"
"github.com/attestantio/vouch/services/submitter"
"github.com/attestantio/vouch/util"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/rs/zerolog"
zerologger "github.com/rs/zerolog/log"
"golang.org/x/sync/semaphore"
)
// Service is a beacon block attester.
@ -85,11 +86,11 @@ func (s *Service) Attest(ctx context.Context, data interface{}) ([]*spec.Attesta
s.monitor.AttestationCompleted(started, "failed")
return nil, errors.New("passed invalid data structure")
}
log := log.With().Uint64("slot", uint64(duty.Slot())).Logger()
log.Trace().Strs("duties", duty.Tuples()).Msg("Attesting")
attestations := make([]*spec.Attestation, 0, len(duty.ValidatorIndices()))
var attestationsMutex sync.Mutex
uints := make([]uint64, len(duty.ValidatorIndices()))
for i := range duty.ValidatorIndices() {
uints[i] = uint64(duty.ValidatorIndices()[i])
}
log := log.With().Uint64("slot", uint64(duty.Slot())).Uints64("validator_indices", uints).Logger()
// Fetch the attestation data.
attestationData, err := s.attestationDataProvider.AttestationData(ctx, duty.Slot(), duty.CommitteeIndices()[0])
@ -103,6 +104,14 @@ func (s *Service) Attest(ctx context.Context, data interface{}) ([]*spec.Attesta
s.monitor.AttestationCompleted(started, "failed")
return nil, fmt.Errorf("attestation request for slot %d returned data for slot %d", duty.Slot(), attestationData.Slot)
}
if attestationData.Source.Epoch > attestationData.Target.Epoch {
s.monitor.AttestationCompleted(started, "failed")
return nil, fmt.Errorf("attestation request for slot %d returned source epoch %d greater than target epoch %d", duty.Slot(), attestationData.Source.Epoch, attestationData.Target.Epoch)
}
if attestationData.Target.Epoch > spec.Epoch(uint64(duty.Slot())/s.slotsPerEpoch) {
s.monitor.AttestationCompleted(started, "failed")
return nil, fmt.Errorf("attestation request for slot %d returned target epoch %d greater than current epoch %d", duty.Slot(), attestationData.Target.Epoch, spec.Epoch(uint64(duty.Slot())/s.slotsPerEpoch))
}
// Fetch the validating accounts.
accounts, err := s.validatingAccountsProvider.AccountsByIndex(ctx, duty.ValidatorIndices())
@ -110,107 +119,246 @@ func (s *Service) Attest(ctx context.Context, data interface{}) ([]*spec.Attesta
s.monitor.AttestationCompleted(started, "failed")
return nil, errors.New("failed to obtain attesting validator accounts")
}
log.Trace().Dur("elapsed", time.Since(started)).Strs("tuples", duty.Tuples()).Msg("Obtained validating accounts")
log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained validating accounts")
// Run the attestations in parallel, up to a concurrency limit.
// Set the per-validator information.
validatorIndexToArrayIndexMap := make(map[spec.ValidatorIndex]int)
for i := range duty.ValidatorIndices() {
validatorIndexToArrayIndexMap[duty.ValidatorIndices()[i]] = i
}
sem := semaphore.NewWeighted(s.processConcurrency)
var wg sync.WaitGroup
for _, account := range accounts {
wg.Add(1)
go func(sem *semaphore.Weighted, wg *sync.WaitGroup, account accountmanager.ValidatingAccount, attestations *[]*spec.Attestation, attestationsMutex *sync.Mutex) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
log.Error().Err(err).Msg("Failed to acquire semaphore")
return
}
defer sem.Release(1)
validatorIndex, err := account.Index(ctx)
if err != nil {
log.Warn().Err(err).Msg("Failed to obtain validator index")
return
}
log := log.With().Uint64("validator_index", uint64(validatorIndex)).Logger()
attestation, err := s.attest(ctx,
duty.Slot(),
duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]],
duty.ValidatorCommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]],
duty.CommitteeSize(duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]]),
account,
attestationData,
)
if err != nil {
log.Warn().Err(err).Msg("Failed to attest")
s.monitor.AttestationCompleted(started, "failed")
return
}
log.Trace().Dur("elapsed", time.Since(started)).Msg("Attested")
s.monitor.AttestationCompleted(started, "succeeded")
attestationsMutex.Lock()
*attestations = append(*attestations, attestation)
attestationsMutex.Unlock()
}(sem, &wg, account, &attestations, &attestationsMutex)
committeeIndices := make([]spec.CommitteeIndex, len(accounts))
validatorCommitteeIndices := make([]spec.ValidatorIndex, len(accounts))
committeeSizes := make([]uint64, len(accounts))
for i := range accounts {
validatorIndex, err := accounts[i].Index(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain validator index")
}
committeeIndices[i] = duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]]
validatorCommitteeIndices[i] = spec.ValidatorIndex(duty.ValidatorCommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]])
committeeSizes[i] = duty.CommitteeSize(committeeIndices[i])
}
attestations, err := s.attest(ctx,
duty.Slot(),
duty,
accounts,
committeeIndices,
validatorCommitteeIndices,
committeeSizes,
attestationData,
started,
)
if err != nil {
log.Error().Err(err).Msg("Failed to attest")
}
wg.Wait()
return attestations, nil
}
// // Attest carries out attestations for a slot.
// // It returns a map of attestations made, keyed on the validator index.
// func (s *Service) Attest(ctx context.Context, data interface{}) ([]*spec.Attestation, error) {
// started := time.Now()
//
// duty, ok := data.(*attester.Duty)
// if !ok {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, errors.New("passed invalid data structure")
// }
// log := log.With().Uint64("slot", uint64(duty.Slot())).Logger()
// log.Trace().Strs("duties", duty.Tuples()).Msg("Attesting")
//
// // Fetch the attestation data.
// attestationData, err := s.attestationDataProvider.AttestationData(ctx, duty.Slot(), duty.CommitteeIndices()[0])
// if err != nil {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, errors.Wrap(err, "failed to obtain attestation data")
// }
// log.Trace().Dur("elapsed", time.Since(started)).Msg("Obtained attestation data")
//
// if attestationData.Slot != duty.Slot() {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, fmt.Errorf("attestation request for slot %d returned data for slot %d", duty.Slot(), attestationData.Slot)
// }
// if attestationData.Source.Epoch > attestationData.Target.Epoch {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, fmt.Errorf("attestation request for slot %d returned source epoch %d greater than target epoch %d", duty.Slot(), attestationData.Source.Epoch, attestationData.Target.Epoch)
// }
// if attestationData.Target.Epoch > duty.Slot()/s.slotsPerEpoch {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, fmt.Errorf("attestation request for slot %d returned target epoch %d greater than current epoch %d", duty.Slot(), attestationData.Target.Epoch, duty.Slot()/s.slotsPerEpoch)
// }
//
// // Fetch the validating accounts.
// accounts, err := s.validatingAccountsProvider.AccountsByIndex(ctx, duty.ValidatorIndices())
// if err != nil {
// s.monitor.AttestationCompleted(started, "failed")
// return nil, errors.New("failed to obtain attesting validator accounts")
// }
// log.Trace().Dur("elapsed", time.Since(started)).Strs("tuples", duty.Tuples()).Msg("Obtained validating accounts")
//
// // Run the attestations in parallel, up to a concurrency limit.
// validatorIndexToArrayIndexMap := make(map[spec.ValidatorIndex]int)
// for i := range duty.ValidatorIndices() {
// validatorIndexToArrayIndexMap[duty.ValidatorIndices()[i]] = i
// }
// sem := semaphore.NewWeighted(s.processConcurrency)
// var wg sync.WaitGroup
// for _, account := range accounts {
// wg.Add(1)
// go func(sem *semaphore.Weighted, wg *sync.WaitGroup, account accountmanager.ValidatingAccount, attestations *[]*spec.Attestation, attestationsMutex *sync.Mutex) {
// // TODO update to common code format.
// defer wg.Done()
// if err := sem.Acquire(ctx, 1); err != nil {
// log.Error().Err(err).Msg("Failed to acquire semaphore")
// return
// }
// defer sem.Release(1)
//
// validatorIndex, err := account.Index(ctx)
// if err != nil {
// log.Warn().Err(err).Msg("Failed to obtain validator index")
// return
// }
// log := log.With().Uint64("validator_index", uint64(validatorIndex)).Logger()
// attestation, err := s.attest(ctx,
// duty.Slot(),
// duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]],
// duty.ValidatorCommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]],
// duty.CommitteeSize(duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]]),
// account,
// attestationData,
// )
// if err != nil {
// log.Warn().Err(err).Msg("Failed to attest")
// s.monitor.AttestationCompleted(started, "failed")
// return
// }
// log.Trace().Dur("elapsed", time.Since(started)).Msg("Attested")
// s.monitor.AttestationCompleted(started, "succeeded")
// attestationsMutex.Lock()
// *attestations = append(*attestations, attestation)
// attestationsMutex.Unlock()
// // // Set the per-validator information.
// // validatorIndexToArrayIndexMap := make(map[uint64]int)
// // for i := range duty.ValidatorIndices() {
// // validatorIndexToArrayIndexMap[duty.ValidatorIndices()[i]] = i
// // }
// // committeeIndices := make([]uint64, len(accounts))
// // validatorCommitteeIndices := make([]uint64, len(accounts))
// // committeeSizes := make([]uint64, len(accounts))
// // for i := range accounts {
// // validatorIndex, err := accounts[i].Index(ctx)
// // if err != nil {
// // return nil, errors.Wrap(err, "failed to obtain validator index")
// // }
// // committeeIndices[i] = duty.CommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]]
// // validatorCommitteeIndices[i] = duty.ValidatorCommitteeIndices()[validatorIndexToArrayIndexMap[validatorIndex]]
// // committeeSizes[i] = duty.CommitteeSize(committeeIndices[i])
// // }
//
// attestations, err := s.attest(ctx,
// slot,
// duty,
// accounts,
// committeeIndices,
// validatorCommitteeIndices,
// committeeSizes,
// attestationData,
// started,
// )
// if err != nil {
// log.Error().Err(err).Msg("Failed to attest")
// }
// }
//
// return attestations, nil
// }
func (s *Service) attest(
ctx context.Context,
slot spec.Slot,
committeeIndex spec.CommitteeIndex,
validatorCommitteeIndex uint64,
committeeSize uint64,
account accountmanager.ValidatingAccount,
attestationData *spec.AttestationData,
) (*spec.Attestation, error) {
// Sign the attestation.
signer, isSigner := account.(accountmanager.BeaconAttestationSigner)
duty *attester.Duty,
accounts []accountmanager.ValidatingAccount,
committeeIndices []spec.CommitteeIndex,
validatorCommitteeIndices []spec.ValidatorIndex,
committeeSizes []uint64,
data *spec.AttestationData,
started time.Time,
) ([]*spec.Attestation, error) {
// Multisign the attestation for all validating accounts.
signer, isSigner := accounts[0].(accountmanager.BeaconAttestationsSigner)
if !isSigner {
return nil, errors.New("account is not a beacon attestation signer")
return nil, errors.New("account is not a beacon attestations signer")
}
sig, err := signer.SignBeaconAttestation(ctx,
slot,
committeeIndex,
attestationData.BeaconBlockRoot,
attestationData.Source.Epoch,
attestationData.Source.Root,
attestationData.Target.Epoch,
attestationData.Target.Root)
uintCommitteeIndices := make([]uint64, len(committeeIndices))
for i := range committeeIndices {
uintCommitteeIndices[i] = uint64(committeeIndices[i])
}
sigs, err := signer.SignBeaconAttestations(ctx,
duty.Slot(),
accounts,
committeeIndices,
data.BeaconBlockRoot,
data.Source.Epoch,
data.Source.Root,
data.Target.Epoch,
data.Target.Root,
)
if err != nil {
return nil, errors.Wrap(err, "failed to sign beacon attestation")
return nil, errors.Wrap(err, "failed to sign beacon attestations")
}
log.Trace().Msg("Signed")
log.Trace().Dur("elapsed", time.Since(started)).Msg("Signed")
// Submit the attestation.
aggregationBits := bitfield.NewBitlist(committeeSize)
aggregationBits.SetBitAt(validatorCommitteeIndex, true)
attestation := &spec.Attestation{
AggregationBits: aggregationBits,
Data: &spec.AttestationData{
Slot: slot,
Index: committeeIndex,
BeaconBlockRoot: attestationData.BeaconBlockRoot,
Source: &spec.Checkpoint{
Epoch: attestationData.Source.Epoch,
Root: attestationData.Source.Root,
},
Target: &spec.Checkpoint{
Epoch: attestationData.Target.Epoch,
Root: attestationData.Target.Root,
},
},
Signature: sig,
// Submit the attestations.
zeroSig := spec.BLSSignature{}
attestations := make([]*spec.Attestation, len(sigs))
_, err = util.Scatter(len(sigs), func(offset int, entries int, _ *sync.RWMutex) (interface{}, error) {
for i := offset; i < offset+entries; i++ {
validatorIndex, err := accounts[i].Index(ctx)
if err != nil {
log.Warn().Err(err).Msg("failed to obtain validator index")
continue
}
log := log.With().Uint64("slot", uint64(duty.Slot())).Uint64("validator_index", uint64(validatorIndex)).Logger()
if bytes.Equal(sigs[i][:], zeroSig[:]) {
log.Warn().Msg("No signature for validator; not creating attestation")
continue
}
aggregationBits := bitfield.NewBitlist(committeeSizes[i])
aggregationBits.SetBitAt(uint64(validatorCommitteeIndices[i]), true)
attestation := &spec.Attestation{
AggregationBits: aggregationBits,
Data: &spec.AttestationData{
Slot: duty.Slot(),
Index: committeeIndices[i],
BeaconBlockRoot: data.BeaconBlockRoot,
Source: &spec.Checkpoint{
Epoch: data.Source.Epoch,
Root: data.Source.Root,
},
Target: &spec.Checkpoint{
Epoch: data.Target.Epoch,
Root: data.Target.Root,
},
},
}
copy(attestation.Signature[:], sigs[i][:])
if err := s.attestationSubmitter.SubmitAttestation(ctx, attestation); err != nil {
log.Warn().Err(err).Msg("Failed to submit attestation")
continue
}
attestations[i] = attestation
s.monitor.AttestationCompleted(started, "succeeded")
}
return nil, nil
})
if err != nil {
log.Error().Err(err).Msg("Failed to scatter submit")
}
if err := s.attestationSubmitter.SubmitAttestation(ctx, attestation); err != nil {
return nil, errors.Wrap(err, "failed to submit attestation")
}
return attestation, nil
log.Trace().Dur("elapsed", time.Since(started)).Msg("Submitted")
return attestations, nil
}

97
util/scatter.go Normal file
View File

@ -0,0 +1,97 @@
// Copyright © 2020 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"errors"
"runtime"
"sync"
)
// ScatterResult is the result of a single scatter worker.
type ScatterResult struct {
// Offset is the offset at which the worker started.
Offset int
// Extent is the user-defined result of running the scatter function.
Extent interface{}
}
// Scatter scatters a computation across multiple goroutines, returning a set of per-worker results
func Scatter(inputLen int, work func(int, int, *sync.RWMutex) (interface{}, error)) ([]*ScatterResult, error) {
if inputLen <= 0 {
return nil, errors.New("no data with which to work")
}
extentSize := calculateExtentSize(inputLen)
workers := inputLen / extentSize
if inputLen%extentSize != 0 {
workers++
}
resultCh := make(chan *ScatterResult, workers)
defer close(resultCh)
errorCh := make(chan error, workers)
defer close(errorCh)
mutex := new(sync.RWMutex)
for worker := 0; worker < workers; worker++ {
offset := worker * extentSize
entries := extentSize
if offset+entries > inputLen {
entries = inputLen - offset
}
go func(offset int, entries int) {
extent, err := work(offset, entries, mutex)
if err != nil {
errorCh <- err
} else {
resultCh <- &ScatterResult{
Offset: offset,
Extent: extent,
}
}
}(offset, entries)
}
// Collect results from workers
results := make([]*ScatterResult, workers)
var err error
for i := 0; i < workers; i++ {
select {
case result := <-resultCh:
results[i] = result
case err = <-errorCh:
// Error occurred; don't return because that closes the channels
// and can cause other workers to write to the closed channel.
}
}
return results, err
}
// calculateExtentSize calculates the extent size given the number of items and maximum processors available.
func calculateExtentSize(items int) int {
// Start with an even split.
extentSize := items / runtime.GOMAXPROCS(0)
if extentSize == 0 {
// We must have an extent size of at least 1.
return 1
}
if items%extentSize > 0 {
// We have a remainder; add one to the extent size to ensure we capture it.
extentSize++
}
return extentSize
}

View File

@ -0,0 +1,76 @@
// Copyright © 2020 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util_test
import (
"crypto/rand"
"crypto/sha256"
"sync"
"testing"
"github.com/attestantio/dirk/util"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)
var input [][]byte
const (
benchmarkElements = 65536
benchmarkElementSize = 32
benchmarkHashRuns = 128
)
func init() {
input = make([][]byte, benchmarkElements)
for i := 0; i < benchmarkElements; i++ {
input[i] = make([]byte, benchmarkElementSize)
_, err := rand.Read(input[i])
if err != nil {
log.WithError(err).Debug("Cannot read from rand")
}
}
}
// hash is a simple worker function that carries out repeated hashging of its input to provide an output.
func hash(input [][]byte) [][]byte {
output := make([][]byte, len(input))
for i := range input {
copy(output, input)
for j := 0; j < benchmarkHashRuns; j++ {
hash := sha256.Sum256(output[i])
output[i] = hash[:]
}
}
return output
}
func BenchmarkHash(b *testing.B) {
for i := 0; i < b.N; i++ {
hash(input)
}
}
func BenchmarkHashMP(b *testing.B) {
output := make([][]byte, len(input))
for i := 0; i < b.N; i++ {
workerResults, err := util.Scatter(len(input), func(offset int, entries int, _ *sync.RWMutex) (interface{}, error) {
return hash(input[offset : offset+entries]), nil
})
require.NoError(b, err)
for _, result := range workerResults {
copy(output[result.Offset:], result.Extent.([][]byte))
}
}
}

116
util/scatter_test.go Normal file
View File

@ -0,0 +1,116 @@
// Copyright © 2020 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util_test
import (
"errors"
"sync"
"testing"
"github.com/attestantio/dirk/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDouble(t *testing.T) {
tests := []struct {
name string
inValues int
err string
}{
{
name: "0",
inValues: 0,
err: "no data with which to work",
},
{
name: "1",
inValues: 1,
},
{
name: "1023",
inValues: 1023,
},
{
name: "1024",
inValues: 1024,
},
{
name: "1025",
inValues: 1025,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
inValues := make([]int, test.inValues)
for i := 0; i < test.inValues; i++ {
inValues[i] = i
}
outValues := make([]int, test.inValues)
workerResults, err := util.Scatter(len(inValues), func(offset int, entries int, _ *sync.RWMutex) (interface{}, error) {
extent := make([]int, entries)
for i := 0; i < entries; i++ {
extent[i] = inValues[offset+i] * 2
}
return extent, nil
})
if test.err != "" {
assert.Equal(t, test.err, err.Error())
} else {
require.NoError(t, err)
for _, result := range workerResults {
copy(outValues[result.Offset:], result.Extent.([]int))
}
for i := 0; i < test.inValues; i++ {
require.Equal(t, inValues[i]*2, outValues[i], "Outvalue at %d incorrect", i)
}
}
})
}
}
func TestMutex(t *testing.T) {
totalRuns := 1048576
val := 0
_, err := util.Scatter(totalRuns, func(offset int, entries int, mu *sync.RWMutex) (interface{}, error) {
for i := 0; i < entries; i++ {
mu.Lock()
val++
mu.Unlock()
}
return nil, nil
})
require.NoError(t, err)
require.Equal(t, totalRuns, val)
}
func TestError(t *testing.T) {
totalRuns := 1024
val := 0
_, err := util.Scatter(totalRuns, func(offset int, entries int, mu *sync.RWMutex) (interface{}, error) {
for i := 0; i < entries; i++ {
mu.Lock()
val++
if val == 1011 {
mu.Unlock()
return nil, errors.New("bad number")
}
mu.Unlock()
}
return nil, nil
})
require.EqualError(t, err, "bad number")
}