From dcb720309b7b386918ad025c753177fed40a6eee Mon Sep 17 00:00:00 2001 From: tbjump Date: Sat, 15 Jul 2023 23:30:51 +0000 Subject: [PATCH] node: implement haveSignedVAA --- node/pkg/db/db.go | 14 ++++++++++++++ node/pkg/processor/cleanup.go | 7 +------ node/pkg/processor/observation.go | 18 +++++++----------- node/pkg/processor/processor.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index ce2a51ac1..66810b30b 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -128,6 +128,20 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { return nil } +func (d *Database) HasVAA(id VAAID) (bool, error) { + err := d.db.View(func(txn *badger.Txn) error { + _, err := txn.Get(id.Bytes()) + return err + }) + if err == nil { + return true, nil + } + if err == badger.ErrKeyNotFound { + return false, nil + } + return false, err +} + func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) { if err := d.db.View(func(txn *badger.Txn) error { item, err := txn.Get(id.Bytes()) diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index b3befda17..851c0681f 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -81,7 +81,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { // This occurs when we observed a message after the cluster has already reached // consensus on it, causing us to never achieve quorum. if ourVaa, ok := s.ourObservation.(*VAA); ok { - if _, err := p.getSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil { + if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) { // If we have a stored quorum VAA, we can safely expire the state. // // This is a rare case, and we can safely expire the state, since we @@ -90,11 +90,6 @@ func (p *Processor) handleCleanup(ctx context.Context) { aggregationStateLate.Inc() delete(p.state.signatures, hash) continue - } else if err != db.ErrVAANotFound { - p.logger.Error("failed to look up VAA in database", - zap.String("digest", hash), - zap.Error(err), - ) } } } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 9714fd487..9834f4fed 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "go.uber.org/zap" + "go.uber.org/zap/zapcore" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -232,17 +233,12 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos } // Check if we already store this VAA - _, err = p.getSignedVAA(*db.VaaIDFromVAA(v)) - if err == nil { - p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored", - zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())), - ) - return - } else if err != db.ErrVAANotFound { - p.logger.Error("failed to look up VAA in database", - zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())), - zap.Error(err), - ) + if p.haveSignedVAA(*db.VaaIDFromVAA(v)) { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored", + zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())), + ) + } return } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index c6aadfef1..a1454066d 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -300,6 +300,34 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) error { return p.db.StoreSignedVAA(v) } +// haveSignedVAA returns true if we already have a VAA for the given VAAID +func (p *Processor) haveSignedVAA(id db.VAAID) bool { + if id.EmitterChain == vaa.ChainIDPythNet { + if p.pythnetVaas == nil { + return false + } + key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence) + _, exists := p.pythnetVaas[key] + return exists + } + + if p.db == nil { + return false + } + + ok, err := p.db.HasVAA(id) + + if err != nil { + p.logger.Error("failed to look up VAA in database", + zap.String("vaaID", string(id.Bytes())), + zap.Error(err), + ) + return false + } + + return ok +} + func (p *Processor) getSignedVAA(id db.VAAID) (*vaa.VAA, error) { if id.EmitterChain == vaa.ChainIDPythNet {