bridge: count misses for settled VAAs
This commit is contained in:
parent
c5b59ac6a0
commit
bc356a5e51
|
@ -2,6 +2,7 @@ package processor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/certusone/wormhole/bridge/pkg/common"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -34,6 +35,11 @@ var (
|
||||||
Name: "wormhole_aggregation_state_unobserved_total",
|
Name: "wormhole_aggregation_state_unobserved_total",
|
||||||
Help: "Total number of aggregation states expired due to no matching local lockup observations",
|
Help: "Total number of aggregation states expired due to no matching local lockup observations",
|
||||||
})
|
})
|
||||||
|
aggregationStateFulfillment = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "wormhole_aggregation_state_settled_signatures_total",
|
||||||
|
Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time",
|
||||||
|
}, []string{"addr", "origin", "status"})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -42,6 +48,7 @@ func init() {
|
||||||
prometheus.MustRegister(aggregationStateTimeout)
|
prometheus.MustRegister(aggregationStateTimeout)
|
||||||
prometheus.MustRegister(aggregationStateRetries)
|
prometheus.MustRegister(aggregationStateRetries)
|
||||||
prometheus.MustRegister(aggregationStateUnobserved)
|
prometheus.MustRegister(aggregationStateUnobserved)
|
||||||
|
prometheus.MustRegister(aggregationStateFulfillment)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleCleanup handles periodic retransmissions and cleanup of VAAs
|
// handleCleanup handles periodic retransmissions and cleanup of VAAs
|
||||||
|
@ -53,6 +60,28 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
delta := time.Now().Sub(s.firstObserved)
|
delta := time.Now().Sub(s.firstObserved)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
case !s.settled && delta.Seconds() >= 30:
|
||||||
|
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
|
||||||
|
// arrive, barring special circumstances. This is a better time to count misses than submission,
|
||||||
|
// because we submit right when we reach quorum rather than waiting for all observations to arrive.
|
||||||
|
s.settled = true
|
||||||
|
p.logger.Info("VAA considered settled", zap.String("digest", hash))
|
||||||
|
|
||||||
|
// Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available.
|
||||||
|
var gs *common.GuardianSet
|
||||||
|
if s.gs != nil {
|
||||||
|
gs = s.gs
|
||||||
|
} else {
|
||||||
|
gs = p.gs
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range gs.Keys {
|
||||||
|
if _, ok := s.signatures[k]; ok {
|
||||||
|
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
|
||||||
|
} else {
|
||||||
|
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
case s.submitted && delta.Hours() >= 1:
|
case s.submitted && delta.Hours() >= 1:
|
||||||
// We could delete submitted VAAs right away, but then we'd lose context about additional (late)
|
// We could delete submitted VAAs right away, but then we'd lose context about additional (late)
|
||||||
// observations that come in. Therefore, keep it for a reasonable amount of time.
|
// observations that come in. Therefore, keep it for a reasonable amount of time.
|
||||||
|
|
|
@ -148,7 +148,6 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
||||||
// byzantine, but now we know who we're dealing with.
|
// byzantine, but now we know who we're dealing with.
|
||||||
|
|
||||||
// We can now count events by guardian without worrying about cardinality explosions:
|
// We can now count events by guardian without worrying about cardinality explosions:
|
||||||
// TODO: add source_chain
|
|
||||||
observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
|
observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
|
||||||
|
|
||||||
// []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
|
// []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
|
||||||
|
@ -166,6 +165,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
||||||
p.state.vaaSignatures[hash] = &vaaState{
|
p.state.vaaSignatures[hash] = &vaaState{
|
||||||
firstObserved: time.Now(),
|
firstObserved: time.Now(),
|
||||||
signatures: map[common.Address][]byte{},
|
signatures: map[common.Address][]byte{},
|
||||||
|
source: "unknown",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,6 +231,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
||||||
|
|
||||||
switch t := v.Payload.(type) {
|
switch t := v.Payload.(type) {
|
||||||
case *vaa.BodyTransfer:
|
case *vaa.BodyTransfer:
|
||||||
|
p.state.vaaSignatures[hash].source = t.SourceChain.String()
|
||||||
// Depending on the target chain, guardians submit VAAs directly to the chain.
|
// Depending on the target chain, guardians submit VAAs directly to the chain.
|
||||||
|
|
||||||
switch t.TargetChain {
|
switch t.TargetChain {
|
||||||
|
@ -250,10 +251,14 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
||||||
zap.Stringer("target_chain", t.TargetChain))
|
zap.Stringer("target_chain", t.TargetChain))
|
||||||
}
|
}
|
||||||
case *vaa.BodyGuardianSetUpdate:
|
case *vaa.BodyGuardianSetUpdate:
|
||||||
|
p.state.vaaSignatures[hash].source = "guardian_set_upgrade"
|
||||||
|
|
||||||
// A guardian set update is broadcast to every chain that we talk to.
|
// A guardian set update is broadcast to every chain that we talk to.
|
||||||
p.devnetVAASubmission(ctx, signed, hash)
|
p.devnetVAASubmission(ctx, signed, hash)
|
||||||
p.terraVAASubmission(ctx, signed, hash)
|
p.terraVAASubmission(ctx, signed, hash)
|
||||||
case *vaa.BodyContractUpgrade:
|
case *vaa.BodyContractUpgrade:
|
||||||
|
p.state.vaaSignatures[hash].source = "contract_upgrade"
|
||||||
|
|
||||||
switch t.ChainID {
|
switch t.ChainID {
|
||||||
case vaa.ChainIDSolana:
|
case vaa.ChainIDSolana:
|
||||||
// Already submitted to Solana.
|
// Already submitted to Solana.
|
||||||
|
|
|
@ -21,13 +21,25 @@ import (
|
||||||
type (
|
type (
|
||||||
// vaaState represents the local view of a given VAA
|
// vaaState represents the local view of a given VAA
|
||||||
vaaState struct {
|
vaaState struct {
|
||||||
|
// First time this digest was seen (possibly even before we saw its lockup).
|
||||||
firstObserved time.Time
|
firstObserved time.Time
|
||||||
ourVAA *vaa.VAA
|
// Copy of the VAA we constructed when we saw the lockup.
|
||||||
signatures map[ethcommon.Address][]byte
|
ourVAA *vaa.VAA
|
||||||
submitted bool
|
// Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
|
||||||
retryCount uint
|
// to either the old or new guardian set.
|
||||||
ourMsg []byte
|
signatures map[ethcommon.Address][]byte
|
||||||
gs *common.GuardianSet
|
// Flag set after reaching quorum and submitting the VAA.
|
||||||
|
submitted bool
|
||||||
|
// Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
|
||||||
|
settled bool
|
||||||
|
// Human-readable description of the VAA's source, used for metrics.
|
||||||
|
source string
|
||||||
|
// Number of times the cleanup service has attempted to retransmit this VAA.
|
||||||
|
retryCount uint
|
||||||
|
// Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
|
||||||
|
ourMsg []byte
|
||||||
|
// Copy of the guardian set valid at lockup/injection time.
|
||||||
|
gs *common.GuardianSet
|
||||||
}
|
}
|
||||||
|
|
||||||
vaaMap map[string]*vaaState
|
vaaMap map[string]*vaaState
|
||||||
|
|
Loading…
Reference in New Issue