Improve performance in the `tx-tracker` service (#519)

### Description

This pull request implements an improvement in the processing logic of the `tx-tracker` service to avoid processing a message more than once.
This commit is contained in:
agodnic 2023-07-11 16:31:45 -03:00 committed by GitHub
parent 93e1a72409
commit 0a854b590d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 99 additions and 72 deletions

View File

@ -693,10 +693,12 @@ func (r *Repository) findOriginTxFromVaa(ctx context.Context, q *GlobalTransacti
// populate the result and return
originTx := OriginTx{
Timestamp: &record.Timestamp,
TxHash: record.TxHash,
ChainID: record.EmitterChain,
Status: string(domain.SourceTxStatusConfirmed),
}
if record.EmitterChain != sdk.ChainIDSolana && record.EmitterChain != sdk.ChainIDAptos {
originTx.TxHash = record.TxHash
}
return &originTx, nil
}

View File

@ -14,7 +14,7 @@ ALGORAND_BASE_URL=https://mainnet-idx.algonode.cloud
ALGORAND_REQUESTS_PER_MINUTE=1
APTOS_BASE_URL=https://fullnode.mainnet.aptoslabs.com/v1
APTOS_REQUESTS_PER_MINUTE=2
APTOS_REQUESTS_PER_MINUTE=1
ARBITRUM_BASE_URL=https://rpc.ankr.com/arbitrum
ARBITRUM_REQUESTS_PER_MINUTE=1
@ -58,13 +58,13 @@ POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=2
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=2
SOLANA_REQUESTS_PER_MINUTE=4
SUI_BASE_URL=https://fullnode.mainnet.sui.io:443
SUI_REQUESTS_PER_MINUTE=1
TERRA_BASE_URL=https://columbus-fcd.terra.dev
TERRA_REQUESTS_PER_MINUTE=2
TERRA_REQUESTS_PER_MINUTE=4
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=1

View File

@ -62,13 +62,13 @@ POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=1
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=1
SOLANA_REQUESTS_PER_MINUTE=2
SUI_BASE_URL=https://fullnode.mainnet.sui.io:443
SUI_REQUESTS_PER_MINUTE=1
TERRA_BASE_URL=https://columbus-fcd.terra.dev
TERRA_REQUESTS_PER_MINUTE=1
TERRA_REQUESTS_PER_MINUTE=2
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=1

View File

@ -18,22 +18,22 @@ ALGORAND_BASE_URL=https://mainnet-idx.algonode.cloud
ALGORAND_REQUESTS_PER_MINUTE=2
APTOS_BASE_URL=https://fullnode.mainnet.aptoslabs.com/v1
APTOS_REQUESTS_PER_MINUTE=2
APTOS_REQUESTS_PER_MINUTE=12
ARBITRUM_BASE_URL=https://rpc.ankr.com/arbitrum
ARBITRUM_REQUESTS_PER_MINUTE=2
AVALANCHE_BASE_URL=https://api.avax.network/ext/bc/C/rpc
AVALANCHE_REQUESTS_PER_MINUTE=2
AVALANCHE_REQUESTS_PER_MINUTE=8
BSC_BASE_URL=https://bsc-dataseed2.defibit.io
BSC_REQUESTS_PER_MINUTE=2
BSC_REQUESTS_PER_MINUTE=8
CELO_BASE_URL=https://forno.celo.org
CELO_REQUESTS_PER_MINUTE=2
ETHEREUM_BASE_URL=https://rpc.ankr.com/eth
ETHEREUM_REQUESTS_PER_MINUTE=2
ETHEREUM_REQUESTS_PER_MINUTE=8
FANTOM_BASE_URL=https://rpc.ankr.com/fantom
FANTOM_REQUESTS_PER_MINUTE=2
@ -59,10 +59,10 @@ OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism
OPTIMISM_REQUESTS_PER_MINUTE=2
POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=2
POLYGON_REQUESTS_PER_MINUTE=8
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=2
SOLANA_REQUESTS_PER_MINUTE=12
SUI_BASE_URL=https://fullnode.mainnet.sui.io:443
SUI_REQUESTS_PER_MINUTE=2

View File

@ -42,7 +42,6 @@ func fetchAlgorandTx(
txDetail := TxDetail{
NativeTxHash: response.Transaction.ID,
From: response.Transaction.Sender,
Timestamp: time.Unix(int64(response.Transaction.RoundTime), 0),
}
return &txDetail, nil
}

View File

@ -85,7 +85,6 @@ func fetchAptosTx(
TxDetail := TxDetail{
NativeTxHash: tx.Hash,
From: tx.Sender,
Timestamp: time.UnixMicro(int64(tx.Timestamp)),
}
return &TxDetail, nil
}

View File

@ -65,16 +65,9 @@ func fetchCosmosTx(
return nil, fmt.Errorf("failed to find sender address in cosmos tx response")
}
// Parse the timestamp
timestamp, err := time.Parse("2006-01-02T15:04:05Z", response.TxResponse.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to parse tx timestamp from cosmos tx response: %w", err)
}
// Build the result object and return
TxDetail := &TxDetail{
From: sender,
Timestamp: timestamp,
NativeTxHash: response.TxResponse.TxHash,
}
return TxDetail, nil

View File

@ -45,29 +45,9 @@ func fetchEthTx(
}
}
// query block data
var blkReply ethGetBlockByHashResponse
{
blkParams := []interface{}{
txReply.BlockHash, // tx hash
false, // include transactions?
}
err = client.CallContext(ctx, rateLimiter, &blkReply, "eth_getBlockByHash", blkParams...)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash: %w", err)
}
}
// parse transaction timestamp
timestamp, err := timestampFromHex(blkReply.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to parse block timestamp: %w", err)
}
// build results and return
txDetail := &TxDetail{
From: strings.ToLower(txReply.From),
Timestamp: timestamp,
NativeTxHash: fmt.Sprintf("0x%s", strings.ToLower(txHash)),
}
return txDetail, nil

View File

@ -100,7 +100,6 @@ func fetchSolanaTx(
// populate the response object
txDetail := TxDetail{
Timestamp: time.Unix(response.BlockTime, 0).UTC(),
NativeTxHash: sigs[0].Signature,
}

View File

@ -61,7 +61,6 @@ func fetchSuiTx(
txDetail := TxDetail{
NativeTxHash: reply.Digest,
From: reply.Transaction.Data.Sender,
Timestamp: time.UnixMilli(reply.TimestampMs),
}
return &txDetail, nil
}

View File

@ -26,8 +26,6 @@ var (
type TxDetail struct {
// From is the address that signed the transaction, encoded in the chain's native format.
From string
// Timestamp indicates the time at which the transaction was confirmed.
Timestamp time.Time
// NativeTxHash contains the transaction hash, encoded in the chain's native format.
NativeTxHash string
}

View File

@ -292,11 +292,12 @@ func consume(ctx context.Context, params *consumerParams) {
// 2. Persisting source tx details in the database.
v := globalTx.Vaas[0]
p := consumer.ProcessSourceTxParams{
VaaId: v.ID,
ChainId: v.EmitterChain,
Emitter: v.EmitterAddr,
Sequence: v.Sequence,
TxHash: *v.TxHash,
VaaId: v.ID,
ChainId: v.EmitterChain,
Emitter: v.EmitterAddr,
Sequence: v.Sequence,
TxHash: *v.TxHash,
Overwrite: true, // Overwrite old contents
}
err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p)
if err != nil {

View File

@ -2,18 +2,12 @@ package consumer
import (
"context"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
"go.uber.org/zap"
)
const (
maxAttempts = 5
retryDelay = 60 * time.Second
)
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.VAAConsumeFunc
@ -56,6 +50,8 @@ func (c *Consumer) producerLoop(ctx context.Context) {
for msg := range ch {
c.logger.Debug("Received message, pushing to worker pool", zap.String("vaaId", msg.Data().ID))
// Send the VAA to the worker pool.
//
// The worker pool is responsible for calling `msg.Done()`

View File

@ -12,6 +12,13 @@ import (
"go.uber.org/zap"
)
const (
maxAttempts = 1
retryDelay = 5 * time.Minute
)
var ErrAlreadyProcessed = errors.New("VAA was already processed")
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessSourceTxParams struct {
ChainId sdk.ChainID
@ -19,6 +26,13 @@ type ProcessSourceTxParams struct {
Emitter string
Sequence string
TxHash string
// Overwrite indicates whether to reprocess a VAA that has already been processed.
//
// In the context of backfilling, sometimes you want to overwrite old data (e.g.: because
// the schema changed).
// In the context of the service, you usually don't want to overwrite existing data
// to avoid processing the same VAA twice, which would result in performance degradation.
Overwrite bool
}
func ProcessSourceTx(
@ -37,6 +51,21 @@ func ProcessSourceTx(
var err error
for attempts := 1; attempts <= maxAttempts; attempts++ {
if !params.Overwrite {
// If the message has already been processed, skip it.
//
// Sometimes the SQS visibility timeout expires and the message is put back into the queue,
// even if the RPC nodes have been hit and data has been written to MongoDB.
// In those cases, when we fetch the message for the second time,
// we don't want to hit the RPC nodes again for performance reasons.
processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
if err != nil {
return err
} else if err == nil && processed {
return ErrAlreadyProcessed
}
}
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
switch {

View File

@ -77,6 +77,24 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP
return nil
}
// AlreadyProcessed returns true if the given VAA ID has already been processed.
//
// A VAA counts as processed when a document keyed by its ID exists in the
// globalTransactions collection. Returns an error only when the lookup or
// decode fails for a reason other than the document being absent.
func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool, error) {

	// NOTE(review): we only need an existence check here, yet we decode the
	// full document. A projection (or CountDocuments) would avoid the decode
	// cost — confirm whether GlobalTransaction documents are small enough
	// for this not to matter.
	result := r.
		globalTransactions.
		FindOne(ctx, bson.D{{"_id", vaaId}})

	var tx GlobalTransaction
	err := result.Decode(&tx)

	// No document under this VAA ID means it has not been processed yet.
	if err == mongo.ErrNoDocuments {
		return false, nil
	}
	if err != nil {
		return false, fmt.Errorf("failed to decode already processed VAA id: %w", err)
	}
	return true, nil
}
// CountDocumentsByTimeRange returns the number of documents that match the given time range.
func (r *Repository) CountDocumentsByTimeRange(
ctx context.Context,

View File

@ -102,29 +102,34 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) {
event := msg.Data()
// Check if the message is expired
if msg.IsExpired() {
w.logger.Warn("Message with VAA expired",
zap.String("vaaId", event.ID),
zap.Bool("isExpired", msg.IsExpired()),
)
msg.Failed()
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
if !msg.IsExpired() {
w.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID))
msg.Done()
} else {
w.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID))
}
return
}
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
msg.Done()
// Skip non-processed, expired messages
if msg.IsExpired() {
w.logger.Warn("Message expired - skipping",
zap.String("vaaId", event.ID),
zap.Bool("isExpired", msg.IsExpired()),
)
return
}
// Process the VAA
p := ProcessSourceTxParams{
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
Overwrite: false, // avoid processing the same transaction twice
}
err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p)
@ -133,6 +138,10 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) {
w.logger.Info("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
} else if err == ErrAlreadyProcessed {
w.logger.Warn("Message already processed - skipping",
zap.String("vaaId", event.ID),
)
} else if err != nil {
w.logger.Error("Failed to process originTx",
zap.String("vaaId", event.ID),
@ -144,5 +153,10 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) {
)
}
msg.Done()
// Mark the message as done
//
// If the message is expired, it will be put back into the queue.
if !msg.IsExpired() {
msg.Done()
}
}