diff --git a/tx-tracker/chains/api_aptos.go b/tx-tracker/chains/api_aptos.go
index 3dce4530..c561b711 100644
--- a/tx-tracker/chains/api_aptos.go
+++ b/tx-tracker/chains/api_aptos.go
@@ -58,7 +58,9 @@ func fetchAptosTx(
 			return nil, fmt.Errorf("failed to parse response body from events endpoint: %w", err)
 		}
 	}
-	if len(events) != 1 {
+	if len(events) == 0 {
+		return nil, ErrTransactionNotFound
+	} else if len(events) > 1 {
 		return nil, fmt.Errorf("expected exactly one event, but got %d", len(events))
 	}
 
diff --git a/tx-tracker/chains/api_cosmos.go b/tx-tracker/chains/api_cosmos.go
index a4c136e9..9e350f55 100644
--- a/tx-tracker/chains/api_cosmos.go
+++ b/tx-tracker/chains/api_cosmos.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
 	"time"
 )
@@ -41,7 +42,9 @@ func fetchCosmosTx(
 	// Perform the HTTP request
 	uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUrl, txHash)
 	body, err := httpGet(ctx, rateLimiter, uri)
-	if err != nil {
+	if err != nil && strings.Contains(err.Error(), "404") {
+		return nil, ErrTransactionNotFound
+	} else if err != nil {
 		return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err)
 	}
 
diff --git a/tx-tracker/chains/api_sui.go b/tx-tracker/chains/api_sui.go
index 04976c9b..1c680b57 100644
--- a/tx-tracker/chains/api_sui.go
+++ b/tx-tracker/chains/api_sui.go
@@ -3,6 +3,7 @@ package chains
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/ethereum/go-ethereum/rpc"
@@ -52,7 +53,9 @@ func fetchSuiTx(
 		// Execute the remote procedure call
 		opts := suiGetTransactionBlockOpts{ShowInput: true}
 		err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts)
-		if err != nil {
+		if err != nil && strings.Contains(err.Error(), "Could not find the referenced transaction") {
+			return nil, ErrTransactionNotFound
+		} else if err != nil {
 			return nil, fmt.Errorf("failed to get tx by hash: %w", err)
 		}
 	}
diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go
index e2a1e4e5..b8e8b947 100644
--- a/tx-tracker/cmd/service/main.go
+++ b/tx-tracker/cmd/service/main.go
@@ -114,14 +114,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons
 		awsconfig,
 		cfg.SqsUrl,
 		sqs.WithMaxMessages(10),
-		// We're setting a high visibility timeout to decrease the likelihood of a
-		// message being processed more than once.
-		//
-		// This is particularly relevant for the cases in which we receive a burst
-		// of traffic (e.g.: dozens of VAAs being emitted in the same minute), and
-		// also when a we have to retry fetching transaction metadata many times
-		// (due to finality delay, out-of-sync nodes, etc).
-		sqs.WithVisibilityTimeout(20*60),
+		sqs.WithVisibilityTimeout(4*60),
 	)
 	return consumer, err
 }
diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go
index a2f6f758..a0d60ac5 100644
--- a/tx-tracker/consumer/consumer.go
+++ b/tx-tracker/consumer/consumer.go
@@ -2,38 +2,37 @@ package consumer
 
 import (
 	"context"
+	"errors"
 
+	"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
 	"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
 	"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
+	sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 )
 
 // Consumer consumer struct definition.
 type Consumer struct {
-	consumeFunc                queue.VAAConsumeFunc
-	rpcServiceProviderSettings *config.RpcProviderSettings
-	logger                     *zap.Logger
-	repository                 *Repository
-	workerPool                 *WorkerPool
+	consumeFunc         queue.VAAConsumeFunc
+	rpcProviderSettings *config.RpcProviderSettings
+	logger              *zap.Logger
+	repository          *Repository
 }
 
 // New creates a new vaa consumer.
 func New(
 	consumeFunc queue.VAAConsumeFunc,
-	rpcServiceProviderSettings *config.RpcProviderSettings,
+	rpcProviderSettings *config.RpcProviderSettings,
 	ctx context.Context,
 	logger *zap.Logger,
 	repository *Repository,
 ) *Consumer {
-	workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository)
-
 	c := Consumer{
-		consumeFunc:                consumeFunc,
-		rpcServiceProviderSettings: rpcServiceProviderSettings,
-		logger:                     logger,
-		repository:                 repository,
-		workerPool:                 workerPool,
+		consumeFunc:         consumeFunc,
+		rpcProviderSettings: rpcProviderSettings,
+		logger:              logger,
+		repository:          repository,
 	}
 
 	return &c
@@ -49,19 +48,76 @@ func (c *Consumer) producerLoop(ctx context.Context) {
 	ch := c.consumeFunc(ctx)
 
 	for msg := range ch {
-
-		c.logger.Debug("Received message, pushing to worker pool", zap.String("vaaId", msg.Data().ID))
-
-		// Send the VAA to the worker pool.
-		//
-		// The worker pool is responsible for calling `msg.Done()`
-		err := c.workerPool.Push(ctx, msg)
-		if err != nil {
-			c.logger.Warn("failed to push message into worker pool",
-				zap.String("vaaId", msg.Data().ID),
-				zap.Error(err),
-			)
-			msg.Failed()
-		}
+		c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID))
+		c.process(ctx, msg)
+	}
+}
+
+func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
+
+	event := msg.Data()
+
+	// Do not process messages from PythNet
+	if event.ChainID == sdk.ChainIDPythNet {
+		if !msg.IsExpired() {
+			c.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID))
+			msg.Done()
+		} else {
+			c.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID))
+		}
+		return
+	}
+
+	// Skip messages that expired before they could be processed
+	if msg.IsExpired() {
+		c.logger.Warn("Message expired - skipping",
+			zap.String("vaaId", event.ID),
+			zap.Bool("isExpired", msg.IsExpired()),
+		)
+		return
+	}
+
+	// Process the VAA
+	p := ProcessSourceTxParams{
+		VaaId:     event.ID,
+		ChainId:   event.ChainID,
+		Emitter:   event.EmitterAddress,
+		Sequence:  event.Sequence,
+		TxHash:    event.TxHash,
+		Overwrite: false, // avoid processing the same transaction twice
+	}
+	err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p)
+
+	// Log a message informing the processing status
+	if errors.Is(err, chains.ErrChainNotSupported) {
+		c.logger.Info("Skipping VAA - chain not supported",
+			zap.String("vaaId", event.ID),
+		)
+	} else if errors.Is(err, ErrAlreadyProcessed) {
+		c.logger.Warn("Message already processed - skipping",
+			zap.String("vaaId", event.ID),
+		)
+	} else if errors.Is(err, chains.ErrTransactionNotFound) {
+		c.logger.Warn("Transaction not found - will retry after SQS visibilityTimeout",
+			zap.String("vaaId", event.ID),
+		)
+		return
+	} else if err != nil {
+		c.logger.Error("Failed to process originTx - will retry after SQS visibilityTimeout",
+			zap.String("vaaId", event.ID),
+			zap.Error(err),
+		)
+		return
+	} else {
+		c.logger.Info("Transaction processed successfully",
+			zap.String("vaaId", event.ID),
+		)
+	}
+
+	// Mark the message as done
+	//
+	// If the message is expired, it will be put back into the queue.
+	if !msg.IsExpired() {
+		msg.Done()
 	}
 }
diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go
index 6ff89537..ef19f655 100644
--- a/tx-tracker/consumer/processor.go
+++ b/tx-tracker/consumer/processor.go
@@ -3,7 +3,7 @@ package consumer
 import (
 	"context"
 	"errors"
-	"time"
+	"fmt"
 
 	"github.com/wormhole-foundation/wormhole-explorer/common/domain"
 	"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
@@ -12,11 +12,6 @@ import (
 	"go.uber.org/zap"
 )
 
-const (
-	maxAttempts = 1
-	retryDelay  = 5 * time.Minute
-)
-
 var ErrAlreadyProcessed = errors.New("VAA was already processed")
 
 // ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
@@ -43,66 +38,25 @@ func ProcessSourceTx(
 	params *ProcessSourceTxParams,
 ) error {
 
+	if !params.Overwrite {
+		// If the message has already been processed, skip it.
+		//
+		// Sometimes the SQS visibility timeout expires and the message is put back into the queue,
+		// even if the RPC nodes have been hit and data has been written to MongoDB.
+		// In those cases, when we fetch the message for the second time,
+		// we don't want to hit the RPC nodes again for performance reasons.
+		processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
+		if err != nil {
+			return err
+		} else if processed {
+			return ErrAlreadyProcessed
+		}
+	}
+
 	// Get transaction details from the emitter blockchain
-	//
-	// If the transaction is not found, will retry a few times before giving up.
-	var txStatus domain.SourceTxStatus
-	var txDetail *chains.TxDetail
-	var err error
-	for attempts := 1; attempts <= maxAttempts; attempts++ {
-
-		if !params.Overwrite {
-			// If the message has already been processed, skip it.
-			//
-			// Sometimes the SQS visibility timeout expires and the message is put back into the queue,
-			// even if the RPC nodes have been hit and data has been written to MongoDB.
-			// In those cases, when we fetch the message for the second time,
-			// we don't want to hit the RPC nodes again for performance reasons.
-			processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
-			if err != nil {
-				return err
-			} else if err == nil && processed {
-				return ErrAlreadyProcessed
-			}
-		}
-
-		txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
-
-		switch {
-		// If the transaction is not found, retry after a delay
-		case errors.Is(err, chains.ErrTransactionNotFound):
-			txStatus = domain.SourceTxStatusInternalError
-			logger.Warn("transaction not found, will retry after a delay",
-				zap.String("vaaId", params.VaaId),
-				zap.Duration("retryDelay", retryDelay),
-				zap.Int("attempts", attempts),
-				zap.Int("maxAttempts", maxAttempts),
-			)
-			time.Sleep(retryDelay)
-			continue
-
-		// If the chain ID is not supported, we're done.
- case errors.Is(err, chains.ErrChainNotSupported): - return err - - // If the context was cancelled, do not attempt to save the result on the database - case errors.Is(err, context.Canceled): - return err - - // If there is an internal error, give up - case err != nil: - logger.Error("Failed to fetch source transaction details", - zap.String("vaaId", params.VaaId), - zap.Error(err), - ) - txStatus = domain.SourceTxStatusInternalError - break - - // Success - case err == nil: - txStatus = domain.SourceTxStatusConfirmed - break - } + txDetail, err := chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash) + if err != nil { + return fmt.Errorf("failed to process transaction: %w", err) } // Store source transaction details in the database @@ -110,7 +64,7 @@ func ProcessSourceTx( VaaId: params.VaaId, ChainId: params.ChainId, TxDetail: txDetail, - TxStatus: txStatus, + TxStatus: domain.SourceTxStatusConfirmed, } return repository.UpsertDocument(ctx, &p) } diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go deleted file mode 100644 index acad425a..00000000 --- a/tx-tracker/consumer/workerpool.go +++ /dev/null @@ -1,162 +0,0 @@ -package consumer - -import ( - "context" - "fmt" - "sync" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" - "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" - sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" - "go.uber.org/zap" -) - -const numWorkers = 500 - -// WorkerPool is an abstraction to process VAAs concurrently. -type WorkerPool struct { - wg sync.WaitGroup - chInput chan queue.ConsumerMessage - ctx context.Context - logger *zap.Logger - rpcProviderSettings *config.RpcProviderSettings - repository *Repository -} - -// NewWorkerPool creates a new worker pool. -func NewWorkerPool( - ctx context.Context, - logger *zap.Logger, - rpcProviderSettings *config.RpcProviderSettings, - repository *Repository, -) *WorkerPool { - - w := WorkerPool{ - chInput: make(chan queue.ConsumerMessage), - ctx: ctx, - logger: logger, - rpcProviderSettings: rpcProviderSettings, - repository: repository, - } - - // Spawn worker goroutines - for i := 0; i < numWorkers; i++ { - w.wg.Add(1) - go w.consumerLoop() - } - - return &w -} - -// Push sends a new item to the worker pool. -// -// This function will block until either a worker is available or the context is cancelled. -func (w *WorkerPool) Push(ctx context.Context, msg queue.ConsumerMessage) error { - - select { - case w.chInput <- msg: - return nil - case <-ctx.Done(): - return fmt.Errorf("failed to push message into worker pool due to calcelled context: %w", ctx.Err()) - } -} - -// StopGracefully stops the worker pool gracefully. -// -// This function blocks until the consumer queue is empty. -func (w *WorkerPool) StopGracefully() { - - // Close the producer channel. - // This will stop sending items to the workers. - // After all items are consumed, the workers will exit. - close(w.chInput) - w.chInput = nil - - // Wait for all workers to finish gracefully - w.wg.Wait() -} - -// consumerLoop is the main loop of a worker. -// -// It will consume items from the input channel until the channel is closed or the context is cancelled. 
-func (w *WorkerPool) consumerLoop() { - for { - select { - case msg, ok := <-w.chInput: - if !ok { - w.wg.Done() - return - } - w.process(msg) - - case <-w.ctx.Done(): - w.wg.Done() - return - } - } -} - -// process consumes a single item from the input channel. -func (w *WorkerPool) process(msg queue.ConsumerMessage) { - - event := msg.Data() - - // Do not process messages from PythNet - if event.ChainID == sdk.ChainIDPythNet { - if !msg.IsExpired() { - w.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID)) - msg.Done() - } else { - w.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID)) - } - return - } - - // Skip non-processed, expired messages - if msg.IsExpired() { - w.logger.Warn("Message expired - skipping", - zap.String("vaaId", event.ID), - zap.Bool("isExpired", msg.IsExpired()), - ) - return - } - - // Process the VAA - p := ProcessSourceTxParams{ - VaaId: event.ID, - ChainId: event.ChainID, - Emitter: event.EmitterAddress, - Sequence: event.Sequence, - TxHash: event.TxHash, - Overwrite: false, // avoid processing the same transaction twice - } - err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p) - - // Log a message informing the processing status - if err == chains.ErrChainNotSupported { - w.logger.Info("Skipping VAA - chain not supported", - zap.String("vaaId", event.ID), - ) - } else if err == ErrAlreadyProcessed { - w.logger.Warn("Message already processed - skipping", - zap.String("vaaId", event.ID), - ) - } else if err != nil { - w.logger.Error("Failed to process originTx", - zap.String("vaaId", event.ID), - zap.Error(err), - ) - } else { - w.logger.Info("Updated originTx in the database", - zap.String("id", event.ID), - ) - } - - // Mark the message as done - // - // If the message is expired, it will be put back into the queue. - if !msg.IsExpired() { - msg.Done() - } -}
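
Note on the error-flow this patch relies on: the chain clients (api_aptos.go, api_cosmos.go, api_sui.go) translate provider-specific "not found" failures into the chains.ErrTransactionNotFound sentinel, ProcessSourceTx wraps whatever FetchTx returns with %w, and the consumer picks the sentinel back out with errors.Is to decide whether to delete the SQS message or let the visibility timeout re-queue it. The sketch below is a minimal, self-contained illustration of that flow, not code from the patch; httpGet, fetchTx, processSourceTx and the "404" substring are hypothetical stand-ins for the real helpers and signatures.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Sentinel mirroring chains.ErrTransactionNotFound.
var ErrTransactionNotFound = errors.New("transaction not found")

// httpGet stands in for the real HTTP helper; here it always fails the
// way an out-of-sync node would, with a 404 in the error text.
func httpGet(uri string) ([]byte, error) {
	return nil, errors.New("unexpected status code: 404 Not Found")
}

// fetchTx classifies a provider-specific failure into the sentinel.
// The err != nil guard must run before err.Error(): calling Error()
// on a nil error would panic on the success path.
func fetchTx(uri string) ([]byte, error) {
	body, err := httpGet(uri)
	if err != nil && strings.Contains(err.Error(), "404") {
		return nil, ErrTransactionNotFound
	} else if err != nil {
		return nil, fmt.Errorf("failed to query tx endpoint: %w", err)
	}
	return body, nil
}

// processSourceTx wraps with %w, so the sentinel stays matchable upstream.
func processSourceTx(uri string) error {
	if _, err := fetchTx(uri); err != nil {
		return fmt.Errorf("failed to process transaction: %w", err)
	}
	return nil
}

func main() {
	err := processSourceTx("https://example.invalid/cosmos/tx/v1beta1/txs/abc")
	if errors.Is(err, ErrTransactionNotFound) {
		// Leave the SQS message alone; it becomes visible again after the
		// visibility timeout (4 minutes in this patch) and gets retried.
		fmt.Println("tx not found - retry after the visibility timeout")
	}
}

Because the err != nil guard runs before err.Error(), the success path never dereferences a nil error, and because every wrap uses %w rather than %v, errors.Is keeps matching the sentinel through any number of wrapping layers.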