From 83c589328343d00c02946f9ec7c27103a61488bf Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Tue, 11 Jul 2023 10:59:15 -0500 Subject: [PATCH] Node: Fix reobservation limits (#3177) --- node/pkg/processor/cleanup.go | 14 ++++++++------ node/pkg/processor/processor.go | 2 -- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index c811a396e..f4f3e12d3 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -54,8 +54,10 @@ var ( ) const ( - settlementTime = time.Second * 30 - retryTime = time.Minute * 5 + settlementTime = time.Second * 30 + retryTime = time.Minute * 5 + retryLimitOurs = time.Hour * 24 + retryLimitNotOurs = time.Hour ) // handleCleanup handles periodic retransmissions and cleanup of observations @@ -138,9 +140,9 @@ func (p *Processor) handleCleanup(ctx context.Context) { p.logger.Debug("expiring submitted observation", zap.String("digest", hash), zap.Duration("delta", delta)) delete(p.state.signatures, hash) aggregationStateExpiration.Inc() - case !s.submitted && ((s.ourMsg != nil && s.retryCount >= 14400 /* 120 hours */) || (s.ourMsg == nil && s.retryCount >= 10 /* 5 minutes */)): + case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)): // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. - p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta)) + p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta), zap.Bool("weObserved", s.ourMsg != nil)) delete(p.state.signatures, hash) aggregationStateTimeout.Inc() case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime: @@ -161,7 +163,8 @@ func (p *Processor) handleCleanup(ctx context.Context) { p.logger.Info("resubmitting observation", zap.String("digest", hash), zap.Duration("delta", delta), - zap.Uint("retry", s.retryCount)) + zap.String("firstObserved", s.firstObserved.String()), + ) req := &gossipv1.ObservationRequest{ ChainId: uint32(s.ourObservation.GetEmitterChain()), TxHash: s.txHash, @@ -170,7 +173,6 @@ func (p *Processor) handleCleanup(ctx context.Context) { p.logger.Warn("failed to broadcast re-observation request", zap.Error(err)) } p.gossipSendC <- s.ourMsg - s.retryCount += 1 s.lastRetry = time.Now() aggregationStateRetries.Inc() } else { diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 18de99465..25010cbfb 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -55,8 +55,6 @@ type ( settled bool // Human-readable description of the VAA's source, used for metrics. source string - // Number of times the cleanup service has attempted to retransmit this VAA. - retryCount uint // Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions. ourMsg []byte // The hash of the transaction in which the observation was made. Used for re-observation requests.