From 0fdc03eb4d500532f78e73020f84fa11fb839d10 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Fri, 14 Jul 2023 08:34:24 -0500 Subject: [PATCH] Node: Reduce auto reobservation requests (#3203) --- node/pkg/processor/cleanup.go | 77 ++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index f4f3e12d3..c2ca2314f 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -3,6 +3,8 @@ package processor import ( "context" + "encoding/hex" + "fmt" "time" "github.com/certusone/wormhole/node/pkg/common" @@ -160,21 +162,32 @@ func (p *Processor) handleCleanup(ctx context.Context) { aggregationStateTimeout.Inc() break } - p.logger.Info("resubmitting observation", - zap.String("digest", hash), - zap.Duration("delta", delta), - zap.String("firstObserved", s.firstObserved.String()), - ) - req := &gossipv1.ObservationRequest{ - ChainId: uint32(s.ourObservation.GetEmitterChain()), - TxHash: s.txHash, + + // If we have already stored this VAA, there is no reason for us to request reobservation. 
+ alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) + if err != nil { + p.logger.Error("failed to check if observation is already in DB, requesting reobservation", zap.String("hash", hash), zap.Error(err)) } - if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { - p.logger.Warn("failed to broadcast re-observation request", zap.Error(err)) + + if alreadyInDB { + p.logger.Debug("observation already in DB, not requesting reobservation", zap.String("digest", hash)) + } else { + p.logger.Info("resubmitting observation", + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.String("firstObserved", s.firstObserved.String()), + ) + req := &gossipv1.ObservationRequest{ + ChainId: uint32(s.ourObservation.GetEmitterChain()), + TxHash: s.txHash, + } + if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { + p.logger.Warn("failed to broadcast re-observation request", zap.Error(err)) + } + p.gossipSendC <- s.ourMsg + s.lastRetry = time.Now() + aggregationStateRetries.Inc() } - p.gossipSendC <- s.ourMsg - s.lastRetry = time.Now() - aggregationStateRetries.Inc() } else { // For nil state entries, we log the quorum to determine whether the // network reached consensus without us. We don't know the correct guardian @@ -182,7 +195,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { hasSigs := len(s.signatures) wantSigs := vaa.CalculateQuorum(len(p.gs.Keys)) - p.logger.Info("expiring unsubmitted nil observation", + p.logger.Debug("expiring unsubmitted nil observation", zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), @@ -203,3 +216,39 @@ func (p *Processor) handleCleanup(ctx context.Context) { } } } + +// signedVaaAlreadyInDB checks if the VAA is already in the DB. If it is, it makes sure the hash matches. 
+// It reports (true, nil) only when a stored VAA is found whose signing digest equals hash.
+// A state without our own observation, or a message ID that is absent from the DB, yields
+// (false, nil); every other failure (bad message ID, DB error, undecodable VAA, digest
+// mismatch) yields false together with a non-nil error.
+func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
+	obs := s.ourObservation
+	if obs == nil {
+		// Without our own observation there is no message ID to look up.
+		p.logger.Debug("unable to check if VAA is already in DB, no observation", zap.String("digest", hash))
+		return false, nil
+	}
+
+	// The message ID is used for the lookup key and for every diagnostic below.
+	msgID := obs.MessageID()
+	id, err := db.VaaIDFromString(msgID)
+	if err != nil {
+		return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, msgID, err)
+	}
+
+	stored, err := p.db.GetSignedVAABytes(*id)
+	switch {
+	case err == nil:
+		// An entry exists; the digest comparison after the switch decides the result.
+	case err == db.ErrVAANotFound:
+		// A missing entry is an expected outcome, not a failure.
+		p.logger.Debug("VAA not in DB", zap.String("digest", hash), zap.String("message_id", msgID))
+		return false, nil
+	default:
+		return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, msgID, err)
+	}
+
+	decoded, err := vaa.Unmarshal(stored)
+	if err != nil {
+		return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
+	}
+
+	// Compare the hex-encoded signing digest of the stored VAA with the digest we were given.
+	storedDigest := hex.EncodeToString(decoded.SigningDigest().Bytes())
+	if storedDigest == hash {
+		return true, nil
+	}
+
+	p.logger.Debug("VAA already in DB but hash is different", zap.String("old_hash", storedDigest), zap.String("new_hash", hash))
+	return false, fmt.Errorf("hash mismatch in_db: %s, new: %s", storedDigest, hash)
+}