Add logic to fix vaa txhash in tx-tracker component. (#670)

This commit is contained in:
walker-16 2023-08-29 10:50:17 -03:00 committed by Fernando Torres
parent 29bb5fd77b
commit 969dba79cc
5 changed files with 81 additions and 5 deletions

View File

@ -68,7 +68,9 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
c.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID))
c.logger.Debug("Skipping expired PythNet message",
zap.String("vaaId", event.ID),
zap.String("trackId", event.TrackID))
return
}
@ -90,20 +92,24 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
if errors.Is(err, chains.ErrChainNotSupported) {
c.logger.Info("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
zap.String("trackId", event.TrackID))
} else if errors.Is(err, ErrAlreadyProcessed) {
c.logger.Warn("Message already processed - skipping",
zap.String("vaaId", event.ID),
)
zap.String("trackId", event.TrackID))
} else if errors.Is(err, ErrVaaWithoutTxHash) {
c.logger.Error("Skipping VAA without txHash",
zap.String("vaaId", event.ID),
zap.String("trackId", event.TrackID))
} else if err != nil {
c.logger.Error("Failed to process originTx",
zap.String("vaaId", event.ID),
zap.Error(err),
)
zap.String("trackId", event.TrackID))
} else {
c.logger.Info("Transaction processed successfully",
zap.String("id", event.ID),
)
zap.String("trackId", event.TrackID))
c.metrics.IncOriginTxInserted(uint16(event.ChainID))
}
}

View File

@ -14,6 +14,7 @@ import (
)
var ErrAlreadyProcessed = errors.New("VAA was already processed")
var ErrVaaWithoutTxHash = errors.New("VAA without txHash")
const (
minRetries = 3
@ -62,6 +63,16 @@ func ProcessSourceTx(
}
}
// Try to fix the VAA without txHash
if params.TxHash == "" {
txHash, err := fixVaaWithoutTxHash(ctx, repository, params.VaaId)
if err != nil {
logger.Info("failed to fix vaa without txHash", zap.String("vaaId", params.VaaId), zap.Error(err))
return ErrVaaWithoutTxHash
}
params.TxHash = txHash
}
// The loop below tries to fetch transaction details from an external API / RPC node.
//
// It keeps retrying until both of these conditions are met:
@ -105,3 +116,19 @@ func ProcessSourceTx(
}
return repository.UpsertDocument(ctx, &p)
}
// fixVaaWithoutTxHash tries to fix a VAA that was inserted into the database without a txHash.
func fixVaaWithoutTxHash(ctx context.Context, repository *Repository, vaaId string) (string, error) {
vaaIdTxHash, err := repository.GetVaaIdTxHash(ctx, vaaId)
if err != nil || vaaIdTxHash == nil {
return "", err
}
if vaaIdTxHash.TxHash == "" {
return "", fmt.Errorf("txhash for vaa (%s) is empty", vaaId)
}
err = repository.UpdateVaaDocTxHash(ctx, vaaId, vaaIdTxHash.TxHash)
if err != nil {
return "", err
}
return vaaIdTxHash.TxHash, nil
}

View File

@ -22,6 +22,7 @@ type Repository struct {
logger *zap.Logger
globalTransactions *mongo.Collection
vaas *mongo.Collection
vaaIdTxHash *mongo.Collection
}
// New creates a new repository.
@ -31,6 +32,7 @@ func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
logger: logger,
globalTransactions: db.Collection("globalTransactions"),
vaas: db.Collection("vaas"),
vaaIdTxHash: db.Collection("vaaIdTxHash"),
}
return &r
@ -361,3 +363,41 @@ func (r *Repository) GetIncompleteDocuments(
return documents, nil
}
// VaaIdTxHash represents a vaaIdTxHash document.
type VaaIdTxHash struct {
ChainID sdk.ChainID `bson:"emitterChain"`
Emitter string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
TxHash string `bson:"txHash"`
UpdatedAt *time.Time `bson:"updatedAt"`
}
// GetVaaIdTxHash returns a vaaIdTxHash document.
func (r *Repository) GetVaaIdTxHash(ctx context.Context, id string) (*VaaIdTxHash, error) {
var v VaaIdTxHash
err := r.vaaIdTxHash.FindOne(ctx, bson.M{"_id": id}).Decode(&v)
return &v, err
}
// VaaUpdate represents a vaa document.
type VaaUpdate struct {
TxHash string `bson:"txHash,omitempty"`
UpdatedAt *time.Time `bson:"updatedAt"`
}
// UpdateVaaTxHash update a txhash in a vaa document.
func (r *Repository) UpdateVaaDocTxHash(ctx context.Context, id string, txhash string) error {
vaaDoc := &VaaUpdate{
TxHash: txhash,
UpdatedAt: &time.Time{},
}
update := bson.M{
"$set": vaaDoc,
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := r.vaas.UpdateByID(ctx, id, update, nil)
return err
}

View File

@ -21,6 +21,7 @@ type Event struct {
Vaa []byte
Timestamp *time.Time
TxHash string
TrackID string
}
// ConsumerMessage defition.

View File

@ -131,6 +131,7 @@ func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
Vaa: signedVaa.Vaa,
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
TrackID: notification.TrackID,
}
case domain.PublishedLogMessageType:
plm, err := domain.GetEventPayload[domain.PublishedLogMessage](notification)
@ -146,6 +147,7 @@ func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
Vaa: plm.Vaa,
Timestamp: &plm.Timestamp,
TxHash: plm.TxHash,
TrackID: notification.TrackID,
}
}
return nil