From 93e1a7240967442eb9629e54d0f396e14e7f022d Mon Sep 17 00:00:00 2001 From: agodnic Date: Thu, 6 Jul 2023 15:26:18 -0300 Subject: [PATCH] Improvements in the `tx-tracker` service (#517) ### Description This pull request implements improvements in the `tx-tracker` service: * Adjust SQS visibilityTimeout. * Process PythNet messages concurrently (this will make it easier for the service to catch up if there are a lot of messages in the input queue). * Add more context information to error messages. --- tx-tracker/cmd/service/main.go | 2 +- tx-tracker/consumer/consumer.go | 18 +----------------- tx-tracker/consumer/processor.go | 1 - tx-tracker/consumer/repository.go | 1 - tx-tracker/consumer/workerpool.go | 25 ++++++++++++++++++++++--- tx-tracker/queue/vaa_sqs.go | 7 ++++++- 6 files changed, 30 insertions(+), 24 deletions(-) diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index 46e83be7..e2a1e4e5 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -121,7 +121,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons // of traffic (e.g.: dozens of VAAs being emitted in the same minute), and // also when a we have to retry fetching transaction metadata many times // (due to finality delay, out-of-sync nodes, etc). - sqs.WithVisibilityTimeout(15*60), + sqs.WithVisibilityTimeout(20*60), ) return consumer, err } diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 6c894f3f..30815b66 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -6,7 +6,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" - sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -57,28 +56,13 @@ func (c *Consumer) producerLoop(ctx context.Context) { for msg := range ch { - event := msg.Data() - - // Check if message is expired. - if msg.IsExpired() { - c.logger.Warn("Message with VAA expired", zap.String("id", event.ID)) - msg.Failed() - continue - } - - // Do not process messages from PythNet - if event.ChainID == sdk.ChainIDPythNet { - msg.Done() - continue - } - // Send the VAA to the worker pool. // // The worker pool is responsible for calling `msg.Done()` err := c.workerPool.Push(ctx, msg) if err != nil { c.logger.Warn("failed to push message into worker pool", - zap.String("vaaId", event.ID), + zap.String("vaaId", msg.Data().ID), zap.Error(err), ) msg.Failed() diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go index 06ee2745..13337107 100644 --- a/tx-tracker/consumer/processor.go +++ b/tx-tracker/consumer/processor.go @@ -80,7 +80,6 @@ func ProcessSourceTx( p := UpsertDocumentParams{ VaaId: params.VaaId, ChainId: params.ChainId, - TxHash: params.TxHash, TxDetail: txDetail, TxStatus: txStatus, } diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 909bd353..388beb90 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -40,7 +40,6 @@ func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { type UpsertDocumentParams struct { VaaId string ChainId sdk.ChainID - TxHash string TxDetail *chains.TxDetail TxStatus domain.SourceTxStatus } diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go index 5487a354..6e09e695 100644 --- a/tx-tracker/consumer/workerpool.go +++ b/tx-tracker/consumer/workerpool.go @@ -8,6 +8,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -101,6 +102,23 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { event := msg.Data() + // Check if the message is expired + if msg.IsExpired() { + w.logger.Warn("Message with VAA expired", + zap.String("vaaId", event.ID), + zap.Bool("isExpired", msg.IsExpired()), + ) + msg.Failed() + return + } + + // Do not process messages from PythNet + if event.ChainID == sdk.ChainIDPythNet { + msg.Done() + return + } + + // Process the VAA p := ProcessSourceTxParams{ VaaId: event.ID, ChainId: event.ChainID, @@ -110,17 +128,18 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { } err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p) + // Log a message informing the processing status if err == chains.ErrChainNotSupported { - w.logger.Debug("Skipping VAA - chain not supported", + w.logger.Info("Skipping VAA - chain not supported", zap.String("vaaId", event.ID), ) } else if err != nil { - w.logger.Error("Failed to upsert source transaction details", + w.logger.Error("Failed to process originTx", zap.String("vaaId", event.ID), zap.Error(err), ) } else { - w.logger.Info("Updated source transaction details in the database", + w.logger.Info("Updated originTx in the database", zap.String("id", event.ID), ) } diff --git a/tx-tracker/queue/vaa_sqs.go b/tx-tracker/queue/vaa_sqs.go index adff7ec5..58dfa015 100644 --- a/tx-tracker/queue/vaa_sqs.go +++ b/tx-tracker/queue/vaa_sqs.go @@ -112,7 +112,12 @@ func (m *sqsConsumerMessage) Data() *VaaEvent { func (m *sqsConsumerMessage) Done() { if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil { - m.logger.Error("Error deleting message from SQS", zap.Error(err)) + m.logger.Error("Error deleting message from SQS", + zap.String("vaaId", m.data.ID), + zap.Bool("isExpired", m.IsExpired()), + zap.Time("expiredAt", m.expiredAt), + zap.Error(err), + ) } m.wg.Done() }