node: implement haveSignedVAA
This commit is contained in:
parent
567e98b750
commit
dcb720309b
|
@ -128,6 +128,20 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *Database) HasVAA(id VAAID) (bool, error) {
|
||||
err := d.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(id.Bytes())
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) {
|
||||
if err := d.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get(id.Bytes())
|
||||
|
|
|
@ -81,7 +81,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
|||
// This occurs when we observed a message after the cluster has already reached
|
||||
// consensus on it, causing us to never achieve quorum.
|
||||
if ourVaa, ok := s.ourObservation.(*VAA); ok {
|
||||
if _, err := p.getSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
|
||||
if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) {
|
||||
// If we have a stored quorum VAA, we can safely expire the state.
|
||||
//
|
||||
// This is a rare case, and we can safely expire the state, since we
|
||||
|
@ -90,11 +90,6 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
|||
aggregationStateLate.Inc()
|
||||
delete(p.state.signatures, hash)
|
||||
continue
|
||||
} else if err != db.ErrVAANotFound {
|
||||
p.logger.Error("failed to look up VAA in database",
|
||||
zap.String("digest", hash),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
|
||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
|
@ -232,17 +233,12 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
|
|||
}
|
||||
|
||||
// Check if we already store this VAA
|
||||
_, err = p.getSignedVAA(*db.VaaIDFromVAA(v))
|
||||
if err == nil {
|
||||
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored",
|
||||
zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())),
|
||||
)
|
||||
return
|
||||
} else if err != db.ErrVAANotFound {
|
||||
p.logger.Error("failed to look up VAA in database",
|
||||
zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())),
|
||||
zap.Error(err),
|
||||
)
|
||||
if p.haveSignedVAA(*db.VaaIDFromVAA(v)) {
|
||||
if p.logger.Level().Enabled(zapcore.DebugLevel) {
|
||||
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored",
|
||||
zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -300,6 +300,34 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) error {
|
|||
return p.db.StoreSignedVAA(v)
|
||||
}
|
||||
|
||||
// haveSignedVAA returns true if we already have a VAA for the given VAAID
|
||||
func (p *Processor) haveSignedVAA(id db.VAAID) bool {
|
||||
if id.EmitterChain == vaa.ChainIDPythNet {
|
||||
if p.pythnetVaas == nil {
|
||||
return false
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
|
||||
_, exists := p.pythnetVaas[key]
|
||||
return exists
|
||||
}
|
||||
|
||||
if p.db == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
ok, err := p.db.HasVAA(id)
|
||||
|
||||
if err != nil {
|
||||
p.logger.Error("failed to look up VAA in database",
|
||||
zap.String("vaaID", string(id.Bytes())),
|
||||
zap.Error(err),
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (p *Processor) getSignedVAA(id db.VAAID) (*vaa.VAA, error) {
|
||||
|
||||
if id.EmitterChain == vaa.ChainIDPythNet {
|
||||
|
|
Loading…
Reference in New Issue