node: don't store pythnet VAAs in the database (#1643)

* Don't store pythnet VAAs in the database

Change-Id: Ief4357ab4c909d25dc9182490c322ef253ac23d3

* Clean up logging

Change-Id: I46efea96d6c2ba65459254ffeb21f5d65abebb01

* Rework this to require less custom code

Change-Id: Ib7f521ecff62b1bd13efcb627f88413f4141de59

* Fix copy paste error

Change-Id: I067f8364042f494ad56ed88919cd917f18423073

* Fix typo

Change-Id: I22ccb56ac330bd557b6e8438cfe9c02d7593361d
This commit is contained in:
bruce-riley 2022-09-27 08:14:52 -05:00 committed by GitHub
parent e2575550e8
commit ddd8b78160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 72 additions and 17 deletions

View File

@ -73,7 +73,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// This occurs when we observed a message after the cluster has already reached
// consensus on it, causing us to never achieve quorum.
if ourVaa, ok := s.ourObservation.(*VAA); ok {
if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
if _, err := p.getSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
// If we have a stored quorum VAA, we can safely expire the state.
//
// This is a rare case, and we can safely expire the state, since we
@ -225,4 +225,13 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}
}
}
// Clean up old pythnet VAAs.
oldestTime := time.Now().Add(-time.Hour)
for key, pe := range p.pythnetVaas {
if pe.updateTime.Before(oldestTime) {
p.logger.Info("PYTHNET: dropping old pythnet vaa", zap.String("message_id", key), zap.Stringer("updateTime", pe.updateTime))
delete(p.pythnetVaas, key)
}
}
}

View File

@ -101,13 +101,8 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
//
// Exception: if an observation is made within the settlement time (30s), we'll
// process it so other nodes won't consider it a miss.
if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&v.VAA)); err == nil {
// unmarshal vaa
var existing *vaa.VAA
if existing, err = vaa.Unmarshal(vb); err != nil {
panic("failed to unmarshal VAA from db")
}
if existing, err := p.getSignedVAA(*db.VaaIDFromVAA(&v.VAA)); err == nil {
if k.Timestamp.Sub(existing.Timestamp) > settlementTime {
p.logger.Info("ignoring observation since we already have a quorum VAA for it",
zap.Stringer("emitter_chain", k.EmitterChain),

View File

@ -286,7 +286,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// - enough signatures are present for the VAA to reach quorum
// Check if we already store this VAA
_, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v))
_, err = p.getSignedVAA(*db.VaaIDFromVAA(v))
if err == nil {
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store",
zap.String("digest", hash),
@ -307,7 +307,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
zap.String("bytes", hex.EncodeToString(m.Vaa)),
zap.String("message_id", v.MessageID()))
if err := p.db.StoreSignedVAA(v); err != nil {
if err := p.storeSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
return
}

View File

@ -3,6 +3,7 @@ package processor
import (
"context"
"crypto/ecdsa"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/notify/discord"
@ -73,6 +74,11 @@ type (
}
)
type PythNetVaaEntry struct {
v *vaa.VAA
updateTime time.Time // Used for determining when to delete entries
}
type Processor struct {
// lockC is a channel of observed emitted messages
lockC chan *common.MessagePublication
@ -122,8 +128,9 @@ type Processor struct {
// cleanup triggers periodic state cleanup
cleanup *time.Ticker
notifier *discord.DiscordNotifier
governor *governor.ChainGovernor
notifier *discord.DiscordNotifier
governor *governor.ChainGovernor
pythnetVaas map[string]PythNetVaaEntry
}
func NewProcessor(
@ -165,10 +172,11 @@ func NewProcessor(
notifier: notifier,
logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
pythnetVaas: make(map[string]PythNetVaaEntry),
}
}
@ -218,3 +226,39 @@ func (p *Processor) Run(ctx context.Context) error {
}
}
}
func (p *Processor) storeSignedVAA(v *vaa.VAA) error {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.logger.Info("PYTHNET: storing pythnet vaa", zap.String("message_id", key))
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
return nil
}
return p.db.StoreSignedVAA(v)
}
func (p *Processor) getSignedVAA(id db.VAAID) (*vaa.VAA, error) {
if id.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
ret, exists := p.pythnetVaas[key]
if exists {
p.logger.Info("PYTHNET: found pythnet vaa", zap.String("message_id", key))
return ret.v, nil
}
p.logger.Info("PYTHNET: did not find pythnet vaa", zap.String("message_id", key))
return nil, db.ErrVAANotFound
}
vb, err := p.db.GetSignedVAABytes(id)
if err != nil {
return nil, err
}
vaa, err := vaa.Unmarshal(vb)
if err != nil {
panic("failed to unmarshal VAA from db")
}
return vaa, err
}

View File

@ -38,7 +38,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
zap.String("bytes", hex.EncodeToString(vaaBytes)),
zap.String("message_id", signed.MessageID()))
if err := p.db.StoreSignedVAA(signed); err != nil {
if err := p.storeSignedVAA(signed); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
}

View File

@ -68,6 +68,13 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get
return nil, status.Error(codes.InvalidArgument, "no message ID specified")
}
chainID := vaa.ChainID(req.MessageId.EmitterChain.Number())
// This interface is not supported for PythNet messages because those VAAs are not stored in the database.
if chainID == vaa.ChainIDPythNet {
return nil, status.Error(codes.InvalidArgument, "not supported for PythNet")
}
address, err := hex.DecodeString(req.MessageId.EmitterAddress)
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
@ -80,7 +87,7 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get
copy(addr[:], address)
b, err := s.db.GetSignedVAABytes(db.VAAID{
EmitterChain: vaa.ChainID(req.MessageId.EmitterChain.Number()),
EmitterChain: chainID,
EmitterAddress: addr,
Sequence: req.MessageId.Sequence,
})