Fix missing `txHash` for chains 1 and 22 (#530)

### Summary

This pull request changes the retry logic in the `tx-tracker` service: the in-process retry loop and the worker pool are removed, and retries are driven by SQS redelivery instead.

When processing a transaction fails, the associated SQS message is left unacknowledged, so it expires and reappears after `visibilityTimeout` seconds. As a side effect, a message is retried several times before it is finally given up on.
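
For context, the consumption pattern this relies on is sketched below. The sketch is illustrative only: the `message` interface and `handle` function are hypothetical stand-ins for the repository's `queue.ConsumerMessage` and consumer code. The key point is that a retryable failure returns without acknowledging the message, so SQS makes it visible again after the visibility timeout.

```go
package consumer

import (
	"context"
	"errors"
	"log"
)

// message is a hypothetical stand-in for queue.ConsumerMessage.
type message interface {
	ID() string
	Done()           // delete the message from the queue
	IsExpired() bool // true once the visibility timeout has elapsed
}

var errTransactionNotFound = errors.New("transaction not found")

// handle processes a single SQS message. It only acknowledges the message for
// outcomes that should not be retried; otherwise it returns without calling
// Done(), and SQS redelivers the message after the visibility timeout.
func handle(ctx context.Context, msg message, process func(context.Context, string) error) {
	err := process(ctx, msg.ID())
	switch {
	case errors.Is(err, errTransactionNotFound):
		log.Printf("tx not found for %s - will retry after the SQS visibility timeout", msg.ID())
		return // no ack: the message reappears later
	case err != nil:
		log.Printf("failed to process %s: %v - will retry after the SQS visibility timeout", msg.ID(), err)
		return // no ack: the message reappears later
	}
	if !msg.IsExpired() {
		msg.Done() // success: remove the message from the queue
	}
}
```
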
Commit 1ef666322c (parent 50774265ea), authored by agodnic on 2023-07-13 12:26:38 -03:00 and committed via GitHub.
7 changed files with 115 additions and 266 deletions


@ -58,7 +58,9 @@ func fetchAptosTx(
return nil, fmt.Errorf("failed to parse response body from events endpoint: %w", err)
}
}
if len(events) != 1 {
if len(events) == 0 {
return nil, ErrTransactionNotFound
} else if len(events) > 1 {
return nil, fmt.Errorf("expected exactly one event, but got %d", len(events))
}


@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)
@ -41,7 +42,9 @@ func fetchCosmosTx(
// Perform the HTTP request
uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, uri)
if err != nil {
if strings.Contains(err.Error(), "404") {
return nil, ErrTransactionNotFound
} else if err != nil {
return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err)
}


@ -3,6 +3,7 @@ package chains
import (
"context"
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/rpc"
@ -52,7 +53,9 @@ func fetchSuiTx(
// Execute the remote procedure call
opts := suiGetTransactionBlockOpts{ShowInput: true}
err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts)
if err != nil {
if strings.Contains(err.Error(), "Could not find the referenced transaction") {
return nil, ErrTransactionNotFound
} else if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}
}
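
All three adapters above (Aptos, Cosmos, Sui) converge on the same contract: a provider-specific "not found" response becomes the shared `ErrTransactionNotFound` sentinel, while everything else remains a generic error. The sentinel's declaration is not part of this diff; the sketch below shows how such a sentinel and a string-matching helper could look, with `isNotFound` being a hypothetical name (the committed code inlines the `strings.Contains` checks instead).

```go
package chains

import (
	"errors"
	"strings"
)

// ErrTransactionNotFound is the shared sentinel every chain adapter returns
// when the queried transaction does not exist (yet). The exact error message
// used in the repository may differ.
var ErrTransactionNotFound = errors.New("transaction not found")

// isNotFound is a hypothetical helper: it reports whether an RPC error text
// contains one of the provider-specific "not found" markers.
func isNotFound(err error, markers ...string) bool {
	if err == nil {
		return false
	}
	for _, m := range markers {
		if strings.Contains(err.Error(), m) {
			return true
		}
	}
	return false
}
```

For example, the Cosmos adapter would match on `"404"` and the Sui adapter on `"Could not find the referenced transaction"`.
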


@ -114,14 +114,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons
awsconfig,
cfg.SqsUrl,
sqs.WithMaxMessages(10),
// We're setting a high visibility timeout to decrease the likelihood of a
// message being processed more than once.
//
// This is particularly relevant for the cases in which we receive a burst
// of traffic (e.g.: dozens of VAAs being emitted in the same minute), and
// also when a we have to retry fetching transaction metadata many times
// (due to finality delay, out-of-sync nodes, etc).
sqs.WithVisibilityTimeout(20*60),
sqs.WithVisibilityTimeout(4*60),
)
return consumer, err
}
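
The `sqs.WithMaxMessages` and `sqs.WithVisibilityTimeout` calls follow the functional options pattern; the options' implementation is not part of this diff. The sketch below shows the general shape with hypothetical field and constructor names, not the repository's actual `sqs` package.

```go
package sqs

// Consumer holds the settings the options below mutate. Field names here are
// illustrative, not the repository's actual ones.
type Consumer struct {
	url               string
	maxMessages       int
	visibilityTimeout int // seconds
}

// Option mutates a Consumer during construction.
type Option func(*Consumer)

// WithMaxMessages caps how many messages a single receive call returns.
func WithMaxMessages(n int) Option {
	return func(c *Consumer) { c.maxMessages = n }
}

// WithVisibilityTimeout sets how long (in seconds) a received message stays
// hidden from other consumers before SQS makes it visible again.
func WithVisibilityTimeout(seconds int) Option {
	return func(c *Consumer) { c.visibilityTimeout = seconds }
}

// NewConsumer is a hypothetical constructor applying the options in order.
func NewConsumer(url string, opts ...Option) *Consumer {
	c := &Consumer{url: url, maxMessages: 1, visibilityTimeout: 4 * 60}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
```

Lowering the timeout from `20*60` to `4*60` seconds means an unacknowledged message can reappear roughly every 4 minutes instead of every 20, i.e. up to ~360 redelivery opportunities per day instead of ~72, bounded in practice by the queue's message retention period.
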


@ -2,38 +2,37 @@ package consumer
import (
"context"
"errors"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.VAAConsumeFunc
rpcServiceProviderSettings *config.RpcProviderSettings
logger *zap.Logger
repository *Repository
workerPool *WorkerPool
consumeFunc queue.VAAConsumeFunc
rpcProviderSettings *config.RpcProviderSettings
logger *zap.Logger
repository *Repository
}
// New creates a new vaa consumer.
func New(
consumeFunc queue.VAAConsumeFunc,
rpcServiceProviderSettings *config.RpcProviderSettings,
rpcProviderSettings *config.RpcProviderSettings,
ctx context.Context,
logger *zap.Logger,
repository *Repository,
) *Consumer {
workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository)
c := Consumer{
consumeFunc: consumeFunc,
rpcServiceProviderSettings: rpcServiceProviderSettings,
logger: logger,
repository: repository,
workerPool: workerPool,
consumeFunc: consumeFunc,
rpcProviderSettings: rpcProviderSettings,
logger: logger,
repository: repository,
}
return &c
@ -49,19 +48,76 @@ func (c *Consumer) producerLoop(ctx context.Context) {
ch := c.consumeFunc(ctx)
for msg := range ch {
c.logger.Debug("Received message, pushing to worker pool", zap.String("vaaId", msg.Data().ID))
// Send the VAA to the worker pool.
//
// The worker pool is responsible for calling `msg.Done()`
err := c.workerPool.Push(ctx, msg)
if err != nil {
c.logger.Warn("failed to push message into worker pool",
zap.String("vaaId", msg.Data().ID),
zap.Error(err),
)
msg.Failed()
}
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID))
c.process(ctx, msg)
}
}
func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
event := msg.Data()
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
if !msg.IsExpired() {
c.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID))
msg.Done()
} else {
c.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID))
}
return
}
// Skip non-processed, expired messages
if msg.IsExpired() {
c.logger.Warn("Message expired - skipping",
zap.String("vaaId", event.ID),
zap.Bool("isExpired", msg.IsExpired()),
)
return
}
// Process the VAA
p := ProcessSourceTxParams{
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
Overwrite: false, // avoid processing the same transaction twice
}
err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p)
// Log a message informing the processing status
if errors.Is(err, chains.ErrChainNotSupported) {
c.logger.Info("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
} else if errors.Is(err, ErrAlreadyProcessed) {
c.logger.Warn("Message already processed - skipping",
zap.String("vaaId", event.ID),
)
} else if errors.Is(err, chains.ErrTransactionNotFound) {
c.logger.Warn("Transaction not found - will retry after SQS visibilityTimeout",
zap.String("vaaId", event.ID),
)
return
} else if err != nil {
c.logger.Error("Failed to process originTx - will retry after SQS visibilityTimeout",
zap.String("vaaId", event.ID),
zap.Error(err),
)
return
} else {
c.logger.Info("Transaction processed successfully",
zap.String("id", event.ID),
)
}
// Mark the message as done
//
// If the message is expired, it will be put back into the queue.
if !msg.IsExpired() {
msg.Done()
}
}
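
Note that the new consumer compares errors with `errors.Is` rather than `==` (the old worker pool used `err == chains.ErrChainNotSupported`). That matters because `ProcessSourceTx` (next file) wraps the underlying error with `%w`, and only `errors.Is` walks the wrap chain. A self-contained illustration with stand-in functions:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTransactionNotFound mirrors the sentinel used by the chains package.
var ErrTransactionNotFound = errors.New("transaction not found")

// fetchTx stands in for chains.FetchTx: it maps a provider-specific
// "not found" response to the sentinel error.
func fetchTx(found bool) error {
	if !found {
		return ErrTransactionNotFound
	}
	return nil
}

// processSourceTx stands in for ProcessSourceTx: it wraps the lower-level
// error with %w, which keeps the sentinel reachable for errors.Is.
func processSourceTx(found bool) error {
	if err := fetchTx(found); err != nil {
		return fmt.Errorf("failed to process transaction: %w", err)
	}
	return nil
}

func main() {
	err := processSourceTx(false)

	// errors.Is unwraps the %w chain, so the consumer still detects the
	// sentinel even though it was wrapped; a plain == comparison fails here.
	fmt.Println(errors.Is(err, ErrTransactionNotFound)) // true
	fmt.Println(err == ErrTransactionNotFound)          // false
}
```
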


@ -3,7 +3,7 @@ package consumer
import (
"context"
"errors"
"time"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
@ -12,11 +12,6 @@ import (
"go.uber.org/zap"
)
const (
maxAttempts = 1
retryDelay = 5 * time.Minute
)
var ErrAlreadyProcessed = errors.New("VAA was already processed")
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
@ -43,66 +38,25 @@ func ProcessSourceTx(
params *ProcessSourceTxParams,
) error {
if !params.Overwrite {
// If the message has already been processed, skip it.
//
// Sometimes the SQS visibility timeout expires and the message is put back into the queue,
// even if the RPC nodes have been hit and data has been written to MongoDB.
// In those cases, when we fetch the message for the second time,
// we don't want to hit the RPC nodes again for performance reasons.
processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
if err != nil {
return err
} else if err == nil && processed {
return ErrAlreadyProcessed
}
}
// Get transaction details from the emitter blockchain
//
// If the transaction is not found, will retry a few times before giving up.
var txStatus domain.SourceTxStatus
var txDetail *chains.TxDetail
var err error
for attempts := 1; attempts <= maxAttempts; attempts++ {
if !params.Overwrite {
// If the message has already been processed, skip it.
//
// Sometimes the SQS visibility timeout expires and the message is put back into the queue,
// even if the RPC nodes have been hit and data has been written to MongoDB.
// In those cases, when we fetch the message for the second time,
// we don't want to hit the RPC nodes again for performance reasons.
processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
if err != nil {
return err
} else if err == nil && processed {
return ErrAlreadyProcessed
}
}
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
switch {
// If the transaction is not found, retry after a delay
case errors.Is(err, chains.ErrTransactionNotFound):
txStatus = domain.SourceTxStatusInternalError
logger.Warn("transaction not found, will retry after a delay",
zap.String("vaaId", params.VaaId),
zap.Duration("retryDelay", retryDelay),
zap.Int("attempts", attempts),
zap.Int("maxAttempts", maxAttempts),
)
time.Sleep(retryDelay)
continue
// If the chain ID is not supported, we're done.
case errors.Is(err, chains.ErrChainNotSupported):
return err
// If the context was cancelled, do not attempt to save the result on the database
case errors.Is(err, context.Canceled):
return err
// If there is an internal error, give up
case err != nil:
logger.Error("Failed to fetch source transaction details",
zap.String("vaaId", params.VaaId),
zap.Error(err),
)
txStatus = domain.SourceTxStatusInternalError
break
// Success
case err == nil:
txStatus = domain.SourceTxStatusConfirmed
break
}
txDetail, err := chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
if err != nil {
return fmt.Errorf("failed to process transaction: %w", err)
}
// Store source transaction details in the database
@ -110,7 +64,7 @@ func ProcessSourceTx(
VaaId: params.VaaId,
ChainId: params.ChainId,
TxDetail: txDetail,
TxStatus: txStatus,
TxStatus: domain.SourceTxStatusConfirmed,
}
return repository.UpsertDocument(ctx, &p)
}
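
Because retries are now driven entirely by SQS redelivery, the same VAA can legitimately arrive again even after its origin transaction was already written to MongoDB, which is why the `AlreadyProcessed` check is kept. A minimal sketch of that guard, using a hypothetical narrow repository interface (the real `Repository` is backed by MongoDB):

```go
package consumer

import (
	"context"
	"errors"
)

// ErrAlreadyProcessed mirrors the sentinel declared in the diff above.
var ErrAlreadyProcessed = errors.New("VAA was already processed")

// originTxRepository is a hypothetical narrow interface over the repository.
type originTxRepository interface {
	AlreadyProcessed(ctx context.Context, vaaID string) (bool, error)
}

// guardAlreadyProcessed returns ErrAlreadyProcessed when a previous delivery
// of the same message already persisted the origin transaction, so the caller
// can acknowledge the message without hitting the RPC nodes again.
func guardAlreadyProcessed(ctx context.Context, repo originTxRepository, vaaID string, overwrite bool) error {
	if overwrite {
		return nil // caller explicitly wants to reprocess
	}
	processed, err := repo.AlreadyProcessed(ctx, vaaID)
	if err != nil {
		return err
	}
	if processed {
		return ErrAlreadyProcessed
	}
	return nil
}
```
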


@ -1,162 +0,0 @@
package consumer
import (
"context"
"fmt"
"sync"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
const numWorkers = 500
// WorkerPool is an abstraction to process VAAs concurrently.
type WorkerPool struct {
wg sync.WaitGroup
chInput chan queue.ConsumerMessage
ctx context.Context
logger *zap.Logger
rpcProviderSettings *config.RpcProviderSettings
repository *Repository
}
// NewWorkerPool creates a new worker pool.
func NewWorkerPool(
ctx context.Context,
logger *zap.Logger,
rpcProviderSettings *config.RpcProviderSettings,
repository *Repository,
) *WorkerPool {
w := WorkerPool{
chInput: make(chan queue.ConsumerMessage),
ctx: ctx,
logger: logger,
rpcProviderSettings: rpcProviderSettings,
repository: repository,
}
// Spawn worker goroutines
for i := 0; i < numWorkers; i++ {
w.wg.Add(1)
go w.consumerLoop()
}
return &w
}
// Push sends a new item to the worker pool.
//
// This function will block until either a worker is available or the context is cancelled.
func (w *WorkerPool) Push(ctx context.Context, msg queue.ConsumerMessage) error {
select {
case w.chInput <- msg:
return nil
case <-ctx.Done():
return fmt.Errorf("failed to push message into worker pool due to calcelled context: %w", ctx.Err())
}
}
// StopGracefully stops the worker pool gracefully.
//
// This function blocks until the consumer queue is empty.
func (w *WorkerPool) StopGracefully() {
// Close the producer channel.
// This will stop sending items to the workers.
// After all items are consumed, the workers will exit.
close(w.chInput)
w.chInput = nil
// Wait for all workers to finish gracefully
w.wg.Wait()
}
// consumerLoop is the main loop of a worker.
//
// It will consume items from the input channel until the channel is closed or the context is cancelled.
func (w *WorkerPool) consumerLoop() {
for {
select {
case msg, ok := <-w.chInput:
if !ok {
w.wg.Done()
return
}
w.process(msg)
case <-w.ctx.Done():
w.wg.Done()
return
}
}
}
// process consumes a single item from the input channel.
func (w *WorkerPool) process(msg queue.ConsumerMessage) {
event := msg.Data()
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
if !msg.IsExpired() {
w.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID))
msg.Done()
} else {
w.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID))
}
return
}
// Skip non-processed, expired messages
if msg.IsExpired() {
w.logger.Warn("Message expired - skipping",
zap.String("vaaId", event.ID),
zap.Bool("isExpired", msg.IsExpired()),
)
return
}
// Process the VAA
p := ProcessSourceTxParams{
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
Overwrite: false, // avoid processing the same transaction twice
}
err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p)
// Log a message informing the processing status
if err == chains.ErrChainNotSupported {
w.logger.Info("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
} else if err == ErrAlreadyProcessed {
w.logger.Warn("Message already processed - skipping",
zap.String("vaaId", event.ID),
)
} else if err != nil {
w.logger.Error("Failed to process originTx",
zap.String("vaaId", event.ID),
zap.Error(err),
)
} else {
w.logger.Info("Updated originTx in the database",
zap.String("id", event.ID),
)
}
// Mark the message as done
//
// If the message is expired, it will be put back into the queue.
if !msg.IsExpired() {
msg.Done()
}
}