diff --git a/api/handlers/transactions/repository.go b/api/handlers/transactions/repository.go index 70bd1ab9..dba77258 100644 --- a/api/handlers/transactions/repository.go +++ b/api/handlers/transactions/repository.go @@ -693,10 +693,12 @@ func (r *Repository) findOriginTxFromVaa(ctx context.Context, q *GlobalTransacti // populate the result and return originTx := OriginTx{ Timestamp: &record.Timestamp, - TxHash: record.TxHash, ChainID: record.EmitterChain, Status: string(domain.SourceTxStatusConfirmed), } + if record.EmitterChain != sdk.ChainIDSolana && record.EmitterChain != sdk.ChainIDAptos { + originTx.TxHash = record.TxHash + } return &originTx, nil } diff --git a/deploy/tx-tracker-backfiller/env/production.env b/deploy/tx-tracker-backfiller/env/production.env index f523e301..7d179b3f 100644 --- a/deploy/tx-tracker-backfiller/env/production.env +++ b/deploy/tx-tracker-backfiller/env/production.env @@ -14,7 +14,7 @@ ALGORAND_BASE_URL=https://mainnet-idx.algonode.cloud ALGORAND_REQUESTS_PER_MINUTE=1 APTOS_BASE_URL=https://fullnode.mainnet.aptoslabs.com/v1 -APTOS_REQUESTS_PER_MINUTE=2 +APTOS_REQUESTS_PER_MINUTE=1 ARBITRUM_BASE_URL=https://rpc.ankr.com/arbitrum ARBITRUM_REQUESTS_PER_MINUTE=1 @@ -58,13 +58,13 @@ POLYGON_BASE_URL=https://rpc.ankr.com/polygon POLYGON_REQUESTS_PER_MINUTE=2 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com -SOLANA_REQUESTS_PER_MINUTE=2 +SOLANA_REQUESTS_PER_MINUTE=4 SUI_BASE_URL=https://fullnode.mainnet.sui.io:443 SUI_REQUESTS_PER_MINUTE=1 TERRA_BASE_URL=https://columbus-fcd.terra.dev -TERRA_REQUESTS_PER_MINUTE=2 +TERRA_REQUESTS_PER_MINUTE=4 TERRA2_BASE_URL=https://phoenix-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=1 diff --git a/deploy/tx-tracker-backfiller/env/staging.env b/deploy/tx-tracker-backfiller/env/staging.env index 75266537..d973194c 100644 --- a/deploy/tx-tracker-backfiller/env/staging.env +++ b/deploy/tx-tracker-backfiller/env/staging.env @@ -62,13 +62,13 @@ POLYGON_BASE_URL=https://rpc.ankr.com/polygon POLYGON_REQUESTS_PER_MINUTE=1 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com -SOLANA_REQUESTS_PER_MINUTE=1 +SOLANA_REQUESTS_PER_MINUTE=2 SUI_BASE_URL=https://fullnode.mainnet.sui.io:443 SUI_REQUESTS_PER_MINUTE=1 TERRA_BASE_URL=https://columbus-fcd.terra.dev -TERRA_REQUESTS_PER_MINUTE=1 +TERRA_REQUESTS_PER_MINUTE=2 TERRA2_BASE_URL=https://phoenix-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=1 diff --git a/deploy/tx-tracker/env/staging.env b/deploy/tx-tracker/env/staging.env index e2a4f90f..85850c36 100644 --- a/deploy/tx-tracker/env/staging.env +++ b/deploy/tx-tracker/env/staging.env @@ -18,22 +18,22 @@ ALGORAND_BASE_URL=https://mainnet-idx.algonode.cloud ALGORAND_REQUESTS_PER_MINUTE=2 APTOS_BASE_URL=https://fullnode.mainnet.aptoslabs.com/v1 -APTOS_REQUESTS_PER_MINUTE=2 +APTOS_REQUESTS_PER_MINUTE=12 ARBITRUM_BASE_URL=https://rpc.ankr.com/arbitrum ARBITRUM_REQUESTS_PER_MINUTE=2 AVALANCHE_BASE_URL=https://api.avax.network/ext/bc/C/rpc -AVALANCHE_REQUESTS_PER_MINUTE=2 +AVALANCHE_REQUESTS_PER_MINUTE=8 BSC_BASE_URL=https://bsc-dataseed2.defibit.io -BSC_REQUESTS_PER_MINUTE=2 +BSC_REQUESTS_PER_MINUTE=8 CELO_BASE_URL=https://forno.celo.org CELO_REQUESTS_PER_MINUTE=2 ETHEREUM_BASE_URL=https://rpc.ankr.com/eth -ETHEREUM_REQUESTS_PER_MINUTE=2 +ETHEREUM_REQUESTS_PER_MINUTE=8 FANTOM_BASE_URL=https://rpc.ankr.com/fantom FANTOM_REQUESTS_PER_MINUTE=2 @@ -59,10 +59,10 @@ OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism OPTIMISM_REQUESTS_PER_MINUTE=2 POLYGON_BASE_URL=https://rpc.ankr.com/polygon -POLYGON_REQUESTS_PER_MINUTE=2 +POLYGON_REQUESTS_PER_MINUTE=8 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com -SOLANA_REQUESTS_PER_MINUTE=2 +SOLANA_REQUESTS_PER_MINUTE=12 SUI_BASE_URL=https://fullnode.mainnet.sui.io:443 SUI_REQUESTS_PER_MINUTE=2 diff --git a/tx-tracker/chains/api_algorand.go b/tx-tracker/chains/api_algorand.go index 2de62384..d4f651d3 100644 --- a/tx-tracker/chains/api_algorand.go +++ b/tx-tracker/chains/api_algorand.go @@ -42,7 +42,6 @@ func fetchAlgorandTx( txDetail := TxDetail{ NativeTxHash: response.Transaction.ID, From: response.Transaction.Sender, - Timestamp: time.Unix(int64(response.Transaction.RoundTime), 0), } return &txDetail, nil } diff --git a/tx-tracker/chains/api_aptos.go b/tx-tracker/chains/api_aptos.go index 7db9c2c6..3dce4530 100644 --- a/tx-tracker/chains/api_aptos.go +++ b/tx-tracker/chains/api_aptos.go @@ -85,7 +85,6 @@ func fetchAptosTx( TxDetail := TxDetail{ NativeTxHash: tx.Hash, From: tx.Sender, - Timestamp: time.UnixMicro(int64(tx.Timestamp)), } return &TxDetail, nil } diff --git a/tx-tracker/chains/api_cosmos.go b/tx-tracker/chains/api_cosmos.go index 18db92bb..a4c136e9 100644 --- a/tx-tracker/chains/api_cosmos.go +++ b/tx-tracker/chains/api_cosmos.go @@ -65,16 +65,9 @@ func fetchCosmosTx( return nil, fmt.Errorf("failed to find sender address in cosmos tx response") } - // Parse the timestamp - timestamp, err := time.Parse("2006-01-02T15:04:05Z", response.TxResponse.Timestamp) - if err != nil { - return nil, fmt.Errorf("failed to parse tx timestamp from cosmos tx response: %w", err) - } - // Build the result object and return TxDetail := &TxDetail{ From: sender, - Timestamp: timestamp, NativeTxHash: response.TxResponse.TxHash, } return TxDetail, nil diff --git a/tx-tracker/chains/api_evm.go b/tx-tracker/chains/api_evm.go index 3d9b72e1..dc6dacea 100644 --- a/tx-tracker/chains/api_evm.go +++ b/tx-tracker/chains/api_evm.go @@ -45,29 +45,9 @@ func fetchEthTx( } } - // query block data - var blkReply ethGetBlockByHashResponse - { - blkParams := []interface{}{ - txReply.BlockHash, // tx hash - false, // include transactions? - } - err = client.CallContext(ctx, rateLimiter, &blkReply, "eth_getBlockByHash", blkParams...) - if err != nil { - return nil, fmt.Errorf("failed to get block by hash: %w", err) - } - } - - // parse transaction timestamp - timestamp, err := timestampFromHex(blkReply.Timestamp) - if err != nil { - return nil, fmt.Errorf("failed to parse block timestamp: %w", err) - } - // build results and return txDetail := &TxDetail{ From: strings.ToLower(txReply.From), - Timestamp: timestamp, NativeTxHash: fmt.Sprintf("0x%s", strings.ToLower(txHash)), } return txDetail, nil diff --git a/tx-tracker/chains/api_solana.go b/tx-tracker/chains/api_solana.go index fecebf26..38368f69 100644 --- a/tx-tracker/chains/api_solana.go +++ b/tx-tracker/chains/api_solana.go @@ -100,7 +100,6 @@ func fetchSolanaTx( // populate the response object txDetail := TxDetail{ - Timestamp: time.Unix(response.BlockTime, 0).UTC(), NativeTxHash: sigs[0].Signature, } diff --git a/tx-tracker/chains/api_sui.go b/tx-tracker/chains/api_sui.go index 968fc13e..04976c9b 100644 --- a/tx-tracker/chains/api_sui.go +++ b/tx-tracker/chains/api_sui.go @@ -61,7 +61,6 @@ func fetchSuiTx( txDetail := TxDetail{ NativeTxHash: reply.Digest, From: reply.Transaction.Data.Sender, - Timestamp: time.UnixMilli(reply.TimestampMs), } return &txDetail, nil } diff --git a/tx-tracker/chains/chains.go b/tx-tracker/chains/chains.go index c3486fb2..8d32c550 100644 --- a/tx-tracker/chains/chains.go +++ b/tx-tracker/chains/chains.go @@ -26,8 +26,6 @@ var ( type TxDetail struct { // From is the address that signed the transaction, encoded in the chain's native format. From string - // Timestamp indicates the time at which the transaction was confirmed. - Timestamp time.Time // NativeTxHash contains the transaction hash, encoded in the chain's native format. NativeTxHash string } diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index 1eb3b561..9825a9af 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -292,11 +292,12 @@ func consume(ctx context.Context, params *consumerParams) { // 2. Persisting source tx details in the database. v := globalTx.Vaas[0] p := consumer.ProcessSourceTxParams{ - VaaId: v.ID, - ChainId: v.EmitterChain, - Emitter: v.EmitterAddr, - Sequence: v.Sequence, - TxHash: *v.TxHash, + VaaId: v.ID, + ChainId: v.EmitterChain, + Emitter: v.EmitterAddr, + Sequence: v.Sequence, + TxHash: *v.TxHash, + Overwrite: true, // Overwrite old contents } err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p) if err != nil { diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 30815b66..a2f6f758 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -2,18 +2,12 @@ package consumer import ( "context" - "time" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" "go.uber.org/zap" ) -const ( - maxAttempts = 5 - retryDelay = 60 * time.Second -) - // Consumer consumer struct definition. type Consumer struct { consumeFunc queue.VAAConsumeFunc @@ -56,6 +50,8 @@ func (c *Consumer) producerLoop(ctx context.Context) { for msg := range ch { + c.logger.Debug("Received message, pushing to worker pool", zap.String("vaaId", msg.Data().ID)) + // Send the VAA to the worker pool. // // The worker pool is responsible for calling `msg.Done()` diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go index 13337107..6ff89537 100644 --- a/tx-tracker/consumer/processor.go +++ b/tx-tracker/consumer/processor.go @@ -12,6 +12,13 @@ import ( "go.uber.org/zap" ) +const ( + maxAttempts = 1 + retryDelay = 5 * time.Minute +) + +var ErrAlreadyProcessed = errors.New("VAA was already processed") + // ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method. type ProcessSourceTxParams struct { ChainId sdk.ChainID @@ -19,6 +26,13 @@ type ProcessSourceTxParams struct { Emitter string Sequence string TxHash string + // Overwrite indicates whether to reprocess a VAA that has already been processed. + // + // In the context of backfilling, sometimes you want to overwrite old data (e.g.: because + // the schema changed). + // In the context of the service, you usually don't want to overwrite existing data + // to avoid processing the same VAA twice, which would result in performance degradation. + Overwrite bool } func ProcessSourceTx( @@ -37,6 +51,21 @@ func ProcessSourceTx( var err error for attempts := 1; attempts <= maxAttempts; attempts++ { + if !params.Overwrite { + // If the message has already been processed, skip it. + // + // Sometimes the SQS visibility timeout expires and the message is put back into the queue, + // even if the RPC nodes have been hit and data has been written to MongoDB. + // In those cases, when we fetch the message for the second time, + // we don't want to hit the RPC nodes again for performance reasons. + processed, err := repository.AlreadyProcessed(ctx, params.VaaId) + if err != nil { + return err + } else if err == nil && processed { + return ErrAlreadyProcessed + } + } + txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash) switch { diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 388beb90..17ba8499 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -77,6 +77,24 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP return nil } +// AlreadyProcessed returns true if the given VAA ID has already been processed. +func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool, error) { + + result := r. + globalTransactions. + FindOne(ctx, bson.D{{"_id", vaaId}}) + + var tx GlobalTransaction + err := result.Decode(&tx) + if err == mongo.ErrNoDocuments { + return false, nil + } else if err != nil { + return false, fmt.Errorf("failed to decode already processed VAA id: %w", err) + } else { + return true, nil + } +} + // CountDocumentsByTimeRange returns the number of documents that match the given time range. func (r *Repository) CountDocumentsByTimeRange( ctx context.Context, diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go index 6e09e695..acad425a 100644 --- a/tx-tracker/consumer/workerpool.go +++ b/tx-tracker/consumer/workerpool.go @@ -102,29 +102,34 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { event := msg.Data() - // Check if the message is expired - if msg.IsExpired() { - w.logger.Warn("Message with VAA expired", - zap.String("vaaId", event.ID), - zap.Bool("isExpired", msg.IsExpired()), - ) - msg.Failed() + // Do not process messages from PythNet + if event.ChainID == sdk.ChainIDPythNet { + if !msg.IsExpired() { + w.logger.Debug("Deleting PythNet message", zap.String("vaaId", event.ID)) + msg.Done() + } else { + w.logger.Debug("Skipping expired PythNet message", zap.String("vaaId", event.ID)) + } return } - // Do not process messages from PythNet - if event.ChainID == sdk.ChainIDPythNet { - msg.Done() + // Skip non-processed, expired messages + if msg.IsExpired() { + w.logger.Warn("Message expired - skipping", + zap.String("vaaId", event.ID), + zap.Bool("isExpired", msg.IsExpired()), + ) return } // Process the VAA p := ProcessSourceTxParams{ - VaaId: event.ID, - ChainId: event.ChainID, - Emitter: event.EmitterAddress, - Sequence: event.Sequence, - TxHash: event.TxHash, + VaaId: event.ID, + ChainId: event.ChainID, + Emitter: event.EmitterAddress, + Sequence: event.Sequence, + TxHash: event.TxHash, + Overwrite: false, // avoid processing the same transaction twice } err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p) @@ -133,6 +138,10 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { w.logger.Info("Skipping VAA - chain not supported", zap.String("vaaId", event.ID), ) + } else if err == ErrAlreadyProcessed { + w.logger.Warn("Message already processed - skipping", + zap.String("vaaId", event.ID), + ) } else if err != nil { w.logger.Error("Failed to process originTx", zap.String("vaaId", event.ID), @@ -144,5 +153,10 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { ) } - msg.Done() + // Mark the message as done + // + // If the message is expired, it will be put back into the queue. + if !msg.IsExpired() { + msg.Done() + } }