From bc356a5e51b65ac708d1471a086dd207f168cb12 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 25 Jan 2021 20:12:28 +0100 Subject: [PATCH] bridge: count misses for settled VAAs --- bridge/pkg/processor/cleanup.go | 29 +++++++++++++++++++++++++++++ bridge/pkg/processor/observation.go | 7 ++++++- bridge/pkg/processor/processor.go | 24 ++++++++++++++++++------ 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/bridge/pkg/processor/cleanup.go b/bridge/pkg/processor/cleanup.go index 4e3c4cf84..1e473933e 100644 --- a/bridge/pkg/processor/cleanup.go +++ b/bridge/pkg/processor/cleanup.go @@ -2,6 +2,7 @@ package processor import ( "context" + "github.com/certusone/wormhole/bridge/pkg/common" "github.com/prometheus/client_golang/prometheus" "time" @@ -34,6 +35,11 @@ var ( Name: "wormhole_aggregation_state_unobserved_total", Help: "Total number of aggregation states expired due to no matching local lockup observations", }) + aggregationStateFulfillment = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_aggregation_state_settled_signatures_total", + Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time", + }, []string{"addr", "origin", "status"}) ) func init() { @@ -42,6 +48,7 @@ func init() { prometheus.MustRegister(aggregationStateTimeout) prometheus.MustRegister(aggregationStateRetries) prometheus.MustRegister(aggregationStateUnobserved) + prometheus.MustRegister(aggregationStateFulfillment) } // handleCleanup handles periodic retransmissions and cleanup of VAAs @@ -53,6 +60,28 @@ func (p *Processor) handleCleanup(ctx context.Context) { delta := time.Now().Sub(s.firstObserved) switch { + case !s.settled && delta.Seconds() >= 30: + // After 30 seconds, the VAA is considered settled - it's unlikely that more observations will + // arrive, barring special circumstances. This is a better time to count misses than submission, + // because we submit right when we quorum rather than waiting for all observations to arrive. + s.settled = true + p.logger.Info("VAA considered settled", zap.String("digest", hash)) + + // Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available. + var gs *common.GuardianSet + if s.gs != nil { + gs = s.gs + } else { + gs = p.gs + } + + for _, k := range gs.Keys { + if _, ok := s.signatures[k]; ok { + aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc() + } else { + aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc() + } + } case s.submitted && delta.Hours() >= 1: // We could delete submitted VAAs right away, but then we'd lose context about additional (late) // observation that come in. Therefore, keep it for a reasonable amount of time. diff --git a/bridge/pkg/processor/observation.go b/bridge/pkg/processor/observation.go index 9b709420d..fdedbdc83 100644 --- a/bridge/pkg/processor/observation.go +++ b/bridge/pkg/processor/observation.go @@ -148,7 +148,6 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs // byzantine, but now we know who we're dealing with. // We can now count events by guardian without worry about cardinality explosions: - // TODO: add source_chain observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. @@ -166,6 +165,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs p.state.vaaSignatures[hash] = &vaaState{ firstObserved: time.Now(), signatures: map[common.Address][]byte{}, + source: "unknown", } } @@ -231,6 +231,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs switch t := v.Payload.(type) { case *vaa.BodyTransfer: + p.state.vaaSignatures[hash].source = t.SourceChain.String() // Depending on the target chain, guardians submit VAAs directly to the chain. switch t.TargetChain { @@ -250,10 +251,14 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs zap.Stringer("target_chain", t.TargetChain)) } case *vaa.BodyGuardianSetUpdate: + p.state.vaaSignatures[hash].source = "guardian_set_upgrade" + // A guardian set update is broadcast to every chain that we talk to. p.devnetVAASubmission(ctx, signed, hash) p.terraVAASubmission(ctx, signed, hash) case *vaa.BodyContractUpgrade: + p.state.vaaSignatures[hash].source = "contract_upgrade" + switch t.ChainID { case vaa.ChainIDSolana: // Already submitted to Solana. diff --git a/bridge/pkg/processor/processor.go b/bridge/pkg/processor/processor.go index e60335766..f66906a00 100644 --- a/bridge/pkg/processor/processor.go +++ b/bridge/pkg/processor/processor.go @@ -21,13 +21,25 @@ import ( type ( // vaaState represents the local view of a given VAA vaaState struct { + // First time this digest was seen (possibly even before we saw its lockup). firstObserved time.Time - ourVAA *vaa.VAA - signatures map[ethcommon.Address][]byte - submitted bool - retryCount uint - ourMsg []byte - gs *common.GuardianSet + // Copy of the VAA we constructed when we saw the lockup. + ourVAA *vaa.VAA + // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging + // to either the old or new guardian set. + signatures map[ethcommon.Address][]byte + // Flag set after reaching quorum and submitting the VAA. + submitted bool + // Flag set by the cleanup service after the settlement timeout has expired and misses were counted. + settled bool + // Human-readable description of the VAA's source, used for metrics. + source string + // Number of times the cleanup service has attempted to retransmit this VAA. + retryCount uint + // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions. + ourMsg []byte + // Copy of the guardian set valid at lockup/injection time. + gs *common.GuardianSet } vaaMap map[string]*vaaState