Node: Reduce auto reobservation requests (#3203)
This commit is contained in:
parent
ce66e631c2
commit
0fdc03eb4d
|
@ -3,6 +3,8 @@ package processor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/certusone/wormhole/node/pkg/common"
|
"github.com/certusone/wormhole/node/pkg/common"
|
||||||
|
@ -160,21 +162,32 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
aggregationStateTimeout.Inc()
|
aggregationStateTimeout.Inc()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
p.logger.Info("resubmitting observation",
|
|
||||||
zap.String("digest", hash),
|
// If we have already stored this VAA, there is no reason for us to request reobservation.
|
||||||
zap.Duration("delta", delta),
|
alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
|
||||||
zap.String("firstObserved", s.firstObserved.String()),
|
if err != nil {
|
||||||
)
|
p.logger.Error("failed to check if observation is already in DB, requesting reobservation", zap.String("hash", hash), zap.Error(err))
|
||||||
req := &gossipv1.ObservationRequest{
|
|
||||||
ChainId: uint32(s.ourObservation.GetEmitterChain()),
|
|
||||||
TxHash: s.txHash,
|
|
||||||
}
|
}
|
||||||
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
|
|
||||||
p.logger.Warn("failed to broadcast re-observation request", zap.Error(err))
|
if alreadyInDB {
|
||||||
|
p.logger.Debug("observation already in DB, not requesting reobservation", zap.String("digest", hash))
|
||||||
|
} else {
|
||||||
|
p.logger.Info("resubmitting observation",
|
||||||
|
zap.String("digest", hash),
|
||||||
|
zap.Duration("delta", delta),
|
||||||
|
zap.String("firstObserved", s.firstObserved.String()),
|
||||||
|
)
|
||||||
|
req := &gossipv1.ObservationRequest{
|
||||||
|
ChainId: uint32(s.ourObservation.GetEmitterChain()),
|
||||||
|
TxHash: s.txHash,
|
||||||
|
}
|
||||||
|
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
|
||||||
|
p.logger.Warn("failed to broadcast re-observation request", zap.Error(err))
|
||||||
|
}
|
||||||
|
p.gossipSendC <- s.ourMsg
|
||||||
|
s.lastRetry = time.Now()
|
||||||
|
aggregationStateRetries.Inc()
|
||||||
}
|
}
|
||||||
p.gossipSendC <- s.ourMsg
|
|
||||||
s.lastRetry = time.Now()
|
|
||||||
aggregationStateRetries.Inc()
|
|
||||||
} else {
|
} else {
|
||||||
// For nil state entries, we log the quorum to determine whether the
|
// For nil state entries, we log the quorum to determine whether the
|
||||||
// network reached consensus without us. We don't know the correct guardian
|
// network reached consensus without us. We don't know the correct guardian
|
||||||
|
@ -182,7 +195,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
hasSigs := len(s.signatures)
|
hasSigs := len(s.signatures)
|
||||||
wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))
|
wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))
|
||||||
|
|
||||||
p.logger.Info("expiring unsubmitted nil observation",
|
p.logger.Debug("expiring unsubmitted nil observation",
|
||||||
zap.String("digest", hash),
|
zap.String("digest", hash),
|
||||||
zap.Duration("delta", delta),
|
zap.Duration("delta", delta),
|
||||||
zap.Int("have_sigs", hasSigs),
|
zap.Int("have_sigs", hasSigs),
|
||||||
|
@ -203,3 +216,39 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// signedVaaAlreadyInDB checks if the VAA is already in the DB. If it is, it makes sure the hash matches.
|
||||||
|
func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
|
||||||
|
if s.ourObservation == nil {
|
||||||
|
p.logger.Debug("unable to check if VAA is already in DB, no observation", zap.String("digest", hash))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
vaaID, err := db.VaaIDFromString(s.ourObservation.MessageID())
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
vb, err := p.db.GetSignedVAABytes(*vaaID)
|
||||||
|
if err != nil {
|
||||||
|
if err == db.ErrVAANotFound {
|
||||||
|
p.logger.Debug("VAA not in DB", zap.String("digest", hash), zap.String("message_id", s.ourObservation.MessageID()))
|
||||||
|
return false, nil
|
||||||
|
} else {
|
||||||
|
return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := vaa.Unmarshal(vb)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
oldHash := hex.EncodeToString(v.SigningDigest().Bytes())
|
||||||
|
if hash != oldHash {
|
||||||
|
p.logger.Debug("VAA already in DB but hash is different", zap.String("old_hash", oldHash), zap.String("new_hash", hash))
|
||||||
|
return false, fmt.Errorf("hash mismatch in_db: %s, new: %s", oldHash, hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue