[tx-tracker] Fix issues related to rate limits (#213)

### Summary

The tx-tracker service was exceeding the rate limits of the node RPC providers it depends on, which caused requests to fail.

This pull request changes the RPC providers the service uses: each EVM-compatible chain now has its own base URL, API key, and requests-per-minute setting, and requests are rate-limited per chain instead of being funneled through a single shared provider.

The backfiller job has been updated accordingly: it picks up the new per-chain settings and now also reprocesses documents whose earlier attempt ended with an internal error.

Tracking issue: https://github.com/wormhole-foundation/wormhole-explorer/issues/205
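
For context, the fix boils down to giving every RPC provider its own request budget and pacing calls against it. Below is a minimal, self-contained sketch of that pattern (illustrative only; `newRateLimiter` and the numbers are not taken from the service's code):

```go
package main

import (
	"fmt"
	"time"
)

// newRateLimiter returns a ticker that fires at most requestsPerMinute times
// per minute; a worker receives from it before issuing the next RPC call.
func newRateLimiter(requestsPerMinute int) *time.Ticker {
	return time.NewTicker(time.Minute / time.Duration(requestsPerMinute))
}

func main() {
	// Hypothetical provider budget: 120 requests per minute (one every 500ms).
	limiter := newRateLimiter(120)
	defer limiter.Stop()

	for i := 0; i < 3; i++ {
		<-limiter.C // block until the provider's quota allows another request
		fmt.Println("request", i, "at", time.Now().Format(time.RFC3339Nano))
	}
}
```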

View File

@@ -10,7 +10,7 @@ spec:
       labels:
         app: {{ .NAME }}
     spec:
-      restartPolicy: Always
+      restartPolicy: Never
       terminationGracePeriodSeconds: 40
       containers:
         - name: {{ .NAME }}
@@ -20,7 +20,7 @@ spec:
             - name: ENV
               value: "PRODUCTION"
             - name: LOG_LEVEL
-              value: "INFO"
+              value: "DEBUG"
             - name: MONGODB_URI
               valueFrom:
                 secretKeyRef:
@@ -40,23 +40,65 @@ spec:
             - name: ANKR_BASE_URL
               value: {{ .ANKR_BASE_URL }}
             - name: ANKR_REQUESTS_PER_MINUTE
-              value: {{ .ANKR_REQUESTS_PER_MINUTE }}
+              value: "{{ .ANKR_REQUESTS_PER_MINUTE }}"
+            - name: ARBITRUM_API_KEY
+              value: {{ .ARBITRUM_API_KEY }}
+            - name: ARBITRUM_BASE_URL
+              value: {{ .ARBITRUM_BASE_URL }}
+            - name: ARBITRUM_REQUESTS_PER_MINUTE
+              value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}"
+            - name: AVALANCHE_API_KEY
+              value: {{ .AVALANCHE_API_KEY }}
+            - name: AVALANCHE_BASE_URL
+              value: {{ .AVALANCHE_BASE_URL }}
+            - name: AVALANCHE_REQUESTS_PER_MINUTE
+              value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}"
+            - name: BSC_API_KEY
+              value: {{ .BSC_API_KEY }}
+            - name: BSC_BASE_URL
+              value: {{ .BSC_BASE_URL }}
+            - name: BSC_REQUESTS_PER_MINUTE
+              value: "{{ .BSC_REQUESTS_PER_MINUTE }}"
             - name: CELO_API_KEY
               value: {{ .CELO_API_KEY }}
             - name: CELO_BASE_URL
               value: {{ .CELO_BASE_URL }}
             - name: CELO_REQUESTS_PER_MINUTE
               value: "{{ .CELO_REQUESTS_PER_MINUTE }}"
+            - name: ETH_API_KEY
+              value: {{ .ETH_API_KEY }}
+            - name: ETH_BASE_URL
+              value: {{ .ETH_BASE_URL }}
+            - name: ETH_REQUESTS_PER_MINUTE
+              value: "{{ .ETH_REQUESTS_PER_MINUTE }}"
+            - name: FANTOM_API_KEY
+              value: {{ .FANTOM_API_KEY }}
+            - name: FANTOM_BASE_URL
+              value: {{ .FANTOM_BASE_URL }}
+            - name: FANTOM_REQUESTS_PER_MINUTE
+              value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}"
+            - name: OPTIMISM_API_KEY
+              value: {{ .OPTIMISM_API_KEY }}
+            - name: OPTIMISM_BASE_URL
+              value: {{ .OPTIMISM_BASE_URL }}
+            - name: OPTIMISM_REQUESTS_PER_MINUTE
+              value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
+            - name: POLYGON_API_KEY
+              value: {{ .POLYGON_API_KEY }}
+            - name: POLYGON_BASE_URL
+              value: {{ .POLYGON_BASE_URL }}
+            - name: POLYGON_REQUESTS_PER_MINUTE
+              value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
             - name: SOLANA_BASE_URL
               value: {{ .SOLANA_BASE_URL }}
             - name: SOLANA_REQUESTS_PER_MINUTE
-              value: {{ .SOLANA_REQUESTS_PER_MINUTE }}
+              value: "{{ .SOLANA_REQUESTS_PER_MINUTE }}"
             - name: TERRA_BASE_URL
               value: {{ .TERRA_BASE_URL }}
             - name: TERRA_REQUESTS_PER_MINUTE
-              value: {{ .TERRA_REQUESTS_PER_MINUTE }}
+              value: "{{ .TERRA_REQUESTS_PER_MINUTE }}"
             - name: NUM_WORKERS
-              value: "1"
+              value: "20"
             - name: BULK_SIZE
               value: "500"
           resources:
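
A note on the quoting added around the `*_REQUESTS_PER_MINUTE` values: a container env var's `value` field is typed as a string, so if the template variable renders as a bare number the manifest is typically rejected at decode time. A hedged illustration of that failure mode using `encoding/json` (Kubernetes objects are JSON under the hood); the struct below is a stand-in, not the real `corev1.EnvVar`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the Kubernetes EnvVar type, whose Value field is a string.
type envVar struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

func main() {
	var v envVar

	// Quoted: the rendered manifest carries a string and decodes fine.
	fmt.Println(json.Unmarshal([]byte(`{"name":"ETH_REQUESTS_PER_MINUTE","value":"50"}`), &v))

	// Unquoted: the rendered manifest carries a number and decoding fails,
	// which is roughly what happens when the env value is not quoted.
	fmt.Println(json.Unmarshal([]byte(`{"name":"ETH_REQUESTS_PER_MINUTE","value":50}`), &v))
}
```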

View File

@@ -77,12 +77,54 @@ spec:
               value: {{ .ANKR_BASE_URL }}
             - name: ANKR_REQUESTS_PER_MINUTE
               value: "{{ .ANKR_REQUESTS_PER_MINUTE }}"
+            - name: ARBITRUM_API_KEY
+              value: {{ .ARBITRUM_API_KEY }}
+            - name: ARBITRUM_BASE_URL
+              value: {{ .ARBITRUM_BASE_URL }}
+            - name: ARBITRUM_REQUESTS_PER_MINUTE
+              value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}"
+            - name: AVALANCHE_API_KEY
+              value: {{ .AVALANCHE_API_KEY }}
+            - name: AVALANCHE_BASE_URL
+              value: {{ .AVALANCHE_BASE_URL }}
+            - name: AVALANCHE_REQUESTS_PER_MINUTE
+              value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}"
+            - name: BSC_API_KEY
+              value: {{ .BSC_API_KEY }}
+            - name: BSC_BASE_URL
+              value: {{ .BSC_BASE_URL }}
+            - name: BSC_REQUESTS_PER_MINUTE
+              value: "{{ .BSC_REQUESTS_PER_MINUTE }}"
             - name: CELO_API_KEY
               value: {{ .CELO_API_KEY }}
             - name: CELO_BASE_URL
               value: {{ .CELO_BASE_URL }}
             - name: CELO_REQUESTS_PER_MINUTE
               value: "{{ .CELO_REQUESTS_PER_MINUTE }}"
+            - name: ETH_API_KEY
+              value: {{ .ETH_API_KEY }}
+            - name: ETH_BASE_URL
+              value: {{ .ETH_BASE_URL }}
+            - name: ETH_REQUESTS_PER_MINUTE
+              value: "{{ .ETH_REQUESTS_PER_MINUTE }}"
+            - name: FANTOM_API_KEY
+              value: {{ .FANTOM_API_KEY }}
+            - name: FANTOM_BASE_URL
+              value: {{ .FANTOM_BASE_URL }}
+            - name: FANTOM_REQUESTS_PER_MINUTE
+              value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}"
+            - name: OPTIMISM_API_KEY
+              value: {{ .OPTIMISM_API_KEY }}
+            - name: OPTIMISM_BASE_URL
+              value: {{ .OPTIMISM_BASE_URL }}
+            - name: OPTIMISM_REQUESTS_PER_MINUTE
+              value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
+            - name: POLYGON_API_KEY
+              value: {{ .POLYGON_API_KEY }}
+            - name: POLYGON_BASE_URL
+              value: {{ .POLYGON_BASE_URL }}
+            - name: POLYGON_REQUESTS_PER_MINUTE
+              value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
             - name: SOLANA_BASE_URL
               value: {{ .SOLANA_BASE_URL }}
             - name: SOLANA_REQUESTS_PER_MINUTE

View File

@@ -6,7 +6,6 @@ import (
 	"strings"
 
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
 )
 
 type ethGetTransactionByHashResponse struct {
@@ -21,16 +20,17 @@ type ethGetBlockByHashResponse struct {
 	Number string `json:"number"`
 }
 
-func fetchCeloTx(
+func fetchEthTx(
 	ctx context.Context,
-	cfg *config.RpcProviderSettings,
 	txHash string,
+	baseUrl string,
+	apiKey string,
 ) (*TxDetail, error) {
 
 	// build RPC URL
-	url := cfg.CeloBaseUrl
-	if cfg.CeloApiKey != "" {
-		url += "/" + cfg.CeloApiKey
+	url := baseUrl
+	if apiKey != "" {
+		url += "/" + apiKey
 	}
 
 	// initialize RPC client

View File

@@ -30,10 +30,17 @@ type TxDetail struct {
 }
 
 var tickers = struct {
-	ankr *time.Ticker
-	celo *time.Ticker
-	solana *time.Ticker
-	terra *time.Ticker
+	ankr *time.Ticker
+	arbitrum *time.Ticker
+	avalanche *time.Ticker
+	bsc *time.Ticker
+	celo *time.Ticker
+	eth *time.Ticker
+	fantom *time.Ticker
+	optimism *time.Ticker
+	polygon *time.Ticker
+	solana *time.Ticker
+	terra *time.Ticker
 }{}
 
 func Initialize(cfg *config.RpcProviderSettings) {
@@ -51,7 +58,14 @@ func Initialize(cfg *config.RpcProviderSettings) {
 	tickers.terra = time.NewTicker(f(cfg.TerraRequestsPerMinute))
 
 	// these adapters send 2 requests per txHash
-	tickers.celo = time.NewTicker(f(cfg.AnkrRequestsPerMinute) / 2)
+	tickers.arbitrum = time.NewTicker(f(cfg.ArbitrumRequestsPerMinute) / 2)
+	tickers.avalanche = time.NewTicker(f(cfg.AvalancheRequestsPerMinute) / 2)
+	tickers.bsc = time.NewTicker(f(cfg.BscRequestsPerMinute) / 2)
+	tickers.eth = time.NewTicker(f(cfg.EthRequestsPerMinute) / 2)
+	tickers.fantom = time.NewTicker(f(cfg.FantomRequestsPerMinute) / 2)
+	tickers.celo = time.NewTicker(f(cfg.CeloRequestsPerMinute) / 2)
+	tickers.optimism = time.NewTicker(f(cfg.OptimismRequestsPerMinute) / 2)
+	tickers.polygon = time.NewTicker(f(cfg.PolygonRequestsPerMinute) / 2)
 	tickers.solana = time.NewTicker(f(cfg.SolanaRequestsPerMinute / 2))
 }
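
The halving above exists because these adapters issue two JSON-RPC calls per transaction hash (one for the transaction, one for its block, per the response structs earlier in the diff), so only half as many transactions per minute fit inside a provider's request budget. A hedged illustration of the arithmetic with made-up numbers (this is not the service's `f` helper):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	rpm := 60 // hypothetical provider budget: 60 requests per minute

	// Interval between single requests.
	perRequest := time.Minute / time.Duration(rpm) // 1s

	// An adapter that spends 2 requests per transaction can only start
	// rpm/2 transactions per minute, i.e. one every 2s.
	perTx := time.Minute / time.Duration(rpm/2) // 2s

	fmt.Println("per request:", perRequest, "| per tx (2 calls each):", perTx)
}
```
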
@@ -74,19 +88,45 @@ func FetchTx(
 		fetchFunc = fetchTerraTx
 		rateLimiter = *tickers.terra
 	case vaa.ChainIDCelo:
-		fetchFunc = fetchCeloTx
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.CeloBaseUrl, cfg.CeloApiKey)
+		}
 		rateLimiter = *tickers.celo
-	// most EVM-compatible chains use the same RPC service
-	case vaa.ChainIDEthereum,
-		vaa.ChainIDBSC,
-		vaa.ChainIDPolygon,
-		vaa.ChainIDAvalanche,
-		vaa.ChainIDFantom,
-		vaa.ChainIDArbitrum,
-		vaa.ChainIDOptimism:
-		fetchFunc = ankrFetchTx
-		rateLimiter = *tickers.ankr
+	case vaa.ChainIDEthereum:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.EthBaseUrl, cfg.EthApiKey)
+		}
+		rateLimiter = *tickers.eth
+	case vaa.ChainIDBSC:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.BscBaseUrl, cfg.BscApiKey)
+		}
+		rateLimiter = *tickers.bsc
+	case vaa.ChainIDPolygon:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.PolygonBaseUrl, cfg.PolygonApiKey)
+		}
+		rateLimiter = *tickers.polygon
+	case vaa.ChainIDFantom:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.FantomBaseUrl, cfg.FantomApiKey)
+		}
+		rateLimiter = *tickers.fantom
+	case vaa.ChainIDArbitrum:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.ArbitrumBaseUrl, cfg.ArbitrumApiKey)
+		}
+		rateLimiter = *tickers.arbitrum
+	case vaa.ChainIDOptimism:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.OptimismBaseUrl, cfg.OptimismApiKey)
+		}
+		rateLimiter = *tickers.optimism
+	case vaa.ChainIDAvalanche:
+		fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
+			return fetchEthTx(ctx, txHash, cfg.AvalancheBaseUrl, cfg.AvalancheApiKey)
+		}
+		rateLimiter = *tickers.avalanche
 	default:
 		return nil, ErrChainNotSupported
 	}

View File

@@ -218,6 +218,13 @@ func consume(ctx context.Context, params *consumerParams) {
 			params.processedDocuments.Add(1)
 			continue
 		}
+		if globalTx.Vaas[0].TxHash == nil {
+			params.logger.Warn("VAA doesn't have a TxHash, skipping",
+				zap.String("vaaId", globalTx.Id),
+			)
+			params.processedDocuments.Add(1)
+			continue
+		}
 
 		params.logger.Debug("Processing source tx",
 			zap.String("vaaId", globalTx.Id),
View File

@@ -52,10 +52,38 @@ type RpcProviderSettings struct {
 	AnkrApiKey string `split_words:"true" required:"false"`
 	AnkrRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	ArbitrumBaseUrl string `split_words:"true" required:"true"`
+	ArbitrumApiKey string `split_words:"true" required:"false"`
+	ArbitrumRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	AvalancheBaseUrl string `split_words:"true" required:"true"`
+	AvalancheApiKey string `split_words:"true" required:"false"`
+	AvalancheRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	BscBaseUrl string `split_words:"true" required:"true"`
+	BscApiKey string `split_words:"true" required:"false"`
+	BscRequestsPerMinute uint16 `split_words:"true" required:"true"`
 	CeloBaseUrl string `split_words:"true" required:"true"`
 	CeloApiKey string `split_words:"true" required:"false"`
+	CeloRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	EthBaseUrl string `split_words:"true" required:"true"`
+	EthApiKey string `split_words:"true" required:"false"`
+	EthRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	FantomBaseUrl string `split_words:"true" required:"true"`
+	FantomApiKey string `split_words:"true" required:"false"`
+	FantomRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	OptimismBaseUrl string `split_words:"true" required:"true"`
+	OptimismApiKey string `split_words:"true" required:"false"`
+	OptimismRequestsPerMinute uint16 `split_words:"true" required:"true"`
+	PolygonBaseUrl string `split_words:"true" required:"true"`
+	PolygonApiKey string `split_words:"true" required:"false"`
+	PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"`
 	SolanaBaseUrl string `split_words:"true" required:"true"`
 	SolanaRequestsPerMinute uint16 `split_words:"true" required:"true"`
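
The `split_words` / `required` tags above look like github.com/kelseyhightower/envconfig struct tags (an assumption on my part, based on the tag names): a field such as `ArbitrumRequestsPerMinute` is populated from the `ARBITRUM_REQUESTS_PER_MINUTE` environment variable and parsed into `uint16`. A minimal sketch, with a trimmed-down stand-in struct:

```go
package main

import (
	"fmt"
	"os"

	"github.com/kelseyhightower/envconfig"
)

// Trimmed-down stand-in for RpcProviderSettings; only two fields are shown.
type rpcProviderSettings struct {
	ArbitrumBaseUrl           string `split_words:"true" required:"true"`
	ArbitrumRequestsPerMinute uint16 `split_words:"true" required:"true"`
}

func main() {
	// What the (quoted) chart values render to inside the container.
	os.Setenv("ARBITRUM_BASE_URL", "https://example.invalid/rpc")
	os.Setenv("ARBITRUM_REQUESTS_PER_MINUTE", "50")

	var cfg rpcProviderSettings
	if err := envconfig.Process("", &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```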

View File

@@ -86,9 +86,16 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro
 	// Build the aggregation pipeline
 	var pipeline mongo.Pipeline
 	{
-		// Look up transactions that have not been processed by the tx-tracker
+		// Look up transactions that either:
+		// 1. have not been processed
+		// 2. have been processed, but encountered an internal error
 		pipeline = append(pipeline, bson.D{
-			{"$match", bson.D{{"originTx", bson.M{"$exists": false}}}},
+			{"$match", bson.D{
+				{"$or", bson.A{
+					bson.D{{"originTx", bson.D{{"$exists", false}}}},
+					bson.D{{"originTx.status", bson.M{"$eq": SourceTxStatusInternalError}}},
+				}},
+			}},
 		})
 
 		// Count the number of results
@@ -151,9 +158,16 @@ func (r *Repository) GetIncompleteDocuments(
 		{"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}},
 	})
 
-	// Look up transactions that have not been processed by the tx-tracker
+	// Look up transactions that either:
+	// 1. have not been processed
+	// 2. have been processed, but encountered an internal error
 	pipeline = append(pipeline, bson.D{
-		{"$match", bson.D{{"originTx", bson.M{"$exists": false}}}},
+		{"$match", bson.D{
+			{"$or", bson.A{
+				bson.D{{"originTx", bson.D{{"$exists", false}}}},
+				bson.D{{"originTx.status", bson.M{"$eq": SourceTxStatusInternalError}}},
+			}},
+		}},
 	})
 
 	// Left join on the VAA collection
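
For anyone who wants to check how many documents the backfiller would now pick up, the new `$match` stage corresponds to the following standalone filter. This is a hedged sketch: the connection string, database, collection, and the literal "internal-error" status value are placeholders, not taken from this diff.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder connection string, database, collection and status value.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("wormhole").Collection("globalTransactions")

	// Same shape as the new $match stage: not processed yet, OR processed
	// but finished with an internal error.
	filter := bson.D{
		{Key: "$or", Value: bson.A{
			bson.D{{Key: "originTx", Value: bson.D{{Key: "$exists", Value: false}}}},
			bson.D{{Key: "originTx.status", Value: bson.D{{Key: "$eq", Value: "internal-error"}}}},
		}},
	}

	n, err := coll.CountDocuments(ctx, filter)
	if err != nil {
		panic(err)
	}
	fmt.Println("documents the backfiller would pick up:", n)
}
```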