diff --git a/api/handlers/transactions/model.go b/api/handlers/transactions/model.go index 911f1b51..e2bf582c 100644 --- a/api/handlers/transactions/model.go +++ b/api/handlers/transactions/model.go @@ -79,6 +79,7 @@ type OriginTx struct { ChainID sdk.ChainID `bson:"chainId" json:"chainId"` TxHash string `bson:"nativeTxHash" json:"txHash"` Timestamp *time.Time `bson:"timestamp" json:"timestamp"` + From string `bson:"from" json:"from"` Status string `bson:"status" json:"status"` } diff --git a/api/routes/wormscan/transactions/controller.go b/api/routes/wormscan/transactions/controller.go index 258aa550..6c1f5563 100644 --- a/api/routes/wormscan/transactions/controller.go +++ b/api/routes/wormscan/transactions/controller.go @@ -421,6 +421,13 @@ func (c *Controller) ListTransactions(ctx *fiber.Ctx) error { tx.Status = TxStatusOngoing } + // Set the origin address, if available + if len(queryResult.Transactions[i].GlobalTransations) == 1 && + queryResult.Transactions[i].GlobalTransations[0].OriginTx != nil { + + tx.OriginAddress = queryResult.Transactions[i].GlobalTransations[0].OriginTx.From + } + response.Transactions = append(response.Transactions, tx) } diff --git a/api/routes/wormscan/transactions/models.go b/api/routes/wormscan/transactions/models.go index c45912c8..27302124 100644 --- a/api/routes/wormscan/transactions/models.go +++ b/api/routes/wormscan/transactions/models.go @@ -18,6 +18,7 @@ type TransactionOverview struct { ID string `json:"id"` Timestamp time.Time `json:"timestamp"` TxHash string `json:"txHash,omitempty"` + OriginAddress string `json:"originAddress,omitempty"` OriginChain sdk.ChainID `json:"originChain"` DestinationAddress string `json:"destinationAddress,omitempty"` DestinationChain sdk.ChainID `json:"destinationChain,omitempty"` diff --git a/deploy/tx-tracker-backfiller/env/production.env b/deploy/tx-tracker-backfiller/env/production.env index e7910c50..ce1fb2e0 100644 --- a/deploy/tx-tracker-backfiller/env/production.env +++ b/deploy/tx-tracker-backfiller/env/production.env @@ -6,7 +6,5 @@ RESOURCES_LIMITS_MEMORY=256Mi RESOURCES_LIMITS_CPU=500m RESOURCES_REQUESTS_MEMORY=128Mi RESOURCES_REQUESTS_CPU=250m -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan -VAA_PAYLOAD_PARSER_TIMEOUT=10 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com SOLANA_REQUESTS_PER_MINUTE=6 diff --git a/deploy/tx-tracker-backfiller/env/staging.env b/deploy/tx-tracker-backfiller/env/staging.env index 4df8781b..5d270918 100644 --- a/deploy/tx-tracker-backfiller/env/staging.env +++ b/deploy/tx-tracker-backfiller/env/staging.env @@ -6,8 +6,6 @@ RESOURCES_LIMITS_MEMORY=128Mi RESOURCES_LIMITS_CPU=500m RESOURCES_REQUESTS_MEMORY=64Mi RESOURCES_REQUESTS_CPU=250m -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan -VAA_PAYLOAD_PARSER_TIMEOUT=10 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com SOLANA_REQUESTS_PER_MINUTE=6 diff --git a/deploy/tx-tracker-backfiller/env/test.env b/deploy/tx-tracker-backfiller/env/test.env index 1f35f7a1..c27dd8a7 100644 --- a/deploy/tx-tracker-backfiller/env/test.env +++ b/deploy/tx-tracker-backfiller/env/test.env @@ -6,7 +6,5 @@ RESOURCES_LIMITS_MEMORY=128Mi RESOURCES_LIMITS_CPU=200m RESOURCES_REQUESTS_MEMORY=64Mi RESOURCES_REQUESTS_CPU=100m -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet -VAA_PAYLOAD_PARSER_TIMEOUT=10 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com SOLANA_REQUESTS_PER_MINUTE=6 diff --git a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml index 7fdfcc91..e13574ee 100644 --- a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml +++ b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml @@ -31,10 +31,38 @@ spec: configMapKeyRef: name: config key: mongo-database - - name: VAA_PAYLOAD_PARSER_URL - value: {{ .VAA_PAYLOAD_PARSER_URL }} - - name: VAA_PAYLOAD_PARSER_TIMEOUT - value: "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}" + - name: ARBITRUM_BASE_URL + value: {{ .ARBITRUM_BASE_URL }} + - name: ARBITRUM_REQUESTS_PER_MINUTE + value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}" + - name: AVALANCHE_BASE_URL + value: {{ .AVALANCHE_BASE_URL }} + - name: AVALANCHE_REQUESTS_PER_MINUTE + value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}" + - name: BSC_BASE_URL + value: {{ .BSC_BASE_URL }} + - name: BSC_REQUESTS_PER_MINUTE + value: "{{ .BSC_REQUESTS_PER_MINUTE }}" + - name: CELO_BASE_URL + value: {{ .CELO_BASE_URL }} + - name: CELO_REQUESTS_PER_MINUTE + value: "{{ .CELO_REQUESTS_PER_MINUTE }}" + - name: ETHEREUM_BASE_URL + value: {{ .ETHEREUM_BASE_URL }} + - name: ETHEREUM_REQUESTS_PER_MINUTE + value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}" + - name: FANTOM_BASE_URL + value: {{ .FANTOM_BASE_URL }} + - name: FANTOM_REQUESTS_PER_MINUTE + value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}" + - name: OPTIMISM_BASE_URL + value: {{ .OPTIMISM_BASE_URL }} + - name: OPTIMISM_REQUESTS_PER_MINUTE + value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}" + - name: POLYGON_BASE_URL + value: {{ .POLYGON_BASE_URL }} + - name: POLYGON_REQUESTS_PER_MINUTE + value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}" - name: SOLANA_BASE_URL value: {{ .SOLANA_BASE_URL }} - name: SOLANA_REQUESTS_PER_MINUTE diff --git a/deploy/tx-tracker/env/production.env b/deploy/tx-tracker/env/production.env index a02bd375..bd1c176a 100644 --- a/deploy/tx-tracker/env/production.env +++ b/deploy/tx-tracker/env/production.env @@ -9,8 +9,6 @@ RESOURCES_REQUESTS_MEMORY=128Mi RESOURCES_REQUESTS_CPU=250m SQS_URL= SQS_AWS_REGION= -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan -VAA_PAYLOAD_PARSER_TIMEOUT=10 SOLANA_BASE_URL=https://api.mainnet-beta.solana.com SOLANA_REQUESTS_PER_MINUTE=6 AWS_IAM_ROLE= diff --git a/deploy/tx-tracker/env/staging.env b/deploy/tx-tracker/env/staging.env index ee42b76a..0b6ddfc7 100644 --- a/deploy/tx-tracker/env/staging.env +++ b/deploy/tx-tracker/env/staging.env @@ -10,9 +10,6 @@ RESOURCES_REQUESTS_CPU=40m SQS_URL= SQS_AWS_REGION= -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan -VAA_PAYLOAD_PARSER_TIMEOUT=10 - SOLANA_BASE_URL=https://api.mainnet-beta.solana.com SOLANA_REQUESTS_PER_MINUTE=6 AWS_IAM_ROLE= \ No newline at end of file diff --git a/deploy/tx-tracker/env/test.env b/deploy/tx-tracker/env/test.env index a63b7f50..832dd925 100644 --- a/deploy/tx-tracker/env/test.env +++ b/deploy/tx-tracker/env/test.env @@ -9,8 +9,6 @@ RESOURCES_REQUESTS_MEMORY=15Mi RESOURCES_REQUESTS_CPU=10m SQS_URL= SQS_AWS_REGION= -VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet -VAA_PAYLOAD_PARSER_TIMEOUT=10 SOLANA_BASE_URL=https://api.devnet.solana.com SOLANA_REQUESTS_PER_MINUTE=6 AWS_IAM_ROLE= \ No newline at end of file diff --git a/deploy/tx-tracker/tx-tracker-service.yaml b/deploy/tx-tracker/tx-tracker-service.yaml index ed223f36..8c36d932 100644 --- a/deploy/tx-tracker/tx-tracker-service.yaml +++ b/deploy/tx-tracker/tx-tracker-service.yaml @@ -57,10 +57,38 @@ spec: value: {{ .SQS_URL }} - name: AWS_REGION value: {{ .SQS_AWS_REGION }} - - name: VAA_PAYLOAD_PARSER_URL - value: {{ .VAA_PAYLOAD_PARSER_URL }} - - name: VAA_PAYLOAD_PARSER_TIMEOUT - value: "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}" + - name: ARBITRUM_BASE_URL + value: {{ .ARBITRUM_BASE_URL }} + - name: ARBITRUM_REQUESTS_PER_MINUTE + value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}" + - name: AVALANCHE_BASE_URL + value: {{ .AVALANCHE_BASE_URL }} + - name: AVALANCHE_REQUESTS_PER_MINUTE + value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}" + - name: BSC_BASE_URL + value: {{ .BSC_BASE_URL }} + - name: BSC_REQUESTS_PER_MINUTE + value: "{{ .BSC_REQUESTS_PER_MINUTE }}" + - name: CELO_BASE_URL + value: {{ .CELO_BASE_URL }} + - name: CELO_REQUESTS_PER_MINUTE + value: "{{ .CELO_REQUESTS_PER_MINUTE }}" + - name: ETHEREUM_BASE_URL + value: {{ .ETHEREUM_BASE_URL }} + - name: ETHEREUM_REQUESTS_PER_MINUTE + value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}" + - name: FANTOM_BASE_URL + value: {{ .FANTOM_BASE_URL }} + - name: FANTOM_REQUESTS_PER_MINUTE + value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}" + - name: OPTIMISM_BASE_URL + value: {{ .OPTIMISM_BASE_URL }} + - name: OPTIMISM_REQUESTS_PER_MINUTE + value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}" + - name: POLYGON_BASE_URL + value: {{ .POLYGON_BASE_URL }} + - name: POLYGON_REQUESTS_PER_MINUTE + value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}" - name: SOLANA_BASE_URL value: {{ .SOLANA_BASE_URL }} - name: SOLANA_REQUESTS_PER_MINUTE diff --git a/tx-tracker/chains/eth.go b/tx-tracker/chains/eth.go index 3865a97a..0aae8ce1 100644 --- a/tx-tracker/chains/eth.go +++ b/tx-tracker/chains/eth.go @@ -24,17 +24,10 @@ func fetchEthTx( ctx context.Context, txHash string, baseUrl string, - apiKey string, ) (*TxDetail, error) { - // build RPC URL - url := baseUrl - if apiKey != "" { - url += "/" + apiKey - } - // initialize RPC client - client, err := rpc.DialContext(ctx, url) + client, err := rpc.DialContext(ctx, baseUrl) if err != nil { return nil, fmt.Errorf("failed to initialize RPC client: %w", err) } @@ -69,7 +62,7 @@ func fetchEthTx( // build results and return txDetail := &TxDetail{ - Signer: strings.ToLower(txReply.From), + From: strings.ToLower(txReply.From), Timestamp: timestamp, NativeTxHash: fmt.Sprintf("0x%s", strings.ToLower(txHash)), } diff --git a/tx-tracker/chains/solanaRpc.go b/tx-tracker/chains/solanaRpc.go index d768901f..226f4897 100644 --- a/tx-tracker/chains/solanaRpc.go +++ b/tx-tracker/chains/solanaRpc.go @@ -116,10 +116,10 @@ func fetchSolanaTx( // set sender/receiver for i := range response.Transaction.Message.AccountKeys { if response.Transaction.Message.AccountKeys[i].Signer { - txDetail.Signer = response.Transaction.Message.AccountKeys[i].Pubkey + txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey } } - if txDetail.Signer == "" { + if txDetail.From == "" { return nil, fmt.Errorf("failed to find source account") } diff --git a/tx-tracker/chains/tx.go b/tx-tracker/chains/tx.go index bc32fd25..b685eb9c 100644 --- a/tx-tracker/chains/tx.go +++ b/tx-tracker/chains/tx.go @@ -21,8 +21,8 @@ var ( ) type TxDetail struct { - // Signer is the address that signed the transaction, encoded in the chain's native format. - Signer string + // From is the address that signed the transaction, encoded in the chain's native format. + From string // Timestamp indicates the time at which the transaction was confirmed. Timestamp time.Time // NativeTxHash contains the transaction hash, encoded in the chain's native format. @@ -30,7 +30,15 @@ type TxDetail struct { } var tickers = struct { - solana *time.Ticker + arbitrum *time.Ticker + avalanche *time.Ticker + bsc *time.Ticker + celo *time.Ticker + ethereum *time.Ticker + fantom *time.Ticker + optimism *time.Ticker + polygon *time.Ticker + solana *time.Ticker }{} func Initialize(cfg *config.RpcProviderSettings) { @@ -44,7 +52,15 @@ func Initialize(cfg *config.RpcProviderSettings) { return time.Duration(roundedUp) } - // this adapter sends 2 requests per txHash + // these adapters send 2 requests per txHash + tickers.arbitrum = time.NewTicker(f(cfg.ArbitrumRequestsPerMinute / 2)) + tickers.avalanche = time.NewTicker(f(cfg.AvalancheRequestsPerMinute / 2)) + tickers.bsc = time.NewTicker(f(cfg.BscRequestsPerMinute / 2)) + tickers.celo = time.NewTicker(f(cfg.CeloRequestsPerMinute / 2)) + tickers.ethereum = time.NewTicker(f(cfg.EthereumRequestsPerMinute / 2)) + tickers.fantom = time.NewTicker(f(cfg.FantomRequestsPerMinute / 2)) + tickers.optimism = time.NewTicker(f(cfg.OptimismRequestsPerMinute / 2)) + tickers.polygon = time.NewTicker(f(cfg.PolygonRequestsPerMinute / 2)) tickers.solana = time.NewTicker(f(cfg.SolanaRequestsPerMinute / 2)) } @@ -63,6 +79,46 @@ func FetchTx( case vaa.ChainIDSolana: fetchFunc = fetchSolanaTx rateLimiter = *tickers.solana + case vaa.ChainIDCelo: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.CeloBaseUrl) + } + rateLimiter = *tickers.celo + case vaa.ChainIDEthereum: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.EthereumBaseUrl) + } + rateLimiter = *tickers.ethereum + case vaa.ChainIDBSC: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.BscBaseUrl) + } + rateLimiter = *tickers.bsc + case vaa.ChainIDPolygon: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.PolygonBaseUrl) + } + rateLimiter = *tickers.polygon + case vaa.ChainIDFantom: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.FantomBaseUrl) + } + rateLimiter = *tickers.fantom + case vaa.ChainIDArbitrum: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.ArbitrumBaseUrl) + } + rateLimiter = *tickers.arbitrum + case vaa.ChainIDOptimism: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.OptimismBaseUrl) + } + rateLimiter = *tickers.optimism + case vaa.ChainIDAvalanche: + fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { + return fetchEthTx(ctx, txHash, cfg.AvalancheBaseUrl) + } + rateLimiter = *tickers.avalanche default: return nil, ErrChainNotSupported } diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index 0c461acb..298e3a79 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -104,14 +104,13 @@ func main() { for i := uint(0); i < cfg.NumWorkers; i++ { name := fmt.Sprintf("worker-%d", i) p := consumerParams{ - logger: makeLogger(rootLogger, name), - vaaPayloadParserSettings: &cfg.VaaPayloadParserSettings, - rpcProviderSettings: &cfg.RpcProviderSettings, - repository: repository, - queueRx: queue, - wg: &wg, - totalDocuments: totalDocuments, - processedDocuments: &processedDocuments, + logger: makeLogger(rootLogger, name), + rpcProviderSettings: &cfg.RpcProviderSettings, + repository: repository, + queueRx: queue, + wg: &wg, + totalDocuments: totalDocuments, + processedDocuments: &processedDocuments, } go consume(rootCtx, &p) } @@ -233,14 +232,13 @@ func produce(ctx context.Context, params *producerParams) { // consumerParams contains the parameters for the consumer goroutine. type consumerParams struct { - logger *zap.Logger - vaaPayloadParserSettings *config.VaaPayloadParserSettings - rpcProviderSettings *config.RpcProviderSettings - repository *consumer.Repository - queueRx <-chan consumer.GlobalTransaction - wg *sync.WaitGroup - totalDocuments uint64 - processedDocuments *atomic.Uint64 + logger *zap.Logger + rpcProviderSettings *config.RpcProviderSettings + repository *consumer.Repository + queueRx <-chan consumer.GlobalTransaction + wg *sync.WaitGroup + totalDocuments uint64 + processedDocuments *atomic.Uint64 } // consume reads VAA IDs from a channel, processes them, and updates the database accordingly. @@ -252,18 +250,12 @@ type consumerParams struct { func consume(ctx context.Context, params *consumerParams) { // Initialize the client, which processes source Txs. - client, err := consumer.New( + client := consumer.New( nil, - params.vaaPayloadParserSettings, params.rpcProviderSettings, params.logger, params.repository, ) - if err != nil { - params.logger.Error("Failed to initialize consumer", zap.Error(err)) - params.wg.Done() - return - } // Main loop: fetch global txs and process them for { @@ -314,7 +306,7 @@ func consume(ctx context.Context, params *consumerParams) { Sequence: v.Sequence, TxHash: *v.TxHash, } - err = client.ProcessSourceTx(ctx, &p) + err := client.ProcessSourceTx(ctx, &p) if err != nil { params.logger.Error("Failed to track source tx", zap.String("vaaId", globalTx.Id), diff --git a/tx-tracker/cmd/fetchone/main.go b/tx-tracker/cmd/fetchone/main.go index d9f63f68..8d451a37 100644 --- a/tx-tracker/cmd/fetchone/main.go +++ b/tx-tracker/cmd/fetchone/main.go @@ -38,5 +38,5 @@ func main() { // print tx details log.Printf("tx detail: sender=%s nativeTxHash=%s timestamp=%s", - txDetail.Signer, txDetail.NativeTxHash, txDetail.Timestamp) + txDetail.From, txDetail.NativeTxHash, txDetail.Timestamp) } diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index f3c33ff2..7e573a5b 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -65,10 +65,7 @@ func main() { // create and start a consumer. vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, logger) repository := consumer.NewRepository(logger, db) - consumer, err := consumer.New(vaaConsumeFunc, &cfg.VaaPayloadParserSettings, &cfg.RpcProviderSettings, logger, repository) - if err != nil { - logger.Fatal("Failed to create VAA consumer", zap.Error(err)) - } + consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, logger, repository) consumer.Start(rootCtx) logger.Info("Started wormhole-explorer-tx-tracker") diff --git a/tx-tracker/config/structs.go b/tx-tracker/config/structs.go index cebca701..98b04330 100644 --- a/tx-tracker/config/structs.go +++ b/tx-tracker/config/structs.go @@ -30,7 +30,6 @@ type BackfillerSettings struct { TimestampBefore string `split_words:"true" required:"false"` } - VaaPayloadParserSettings MongodbSettings RpcProviderSettings } @@ -42,16 +41,10 @@ type ServiceSettings struct { PprofEnabled bool `split_words:"true" default:"false"` AwsSettings - VaaPayloadParserSettings MongodbSettings RpcProviderSettings } -type VaaPayloadParserSettings struct { - VaaPayloadParserUrl string `split_words:"true" required:"true"` - VaaPayloadParserTimeout int64 `split_words:"true" required:"true"` -} - type AwsSettings struct { AwsEndpoint string `split_words:"true" required:"false"` AwsAccessKeyID string `split_words:"true" required:"false"` @@ -66,8 +59,24 @@ type MongodbSettings struct { } type RpcProviderSettings struct { - SolanaBaseUrl string `split_words:"true" required:"true"` - SolanaRequestsPerMinute uint16 `split_words:"true" required:"true"` + ArbitrumBaseUrl string `split_words:"true" required:"true"` + ArbitrumRequestsPerMinute uint16 `split_words:"true" required:"true"` + AvalancheBaseUrl string `split_words:"true" required:"true"` + AvalancheRequestsPerMinute uint16 `split_words:"true" required:"true"` + BscBaseUrl string `split_words:"true" required:"true"` + BscRequestsPerMinute uint16 `split_words:"true" required:"true"` + CeloBaseUrl string `split_words:"true" required:"true"` + CeloRequestsPerMinute uint16 `split_words:"true" required:"true"` + EthereumBaseUrl string `split_words:"true" required:"true"` + EthereumRequestsPerMinute uint16 `split_words:"true" required:"true"` + FantomBaseUrl string `split_words:"true" required:"true"` + FantomRequestsPerMinute uint16 `split_words:"true" required:"true"` + OptimismBaseUrl string `split_words:"true" required:"true"` + OptimismRequestsPerMinute uint16 `split_words:"true" required:"true"` + PolygonBaseUrl string `split_words:"true" required:"true"` + PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"` + SolanaBaseUrl string `split_words:"true" required:"true"` + SolanaRequestsPerMinute uint16 `split_words:"true" required:"true"` } func LoadFromEnv[T any]() (*T, error) { diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index c1487d31..2c2e902f 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -2,11 +2,9 @@ package consumer import ( "context" - "fmt" "time" "github.com/wormhole-foundation/wormhole-explorer/common/domain" - "github.com/wormhole-foundation/wormhole-explorer/parser/parser" "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" @@ -26,36 +24,24 @@ type Consumer struct { rpcServiceProviderSettings *config.RpcProviderSettings logger *zap.Logger repository *Repository - vaaPayloadParser parser.ParserVAAAPIClient } // New creates a new vaa consumer. func New( consumeFunc queue.VAAConsumeFunc, - vaaPayloadParserSettings *config.VaaPayloadParserSettings, rpcServiceProviderSettings *config.RpcProviderSettings, logger *zap.Logger, repository *Repository, -) (*Consumer, error) { - - vaaPayloadParser, err := parser.NewParserVAAAPIClient( - vaaPayloadParserSettings.VaaPayloadParserTimeout, - vaaPayloadParserSettings.VaaPayloadParserUrl, - logger, - ) - if err != nil { - return nil, fmt.Errorf("failed to create VAA parser client: %w", err) - } +) *Consumer { c := Consumer{ consumeFunc: consumeFunc, rpcServiceProviderSettings: rpcServiceProviderSettings, logger: logger, repository: repository, - vaaPayloadParser: vaaPayloadParser, } - return &c, nil + return &c } // Start consumes messages from VAA queue, parse and store those messages in a repository. @@ -77,38 +63,6 @@ func (c *Consumer) Start(ctx context.Context) { continue } - // Parse the VAA's payload - parsedPayload, err := c.vaaPayloadParser.Parse( - uint16(event.ChainID), - event.EmitterAddress, - event.Sequence, - event.Vaa, - ) - if err == parser.ErrNotFound { - c.logger.Debug("Skipping message - no parsed registered for this (chain, emitter) pair", - zap.String("vaaId", event.ID), - ) - msg.Done() - continue - } - if err != nil { - c.logger.Error("Failed to parse VAA payload", - zap.String("vaaId", event.ID), - zap.Error(err), - ) - msg.Done() - continue - } - - // Skip messages that have not been generated by the portal token bridge - if parsedPayload.AppID != domain.AppIdPortalTokenBridge { - c.logger.Debug("Skipping VAA because it was not generated by the portal token bridge", - zap.String("vaaId", event.ID), - ) - msg.Done() - continue - } - // Fetch tx details from the corresponding RPC/API, then persist them on MongoDB. p := ProcessSourceTxParams{ VaaId: event.ID, @@ -117,7 +71,7 @@ func (c *Consumer) Start(ctx context.Context) { Sequence: event.Sequence, TxHash: event.TxHash, } - err = c.ProcessSourceTx(ctx, &p) + err := c.ProcessSourceTx(ctx, &p) if err == chains.ErrChainNotSupported { c.logger.Debug("Skipping VAA - chain not supported", zap.String("vaaId", event.ID), diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 1673ab5e..909bd353 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -53,6 +53,7 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP if params.TxDetail != nil { fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash}) + fields = append(fields, primitive.E{Key: "from", Value: params.TxDetail.From}) } update := bson.D{