diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index b7977a59..55250ae9 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -85,7 +85,7 @@ func main() { // Spawn the producer goroutine. // // The producer sends tasks to the workers via a buffered channel. - queue := make(chan consumer.GlobalTransaction, cfg.BulkSize) + queue := make(chan consumer.VaaById, cfg.BulkSize) p := producerParams{ logger: makeLogger(rootLogger, "producer"), repository: repository, @@ -157,7 +157,7 @@ func parseStrategyCallbacks( countFn: func(ctx context.Context) (uint64, error) { return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore) }, - iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) { + iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.VaaById, error) { return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore) }, } @@ -177,14 +177,14 @@ func parseStrategyCallbacks( type strategyCallbacks struct { countFn func(ctx context.Context) (uint64, error) - iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) + iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.VaaById, error) } // producerParams contains the parameters for the producer goroutine. type producerParams struct { logger *zap.Logger repository *consumer.Repository - queueTx chan<- consumer.GlobalTransaction + queueTx chan<- consumer.VaaById bulkSize uint strategyCallbacks *strategyCallbacks } @@ -239,7 +239,7 @@ type consumerParams struct { logger *zap.Logger rpcProviderSettings *config.RpcProviderSettings repository *consumer.Repository - queueRx <-chan consumer.GlobalTransaction + queueRx <-chan consumer.VaaById wg *sync.WaitGroup totalDocuments uint64 processedDocuments *atomic.Uint64 diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index c97ea0d5..c6896ca2 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -62,13 +62,12 @@ func (c *Consumer) producerLoop(ctx context.Context) { func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) { - defer msg.Done() - event := msg.Data() // Do not process messages from PythNet if event.ChainID == sdk.ChainIDPythNet { c.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID)) + msg.Done() return } @@ -88,19 +87,23 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) { // Log a message informing the processing status if errors.Is(err, chains.ErrChainNotSupported) { + msg.Done() c.logger.Info("Skipping VAA - chain not supported", zap.String("vaaId", event.ID), ) } else if errors.Is(err, ErrAlreadyProcessed) { + msg.Done() c.logger.Warn("Message already processed - skipping", zap.String("vaaId", event.ID), ) } else if err != nil { + msg.Failed() c.logger.Error("Failed to process originTx", zap.String("vaaId", event.ID), zap.Error(err), ) } else { + msg.Done() c.logger.Info("Transaction processed successfully", zap.String("id", event.ID), ) diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index b6f11d7d..fc9b066b 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -80,6 +80,40 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP return nil } +// GlobalTransaction represents a global transaction. +type GlobalTransaction struct { + ID string `bson:"_id" json:"id"` + OriginTx *OriginTx `bson:"originTx" json:"originTx"` + DestinationTx *DestinationTx `bson:"destinationTx" json:"destinationTx"` +} + +// OriginTx represents a origin transaction. +type OriginTx struct { + TxHash string `bson:"nativeTxHash" json:"txHash"` + From string `bson:"from" json:"from"` + Status string `bson:"status" json:"status"` + Attribute *AttributeDoc `bson:"attribute" json:"attribute"` +} + +// AttributeDoc represents a custom attribute for a origin transaction. +type AttributeDoc struct { + Type string `bson:"type" json:"type"` + Value map[string]any `bson:"value" json:"value"` +} + +// DestinationTx represents a destination transaction. +type DestinationTx struct { + ChainID sdk.ChainID `bson:"chainId" json:"chainId"` + Status string `bson:"status" json:"status"` + Method string `bson:"method" json:"method"` + TxHash string `bson:"txHash" json:"txHash"` + From string `bson:"from" json:"from"` + To string `bson:"to" json:"to"` + BlockNumber string `bson:"blockNumber" json:"blockNumber"` + Timestamp *time.Time `bson:"timestamp" json:"timestamp"` + UpdatedAt *time.Time `bson:"updatedAt" json:"updatedAt"` +} + // AlreadyProcessed returns true if the given VAA ID has already been processed. func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool, error) { @@ -87,15 +121,19 @@ func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool, globalTransactions. FindOne(ctx, bson.D{{"_id", vaaId}}) - var tx GlobalTransaction - err := result.Decode(&tx) + var globalTransaction GlobalTransaction + err := result.Decode(&globalTransaction) if err == mongo.ErrNoDocuments { return false, nil } else if err != nil { return false, fmt.Errorf("failed to decode already processed VAA id: %w", err) - } else { - return true, nil } + + // check if the originTx field does not exist, otherwise the VAA has already been processed + if globalTransaction.OriginTx == nil { + return false, nil + } + return true, nil } // CountDocumentsByTimeRange returns the number of documents that match the given time range. @@ -204,7 +242,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro return results[0].NumDocuments, nil } -type GlobalTransaction struct { +type VaaById struct { Id string `bson:"_id"` Vaas []vaa.VaaDoc `bson:"vaas"` } @@ -217,7 +255,7 @@ func (r *Repository) GetDocumentsByTimeRange( limit uint, timeAfter time.Time, timeBefore time.Time, -) ([]GlobalTransaction, error) { +) ([]VaaById, error) { // Build the aggregation pipeline var pipeline mongo.Pipeline @@ -281,9 +319,9 @@ func (r *Repository) GetDocumentsByTimeRange( } // Build the result - var globalTransactions []GlobalTransaction + var globalTransactions []VaaById for i := range documents { - globalTransaction := GlobalTransaction{ + globalTransaction := VaaById{ Id: documents[i].ID, Vaas: []vaa.VaaDoc{documents[i]}, } @@ -299,7 +337,7 @@ func (r *Repository) GetIncompleteDocuments( lastId string, lastTimestamp *time.Time, limit uint, -) ([]GlobalTransaction, error) { +) ([]VaaById, error) { // Build the aggregation pipeline var pipeline mongo.Pipeline @@ -352,7 +390,7 @@ func (r *Repository) GetIncompleteDocuments( } // Read results from cursor - var documents []GlobalTransaction + var documents []VaaById err = cur.All(ctx, &documents) if err != nil { r.logger.Error("failed to decode cursor", zap.Error(err))