diff --git a/tx-tracker/chains/algorand.go b/tx-tracker/chains/api_algorand.go similarity index 53% rename from tx-tracker/chains/algorand.go rename to tx-tracker/chains/api_algorand.go index e8943ce5..2de62384 100644 --- a/tx-tracker/chains/algorand.go +++ b/tx-tracker/chains/api_algorand.go @@ -5,8 +5,6 @@ import ( "encoding/json" "fmt" "time" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" ) type algorandTransactionResponse struct { @@ -19,22 +17,25 @@ type algorandTransactionResponse struct { func fetchAlgorandTx( ctx context.Context, - cfg *config.RpcProviderSettings, + rateLimiter *time.Ticker, + baseUrl string, txHash string, ) (*TxDetail, error) { - // Fetch tx data from the Algorand Indexer API - url := fmt.Sprintf("%s/v2/transactions/%s", cfg.AlgorandBaseUrl, txHash) - fmt.Println(url) - body, err := httpGet(ctx, url) - if err != nil { - return nil, fmt.Errorf("HTTP request to Algorand transactions endpoint failed: %w", err) - } - - // Decode the response + // Call the transaction endpoint of the Algorand Indexer REST API var response algorandTransactionResponse - if err := json.Unmarshal(body, &response); err != nil { - return nil, fmt.Errorf("failed to decode Algorand transactions response as JSON: %w", err) + { + // Perform the HTTP request + url := fmt.Sprintf("%s/v2/transactions/%s", baseUrl, txHash) + body, err := httpGet(ctx, rateLimiter, url) + if err != nil { + return nil, fmt.Errorf("HTTP request to Algorand transactions endpoint failed: %w", err) + } + + // Decode the response + if err := json.Unmarshal(body, &response); err != nil { + return nil, fmt.Errorf("failed to decode Algorand transactions response as JSON: %w", err) + } } // Populate the result struct and return diff --git a/tx-tracker/chains/aptos.go b/tx-tracker/chains/api_aptos.go similarity index 87% rename from tx-tracker/chains/aptos.go rename to tx-tracker/chains/api_aptos.go index 76f15f66..7db9c2c6 100644 --- a/tx-tracker/chains/aptos.go +++ b/tx-tracker/chains/api_aptos.go @@ -6,8 +6,6 @@ import ( "fmt" "strconv" "time" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" ) const ( @@ -26,7 +24,8 @@ type aptosTx struct { func fetchAptosTx( ctx context.Context, - cfg *config.RpcProviderSettings, + rateLimiter *time.Ticker, + baseUrl string, txHash string, ) (*TxDetail, error) { @@ -41,14 +40,14 @@ func fetchAptosTx( { // Build the URI for the events endpoint uri := fmt.Sprintf("%s/accounts/%s/events/%s::state::WormholeMessageHandle/event?start=%d&limit=1", - cfg.AptosBaseUrl, + baseUrl, aptosCoreContractAddress, aptosCoreContractAddress, creationNumber, ) // Query the events endpoint - body, err := httpGet(ctx, uri) + body, err := httpGet(ctx, rateLimiter, uri) if err != nil { return nil, fmt.Errorf("failed to query events endpoint: %w", err) } @@ -67,10 +66,10 @@ func fetchAptosTx( var tx aptosTx { // Build the URI for the events endpoint - uri := fmt.Sprintf("%s/transactions/by_version/%d", cfg.AptosBaseUrl, events[0].Version) + uri := fmt.Sprintf("%s/transactions/by_version/%d", baseUrl, events[0].Version) // Query the events endpoint - body, err := httpGet(ctx, uri) + body, err := httpGet(ctx, rateLimiter, uri) if err != nil { return nil, fmt.Errorf("failed to query transactions endpoint: %w", err) } diff --git a/tx-tracker/chains/cosmos.go b/tx-tracker/chains/api_cosmos.go similarity index 73% rename from tx-tracker/chains/cosmos.go rename to tx-tracker/chains/api_cosmos.go index de283d7f..d728dee8 100644 --- a/tx-tracker/chains/cosmos.go +++ b/tx-tracker/chains/api_cosmos.go @@ -29,21 +29,25 @@ type cosmosTxsResponse struct { func fetchCosmosTx( ctx context.Context, - baseUri string, + rateLimiter *time.Ticker, + baseUrl string, txHash string, ) (*TxDetail, error) { - // Query the Cosmos transaction endpoint - uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUri, txHash) - body, err := httpGet(ctx, uri) - if err != nil { - return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err) - } - - // Deserialize response body + // Call the transaction endpoint of the cosmos REST API var response cosmosTxsResponse - if err := json.Unmarshal(body, &response); err != nil { - return nil, fmt.Errorf("failed to deserialize cosmos tx response: %w", err) + { + // Perform the HTTP request + uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUrl, txHash) + body, err := httpGet(ctx, rateLimiter, uri) + if err != nil { + return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err) + } + + // Deserialize response body + if err := json.Unmarshal(body, &response); err != nil { + return nil, fmt.Errorf("failed to deserialize cosmos tx response: %w", err) + } } // Find the sender address diff --git a/tx-tracker/chains/eth.go b/tx-tracker/chains/api_eth.go similarity index 63% rename from tx-tracker/chains/eth.go rename to tx-tracker/chains/api_eth.go index 0aae8ce1..3d9b72e1 100644 --- a/tx-tracker/chains/eth.go +++ b/tx-tracker/chains/api_eth.go @@ -4,8 +4,7 @@ import ( "context" "fmt" "strings" - - "github.com/ethereum/go-ethereum/rpc" + "time" ) type ethGetTransactionByHashResponse struct { @@ -22,12 +21,13 @@ type ethGetBlockByHashResponse struct { func fetchEthTx( ctx context.Context, - txHash string, + rateLimiter *time.Ticker, baseUrl string, + txHash string, ) (*TxDetail, error) { // initialize RPC client - client, err := rpc.DialContext(ctx, baseUrl) + client, err := rpcDialContext(ctx, baseUrl) if err != nil { return nil, fmt.Errorf("failed to initialize RPC client: %w", err) } @@ -35,23 +35,27 @@ func fetchEthTx( // query transaction data var txReply ethGetTransactionByHashResponse - err = client.CallContext(ctx, &txReply, "eth_getTransactionByHash", "0x"+txHash) - if err != nil { - return nil, fmt.Errorf("failed to get tx by hash: %w", err) - } - if txReply.BlockHash == "" || txReply.From == "" { - return nil, ErrTransactionNotFound + { + err = client.CallContext(ctx, rateLimiter, &txReply, "eth_getTransactionByHash", "0x"+txHash) + if err != nil { + return nil, fmt.Errorf("failed to get tx by hash: %w", err) + } + if txReply.BlockHash == "" || txReply.From == "" { + return nil, ErrTransactionNotFound + } } // query block data - blkParams := []interface{}{ - txReply.BlockHash, // tx hash - false, // include transactions? - } var blkReply ethGetBlockByHashResponse - err = client.CallContext(ctx, &blkReply, "eth_getBlockByHash", blkParams...) - if err != nil { - return nil, fmt.Errorf("failed to get block by hash: %w", err) + { + blkParams := []interface{}{ + txReply.BlockHash, // tx hash + false, // include transactions? + } + err = client.CallContext(ctx, rateLimiter, &blkReply, "eth_getBlockByHash", blkParams...) + if err != nil { + return nil, fmt.Errorf("failed to get block by hash: %w", err) + } } // parse transaction timestamp diff --git a/tx-tracker/chains/api_solana.go b/tx-tracker/chains/api_solana.go new file mode 100644 index 00000000..fecebf26 --- /dev/null +++ b/tx-tracker/chains/api_solana.go @@ -0,0 +1,118 @@ +package chains + +import ( + "context" + "encoding/hex" + "fmt" + "time" + + "github.com/mr-tron/base58" +) + +type solanaTransactionSignature struct { + Signature string `json:"signature"` +} + +type solanaGetTransactionResponse struct { + BlockTime int64 `json:"blockTime"` + Meta struct { + InnerInstructions []struct { + Instructions []struct { + ParsedInstruction struct { + Type_ string `json:"type"` + Info struct { + Account string `json:"account"` + Amount string `json:"amount"` + Authority string `json:"authority"` + Destination string `json:"destination"` + Source string `json:"source"` + } `json:"info"` + } `json:"parsed"` + } `json:"instructions"` + } `json:"innerInstructions"` + + Err []interface{} `json:"err"` + } `json:"meta"` + Transaction struct { + Message struct { + AccountKeys []struct { + Pubkey string `json:"pubkey"` + Signer bool `json:"signer"` + } `json:"accountKeys"` + } `json:"message"` + Signatures []string `json:"signatures"` + } `json:"transaction"` +} + +func fetchSolanaTx( + ctx context.Context, + rateLimiter *time.Ticker, + baseUrl string, + txHash string, +) (*TxDetail, error) { + + // Initialize RPC client + client, err := rpcDialContext(ctx, baseUrl) + if err != nil { + return nil, fmt.Errorf("failed to initialize RPC client: %w", err) + } + defer client.Close() + + // Decode txHash bytes + // TODO: remove this when the fly fixes all txHash for Solana + h, err := hex.DecodeString(txHash) + if err != nil { + h, err = base58.Decode(txHash) + if err != nil { + return nil, fmt.Errorf("failed to decode from hex txHash=%s: %w", txHash, err) + } + } + + // Get transaction signatures for the given account + var sigs []solanaTransactionSignature + { + err = client.CallContext(ctx, rateLimiter, &sigs, "getSignaturesForAddress", base58.Encode(h)) + if err != nil { + return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err) + } + if len(sigs) == 0 { + return nil, ErrTransactionNotFound + } + if len(sigs) > 1 { + return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs)) + } + } + + // Fetch the portal token bridge transaction + var response solanaGetTransactionResponse + { + err = client.CallContext(ctx, rateLimiter, &response, "getTransaction", sigs[0].Signature, "jsonParsed") + if err != nil { + return nil, fmt.Errorf("failed to get tx by signature: %w", err) + } + if len(response.Meta.InnerInstructions) == 0 { + return nil, fmt.Errorf("response.Meta.InnerInstructions is empty") + } + if len(response.Meta.InnerInstructions[0].Instructions) == 0 { + return nil, fmt.Errorf("response.Meta.InnerInstructions[0].Instructions is empty") + } + } + + // populate the response object + txDetail := TxDetail{ + Timestamp: time.Unix(response.BlockTime, 0).UTC(), + NativeTxHash: sigs[0].Signature, + } + + // set sender/receiver + for i := range response.Transaction.Message.AccountKeys { + if response.Transaction.Message.AccountKeys[i].Signer { + txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey + } + } + if txDetail.From == "" { + return nil, fmt.Errorf("failed to find source account") + } + + return &txDetail, nil +} diff --git a/tx-tracker/chains/sui.go b/tx-tracker/chains/api_sui.go similarity index 72% rename from tx-tracker/chains/sui.go rename to tx-tracker/chains/api_sui.go index 896d83c9..968fc13e 100644 --- a/tx-tracker/chains/sui.go +++ b/tx-tracker/chains/api_sui.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/rpc" - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" ) type suiGetTransactionBlockResponse struct { @@ -30,12 +29,13 @@ type suiGetTransactionBlockOpts struct { func fetchSuiTx( ctx context.Context, - cfg *config.RpcProviderSettings, + rateLimiter *time.Ticker, + baseUrl string, txHash string, ) (*TxDetail, error) { // Initialize RPC client - client, err := rpc.DialContext(ctx, cfg.SuiBaseUrl) + client, err := rpc.DialContext(ctx, baseUrl) if err != nil { return nil, fmt.Errorf("failed to initialize RPC client: %w", err) } @@ -43,10 +43,18 @@ func fetchSuiTx( // Query transaction data var reply suiGetTransactionBlockResponse - opts := suiGetTransactionBlockOpts{ShowInput: true} - err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts) - if err != nil { - return nil, fmt.Errorf("failed to get tx by hash: %w", err) + { + // Wait for the rate limiter + if !waitForRateLimiter(ctx, rateLimiter) { + return nil, ctx.Err() + } + + // Execute the remote procedure call + opts := suiGetTransactionBlockOpts{ShowInput: true} + err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts) + if err != nil { + return nil, fmt.Errorf("failed to get tx by hash: %w", err) + } } // Populate the response struct and return diff --git a/tx-tracker/chains/chains.go b/tx-tracker/chains/chains.go new file mode 100644 index 00000000..eb8d462d --- /dev/null +++ b/tx-tracker/chains/chains.go @@ -0,0 +1,143 @@ +package chains + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +var ( + ErrChainNotSupported = errors.New("chain id not supported") + ErrTransactionNotFound = errors.New("transaction not found") +) + +var ( + // rateLimitersByChain maps a chain ID to the request rate limiter for that chain. + rateLimitersByChain map[sdk.ChainID]*time.Ticker + // baseUrlsByChain maps a chain ID to the base URL of the RPC/API service for that chain. + baseUrlsByChain map[sdk.ChainID]string +) + +type TxDetail struct { + // From is the address that signed the transaction, encoded in the chain's native format. + From string + // Timestamp indicates the time at which the transaction was confirmed. + Timestamp time.Time + // NativeTxHash contains the transaction hash, encoded in the chain's native format. + NativeTxHash string +} + +func Initialize(cfg *config.RpcProviderSettings) { + + // convertToRateLimiter converts "requests per minute" into the associated *time.Ticker + convertToRateLimiter := func(requestsPerMinute uint16) *time.Ticker { + + division := float64(time.Minute) / float64(time.Duration(requestsPerMinute)) + roundedUp := math.Ceil(division) + + duration := time.Duration(roundedUp) + + return time.NewTicker(duration) + } + + // Initialize rate limiters for each chain + rateLimitersByChain = make(map[sdk.ChainID]*time.Ticker) + rateLimitersByChain[sdk.ChainIDArbitrum] = convertToRateLimiter(cfg.ArbitrumRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDAlgorand] = convertToRateLimiter(cfg.AlgorandRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDAptos] = convertToRateLimiter(cfg.AptosRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDAvalanche] = convertToRateLimiter(cfg.AvalancheRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDBSC] = convertToRateLimiter(cfg.BscRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDCelo] = convertToRateLimiter(cfg.CeloRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDEthereum] = convertToRateLimiter(cfg.EthereumRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDFantom] = convertToRateLimiter(cfg.FantomRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDKlaytn] = convertToRateLimiter(cfg.KlaytnRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDMoonbeam] = convertToRateLimiter(cfg.MoonbeamRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDOasis] = convertToRateLimiter(cfg.OasisRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDOptimism] = convertToRateLimiter(cfg.OptimismRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDPolygon] = convertToRateLimiter(cfg.PolygonRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDSolana] = convertToRateLimiter(cfg.SolanaRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDTerra2] = convertToRateLimiter(cfg.Terra2RequestsPerMinute) + rateLimitersByChain[sdk.ChainIDSui] = convertToRateLimiter(cfg.SuiRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute) + + // Initialize the RPC base URLs for each chain + baseUrlsByChain = make(map[sdk.ChainID]string) + baseUrlsByChain[sdk.ChainIDArbitrum] = cfg.ArbitrumBaseUrl + baseUrlsByChain[sdk.ChainIDAlgorand] = cfg.AlgorandBaseUrl + baseUrlsByChain[sdk.ChainIDAptos] = cfg.AptosBaseUrl + baseUrlsByChain[sdk.ChainIDAvalanche] = cfg.AvalancheBaseUrl + baseUrlsByChain[sdk.ChainIDBSC] = cfg.BscBaseUrl + baseUrlsByChain[sdk.ChainIDCelo] = cfg.CeloBaseUrl + baseUrlsByChain[sdk.ChainIDEthereum] = cfg.EthereumBaseUrl + baseUrlsByChain[sdk.ChainIDFantom] = cfg.FantomBaseUrl + baseUrlsByChain[sdk.ChainIDKlaytn] = cfg.KlaytnBaseUrl + baseUrlsByChain[sdk.ChainIDMoonbeam] = cfg.MoonbeamBaseUrl + baseUrlsByChain[sdk.ChainIDOasis] = cfg.OasisBaseUrl + baseUrlsByChain[sdk.ChainIDOptimism] = cfg.OptimismBaseUrl + baseUrlsByChain[sdk.ChainIDPolygon] = cfg.PolygonBaseUrl + baseUrlsByChain[sdk.ChainIDSolana] = cfg.SolanaBaseUrl + baseUrlsByChain[sdk.ChainIDTerra2] = cfg.Terra2BaseUrl + baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl + baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl +} + +func FetchTx( + ctx context.Context, + cfg *config.RpcProviderSettings, + chainId sdk.ChainID, + txHash string, +) (*TxDetail, error) { + + // Decide which RPC/API service to use based on chain ID + var fetchFunc func(ctx context.Context, rateLimiter *time.Ticker, baseUrl string, txHash string) (*TxDetail, error) + switch chainId { + case sdk.ChainIDSolana: + fetchFunc = fetchSolanaTx + case sdk.ChainIDAlgorand: + fetchFunc = fetchAlgorandTx + case sdk.ChainIDAptos: + fetchFunc = fetchAptosTx + case sdk.ChainIDSui: + fetchFunc = fetchSuiTx + case sdk.ChainIDTerra2, + sdk.ChainIDXpla: + fetchFunc = fetchCosmosTx + case sdk.ChainIDArbitrum, + sdk.ChainIDAvalanche, + sdk.ChainIDBSC, + sdk.ChainIDCelo, + sdk.ChainIDEthereum, + sdk.ChainIDFantom, + sdk.ChainIDKlaytn, + sdk.ChainIDMoonbeam, + sdk.ChainIDOasis, + sdk.ChainIDOptimism, + sdk.ChainIDPolygon: + fetchFunc = fetchEthTx + default: + return nil, ErrChainNotSupported + } + + // Get the rate limiter and base URL for the given chain ID + rateLimiter, ok := rateLimitersByChain[chainId] + if !ok { + return nil, fmt.Errorf("found no rate limiter for chain %s", chainId.String()) + } + baseUrl, ok := baseUrlsByChain[chainId] + if !ok { + return nil, fmt.Errorf("found no base URL for chain %s", chainId.String()) + } + + // Get transaction details from the RPC/API service + txDetail, err := fetchFunc(ctx, rateLimiter, baseUrl, txHash) + if err != nil { + return nil, fmt.Errorf("failed to retrieve tx information: %w", err) + } + + return txDetail, nil +} diff --git a/tx-tracker/chains/solanaRpc.go b/tx-tracker/chains/solanaRpc.go deleted file mode 100644 index 87e18d55..00000000 --- a/tx-tracker/chains/solanaRpc.go +++ /dev/null @@ -1,131 +0,0 @@ -package chains - -import ( - "context" - "encoding/hex" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/rpc" - "github.com/mr-tron/base58" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" -) - -type solanaTransactionSignature struct { - Signature string `json:"signature"` -} - -type solanaGetTransactionResponse struct { - BlockTime int64 `json:"blockTime"` - Meta solanaTransactionMeta `json:"meta"` - Transaction solanaTransaction `json:"transaction"` -} - -type solanaTransactionMeta struct { - InnerInstructions []solanaInnerInstruction `json:"innerInstructions"` - Err []interface{} `json:"err"` -} - -type solanaInnerInstruction struct { - Instructions []solanaInstruction `json:"instructions"` -} - -type solanaInstruction struct { - ParsedInstruction solanaParsedInstruction `json:"parsed"` -} - -type solanaParsedInstruction struct { - Type_ string `json:"type"` - Info solanaParsedInstructionInfo `json:"info"` -} - -type solanaParsedInstructionInfo struct { - Account string `json:"account"` - Amount string `json:"amount"` - Authority string `json:"authority"` - Destination string `json:"destination"` - Source string `json:"source"` -} - -type solanaTransaction struct { - Message solanaTransactionMessage `json:"message"` - Signatures []string `json:"signatures"` -} - -type solanaTransactionMessage struct { - AccountKeys []solanaAccountKey `json:"accountKeys"` -} - -type solanaAccountKey struct { - Pubkey string `json:"pubkey"` - Signer bool `json:"signer"` -} - -func fetchSolanaTx( - ctx context.Context, - cfg *config.RpcProviderSettings, - txHash string, -) (*TxDetail, error) { - - // Initialize RPC client - client, err := rpc.DialContext(ctx, cfg.SolanaBaseUrl) - if err != nil { - return nil, fmt.Errorf("failed to initialize RPC client: %w", err) - } - defer client.Close() - - // Decode txHash bytes - // TODO: remove this when the fly fixes all txHash for Solana - h, err := hex.DecodeString(txHash) - if err != nil { - h, err = base58.Decode(txHash) - if err != nil { - return nil, fmt.Errorf("failed to decode from hex txHash=%s: %w", txHash, err) - } - } - - // Get transaction signatures for the given account - var sigs []solanaTransactionSignature - err = client.CallContext(ctx, &sigs, "getSignaturesForAddress", base58.Encode(h)) - if err != nil { - return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err) - } - if len(sigs) == 0 { - return nil, ErrTransactionNotFound - } - if len(sigs) > 1 { - return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs)) - } - - // Fetch the portal token bridge transaction - var response solanaGetTransactionResponse - err = client.CallContext(ctx, &response, "getTransaction", sigs[0].Signature, "jsonParsed") - if err != nil { - return nil, fmt.Errorf("failed to get tx by signature: %w", err) - } - if len(response.Meta.InnerInstructions) == 0 { - return nil, fmt.Errorf("response.Meta.InnerInstructions is empty") - } - if len(response.Meta.InnerInstructions[0].Instructions) == 0 { - return nil, fmt.Errorf("response.Meta.InnerInstructions[0].Instructions is empty") - } - - // populate the response object - txDetail := TxDetail{ - Timestamp: time.Unix(response.BlockTime, 0).UTC(), - NativeTxHash: sigs[0].Signature, - } - - // set sender/receiver - for i := range response.Transaction.Message.AccountKeys { - if response.Transaction.Message.AccountKeys[i].Signer { - txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey - } - } - if txDetail.From == "" { - return nil, fmt.Errorf("failed to find source account") - } - - return &txDetail, nil -} diff --git a/tx-tracker/chains/tx.go b/tx-tracker/chains/tx.go deleted file mode 100644 index f0c0ff1a..00000000 --- a/tx-tracker/chains/tx.go +++ /dev/null @@ -1,242 +0,0 @@ -package chains - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "math" - "net/http" - "strconv" - "strings" - "time" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" - "github.com/wormhole-foundation/wormhole/sdk/vaa" -) - -const requestTimeout = 30 * time.Second - -var ( - ErrChainNotSupported = errors.New("chain id not supported") - ErrTransactionNotFound = errors.New("transaction not found") -) - -type TxDetail struct { - // From is the address that signed the transaction, encoded in the chain's native format. - From string - // Timestamp indicates the time at which the transaction was confirmed. - Timestamp time.Time - // NativeTxHash contains the transaction hash, encoded in the chain's native format. - NativeTxHash string -} - -var tickers = struct { - algorand *time.Ticker - aptos *time.Ticker - arbitrum *time.Ticker - avalanche *time.Ticker - bsc *time.Ticker - celo *time.Ticker - ethereum *time.Ticker - fantom *time.Ticker - klaytn *time.Ticker - moonbeam *time.Ticker - oasis *time.Ticker - optimism *time.Ticker - polygon *time.Ticker - solana *time.Ticker - sui *time.Ticker - terra2 *time.Ticker - xpla *time.Ticker -}{} - -func Initialize(cfg *config.RpcProviderSettings) { - - // f converts "requests per minute" into the associated time.Duration - f := func(requestsPerMinute uint16) time.Duration { - - division := float64(time.Minute) / float64(time.Duration(requestsPerMinute)) - roundedUp := math.Ceil(division) - - return time.Duration(roundedUp) - } - - // these adapters send 1 request per txHash - tickers.algorand = time.NewTicker(f(cfg.AlgorandRequestsPerMinute)) - tickers.sui = time.NewTicker(f(cfg.SuiRequestsPerMinute)) - tickers.terra2 = time.NewTicker(f(cfg.Terra2RequestsPerMinute)) - tickers.xpla = time.NewTicker(f(cfg.XplaRequestsPerMinute)) - - // these adapters send 2 requests per txHash - tickers.aptos = time.NewTicker(f(cfg.AptosRequestsPerMinute / 2)) - tickers.arbitrum = time.NewTicker(f(cfg.ArbitrumRequestsPerMinute / 2)) - tickers.avalanche = time.NewTicker(f(cfg.AvalancheRequestsPerMinute / 2)) - tickers.bsc = time.NewTicker(f(cfg.BscRequestsPerMinute / 2)) - tickers.celo = time.NewTicker(f(cfg.CeloRequestsPerMinute / 2)) - tickers.ethereum = time.NewTicker(f(cfg.EthereumRequestsPerMinute / 2)) - tickers.fantom = time.NewTicker(f(cfg.FantomRequestsPerMinute / 2)) - tickers.klaytn = time.NewTicker(f(cfg.KlaytnRequestsPerMinute / 2)) - tickers.moonbeam = time.NewTicker(f(cfg.MoonbeamRequestsPerMinute / 2)) - tickers.oasis = time.NewTicker(f(cfg.OasisRequestsPerMinute / 2)) - tickers.optimism = time.NewTicker(f(cfg.OptimismRequestsPerMinute / 2)) - tickers.polygon = time.NewTicker(f(cfg.PolygonRequestsPerMinute / 2)) - tickers.solana = time.NewTicker(f(cfg.SolanaRequestsPerMinute / 2)) -} - -func FetchTx( - ctx context.Context, - cfg *config.RpcProviderSettings, - chainId vaa.ChainID, - txHash string, -) (*TxDetail, error) { - - var fetchFunc func(context.Context, *config.RpcProviderSettings, string) (*TxDetail, error) - var rateLimiter time.Ticker - - // decide which RPC/API service to use based on chain ID - switch chainId { - case vaa.ChainIDSolana: - fetchFunc = fetchSolanaTx - rateLimiter = *tickers.solana - case vaa.ChainIDAlgorand: - fetchFunc = fetchAlgorandTx - rateLimiter = *tickers.algorand - case vaa.ChainIDCelo: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.CeloBaseUrl) - } - rateLimiter = *tickers.celo - case vaa.ChainIDEthereum: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.EthereumBaseUrl) - } - rateLimiter = *tickers.ethereum - case vaa.ChainIDBSC: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.BscBaseUrl) - } - rateLimiter = *tickers.bsc - case vaa.ChainIDPolygon: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.PolygonBaseUrl) - } - rateLimiter = *tickers.polygon - case vaa.ChainIDFantom: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.FantomBaseUrl) - } - rateLimiter = *tickers.fantom - case vaa.ChainIDKlaytn: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.KlaytnBaseUrl) - } - rateLimiter = *tickers.fantom - case vaa.ChainIDArbitrum: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.ArbitrumBaseUrl) - } - rateLimiter = *tickers.arbitrum - case vaa.ChainIDOasis: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.OasisBaseUrl) - } - rateLimiter = *tickers.oasis - case vaa.ChainIDOptimism: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.OptimismBaseUrl) - } - rateLimiter = *tickers.optimism - case vaa.ChainIDAvalanche: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.AvalancheBaseUrl) - } - rateLimiter = *tickers.avalanche - case vaa.ChainIDMoonbeam: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchEthTx(ctx, txHash, cfg.MoonbeamBaseUrl) - } - rateLimiter = *tickers.avalanche - case vaa.ChainIDAptos: - fetchFunc = fetchAptosTx - rateLimiter = *tickers.aptos - case vaa.ChainIDSui: - fetchFunc = fetchSuiTx - rateLimiter = *tickers.sui - case vaa.ChainIDTerra2: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchCosmosTx(ctx, cfg.Terra2BaseUrl, txHash) - } - rateLimiter = *tickers.terra2 - case vaa.ChainIDXpla: - fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) { - return fetchCosmosTx(ctx, cfg.XplaBaseUrl, txHash) - } - rateLimiter = *tickers.xpla - default: - return nil, ErrChainNotSupported - } - - // wait for rate limit - fail fast if context was cancelled - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-rateLimiter.C: - } - - // get transaction details from the RPC/API service - subContext, cancelFunc := context.WithTimeout(ctx, requestTimeout) - defer cancelFunc() - txDetail, err := fetchFunc(subContext, cfg, txHash) - if err != nil { - return nil, fmt.Errorf("failed to retrieve tx information: %w", err) - } - - return txDetail, nil -} - -// timestampFromHex converts a hex timestamp into a `time.Time` value. -func timestampFromHex(s string) (time.Time, error) { - - // remove the leading "0x" or "0X" from the hex string - hexDigits := strings.Replace(s, "0x", "", 1) - hexDigits = strings.Replace(hexDigits, "0X", "", 1) - - // parse the hex digits into an integer - epoch, err := strconv.ParseInt(hexDigits, 16, 64) - if err != nil { - return time.Time{}, fmt.Errorf("failed to parse hex timestamp: %w", err) - } - - // convert the unix epoch into a `time.Time` value - timestamp := time.Unix(epoch, 0).UTC() - return timestamp, nil -} - -// httpGet is a helper function that performs an HTTP request. -func httpGet(ctx context.Context, url string) ([]byte, error) { - - // Build the HTTP request - request, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Send it - var client http.Client - response, err := client.Do(request) - if err != nil { - return nil, fmt.Errorf("failed to query url: %w", err) - } - defer response.Body.Close() - if response.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode) - } - - // Read the response body and return - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) - } - return body, nil -} diff --git a/tx-tracker/chains/util.go b/tx-tracker/chains/util.go new file mode 100644 index 00000000..d7c00914 --- /dev/null +++ b/tx-tracker/chains/util.go @@ -0,0 +1,110 @@ +package chains + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum/rpc" +) + +// timestampFromHex converts a hex timestamp into a `time.Time` value. +func timestampFromHex(s string) (time.Time, error) { + + // remove the leading "0x" or "0X" from the hex string + hexDigits := strings.Replace(s, "0x", "", 1) + hexDigits = strings.Replace(hexDigits, "0X", "", 1) + + // parse the hex digits into an integer + epoch, err := strconv.ParseInt(hexDigits, 16, 64) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse hex timestamp: %w", err) + } + + // convert the unix epoch into a `time.Time` value + timestamp := time.Unix(epoch, 0).UTC() + return timestamp, nil +} + +// httpGet is a helper function that performs an HTTP request. +func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte, error) { + + // Wait for the rate limiter + if !waitForRateLimiter(ctx, rateLimiter) { + return nil, ctx.Err() + } + + // Build the HTTP request + request, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Send it + var client http.Client + response, err := client.Do(request) + if err != nil { + return nil, fmt.Errorf("failed to query url: %w", err) + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode) + } + + // Read the response body and return + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + return body, nil +} + +func waitForRateLimiter(ctx context.Context, t *time.Ticker) bool { + select { + case <-t.C: + return true + case <-ctx.Done(): + return false + } +} + +// rateLimitedRpcClient is a wrapper around `rpc.Client` that adds rate limits +type rateLimitedRpcClient struct { + client *rpc.Client +} + +func rpcDialContext(ctx context.Context, url string) (*rateLimitedRpcClient, error) { + + client, err := rpc.DialContext(ctx, url) + if err != nil { + return nil, err + } + + tmp := rateLimitedRpcClient{ + client: client, + } + return &tmp, nil +} + +func (c *rateLimitedRpcClient) CallContext( + ctx context.Context, + rateLimiter *time.Ticker, + result interface{}, + method string, + args ...interface{}, +) error { + + if !waitForRateLimiter(ctx, rateLimiter) { + return ctx.Err() + } + + return c.client.CallContext(ctx, result, method, args...) +} + +func (c *rateLimitedRpcClient) Close() { + c.client.Close() +} diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index 298e3a79..1eb3b561 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -249,14 +249,6 @@ type consumerParams struct { // - the channel is closed (i.e.: no more items to process) func consume(ctx context.Context, params *consumerParams) { - // Initialize the client, which processes source Txs. - client := consumer.New( - nil, - params.rpcProviderSettings, - params.logger, - params.repository, - ) - // Main loop: fetch global txs and process them for { select { @@ -306,7 +298,7 @@ func consume(ctx context.Context, params *consumerParams) { Sequence: v.Sequence, TxHash: *v.TxHash, } - err := client.ProcessSourceTx(ctx, &p) + err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p) if err != nil { params.logger.Error("Failed to track source tx", zap.String("vaaId", globalTx.Id), diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index 7e573a5b..46e83be7 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -65,7 +65,7 @@ func main() { // create and start a consumer. vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, logger) repository := consumer.NewRepository(logger, db) - consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, logger, repository) + consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository) consumer.Start(rootCtx) logger.Info("Started wormhole-explorer-tx-tracker") @@ -81,9 +81,9 @@ func main() { } // graceful shutdown - logger.Info("Cancelling root context ...") + logger.Info("Cancelling root context...") rootCtxCancel() - logger.Info("Closing Http server ...") + logger.Info("Closing Http server...") server.Stop() logger.Info("Terminated wormhole-explorer-tx-tracker") } @@ -114,7 +114,14 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons awsconfig, cfg.SqsUrl, sqs.WithMaxMessages(10), - sqs.WithVisibilityTimeout(120), + // We're setting a high visibility timeout to decrease the likelihood of a + // message being processed more than once. + // + // This is particularly relevant for the cases in which we receive a burst + // of traffic (e.g.: dozens of VAAs being emitted in the same minute), and + // also when a we have to retry fetching transaction metadata many times + // (due to finality delay, out-of-sync nodes, etc). + sqs.WithVisibilityTimeout(15*60), ) return consumer, err } diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 2c2e902f..6c894f3f 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -4,9 +4,6 @@ import ( "context" "time" - "github.com/wormhole-foundation/wormhole-explorer/common/domain" - - "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -14,8 +11,8 @@ import ( ) const ( - numRetries = 2 - retryDelay = 10 * time.Second + maxAttempts = 5 + retryDelay = 60 * time.Second ) // Consumer consumer struct definition. @@ -24,21 +21,26 @@ type Consumer struct { rpcServiceProviderSettings *config.RpcProviderSettings logger *zap.Logger repository *Repository + workerPool *WorkerPool } // New creates a new vaa consumer. func New( consumeFunc queue.VAAConsumeFunc, rpcServiceProviderSettings *config.RpcProviderSettings, + ctx context.Context, logger *zap.Logger, repository *Repository, ) *Consumer { + workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository) + c := Consumer{ consumeFunc: consumeFunc, rpcServiceProviderSettings: rpcServiceProviderSettings, logger: logger, repository: repository, + workerPool: workerPool, } return &c @@ -46,110 +48,40 @@ func New( // Start consumes messages from VAA queue, parse and store those messages in a repository. func (c *Consumer) Start(ctx context.Context) { - go func() { - for msg := range c.consumeFunc(ctx) { - event := msg.Data() - - // Check if message is expired. - if msg.IsExpired() { - c.logger.Warn("Message with VAA expired", zap.String("id", event.ID)) - msg.Failed() - continue - } - - // Do not process messages from PythNet - if event.ChainID == sdk.ChainIDPythNet { - msg.Done() - continue - } - - // Fetch tx details from the corresponding RPC/API, then persist them on MongoDB. - p := ProcessSourceTxParams{ - VaaId: event.ID, - ChainId: event.ChainID, - Emitter: event.EmitterAddress, - Sequence: event.Sequence, - TxHash: event.TxHash, - } - err := c.ProcessSourceTx(ctx, &p) - if err == chains.ErrChainNotSupported { - c.logger.Debug("Skipping VAA - chain not supported", - zap.String("vaaId", event.ID), - ) - } else if err != nil { - c.logger.Error("Failed to upsert source transaction details", - zap.String("vaaId", event.ID), - zap.Error(err), - ) - } else { - c.logger.Debug("Updated source transaction details in the database", - zap.String("id", event.ID), - ) - } - - msg.Done() - } - }() + go c.producerLoop(ctx) } -// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method. -type ProcessSourceTxParams struct { - ChainId sdk.ChainID - VaaId string - Emitter string - Sequence string - TxHash string -} +func (c *Consumer) producerLoop(ctx context.Context) { -func (c *Consumer) ProcessSourceTx( - ctx context.Context, - params *ProcessSourceTxParams, -) error { + ch := c.consumeFunc(ctx) - // Get transaction details from the emitter blockchain - // - // If the transaction is not found, will retry a few times before giving up. - var txStatus domain.SourceTxStatus - var txDetail *chains.TxDetail - var err error - for attempts := numRetries; attempts > 0; attempts-- { + for msg := range ch { - txDetail, err = chains.FetchTx(ctx, c.rpcServiceProviderSettings, params.ChainId, params.TxHash) + event := msg.Data() - switch { - // If the transaction is not found, retry after a delay - case err == chains.ErrTransactionNotFound: - txStatus = domain.SourceTxStatusInternalError - time.Sleep(retryDelay) + // Check if message is expired. + if msg.IsExpired() { + c.logger.Warn("Message with VAA expired", zap.String("id", event.ID)) + msg.Failed() continue + } - // If the chain ID is not supported, we're done. - case err == chains.ErrChainNotSupported: - return err + // Do not process messages from PythNet + if event.ChainID == sdk.ChainIDPythNet { + msg.Done() + continue + } - // If there is an internal error, give up - case err != nil: - c.logger.Error("Failed to fetch source transaction details", - zap.String("vaaId", params.VaaId), + // Send the VAA to the worker pool. + // + // The worker pool is responsible for calling `msg.Done()` + err := c.workerPool.Push(ctx, msg) + if err != nil { + c.logger.Warn("failed to push message into worker pool", + zap.String("vaaId", event.ID), zap.Error(err), ) - txStatus = domain.SourceTxStatusInternalError - break - - // Success - case err == nil: - txStatus = domain.SourceTxStatusConfirmed - break + msg.Failed() } } - - // Store source transaction details in the database - p := UpsertDocumentParams{ - VaaId: params.VaaId, - ChainId: params.ChainId, - TxHash: params.TxHash, - TxDetail: txDetail, - TxStatus: txStatus, - } - return c.repository.UpsertDocument(ctx, &p) } diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go new file mode 100644 index 00000000..06ee2745 --- /dev/null +++ b/tx-tracker/consumer/processor.go @@ -0,0 +1,88 @@ +package consumer + +import ( + "context" + "errors" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" +) + +// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method. +type ProcessSourceTxParams struct { + ChainId sdk.ChainID + VaaId string + Emitter string + Sequence string + TxHash string +} + +func ProcessSourceTx( + ctx context.Context, + logger *zap.Logger, + rpcServiceProviderSettings *config.RpcProviderSettings, + repository *Repository, + params *ProcessSourceTxParams, +) error { + + // Get transaction details from the emitter blockchain + // + // If the transaction is not found, will retry a few times before giving up. + var txStatus domain.SourceTxStatus + var txDetail *chains.TxDetail + var err error + for attempts := 1; attempts <= maxAttempts; attempts++ { + + txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash) + + switch { + // If the transaction is not found, retry after a delay + case errors.Is(err, chains.ErrTransactionNotFound): + txStatus = domain.SourceTxStatusInternalError + logger.Warn("transaction not found, will retry after a delay", + zap.String("vaaId", params.VaaId), + zap.Duration("retryDelay", retryDelay), + zap.Int("attempts", attempts), + zap.Int("maxAttempts", maxAttempts), + ) + time.Sleep(retryDelay) + continue + + // If the chain ID is not supported, we're done. + case errors.Is(err, chains.ErrChainNotSupported): + return err + + // If the context was cancelled, do not attempt to save the result on the database + case errors.Is(err, context.Canceled): + return err + + // If there is an internal error, give up + case err != nil: + logger.Error("Failed to fetch source transaction details", + zap.String("vaaId", params.VaaId), + zap.Error(err), + ) + txStatus = domain.SourceTxStatusInternalError + break + + // Success + case err == nil: + txStatus = domain.SourceTxStatusConfirmed + break + } + } + + // Store source transaction details in the database + p := UpsertDocumentParams{ + VaaId: params.VaaId, + ChainId: params.ChainId, + TxHash: params.TxHash, + TxDetail: txDetail, + TxStatus: txStatus, + } + return repository.UpsertDocument(ctx, &p) +} diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go new file mode 100644 index 00000000..2f6c5de0 --- /dev/null +++ b/tx-tracker/consumer/workerpool.go @@ -0,0 +1,129 @@ +package consumer + +import ( + "context" + "fmt" + "sync" + + "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" + "go.uber.org/zap" +) + +const numWorkers = 500 + +// WorkerPool is an abstraction to process VAAs concurrently. +type WorkerPool struct { + wg sync.WaitGroup + chInput chan queue.ConsumerMessage + ctx context.Context + logger *zap.Logger + rpcProviderSettings *config.RpcProviderSettings + repository *Repository +} + +// NewWorkerPool creates a new worker pool. +func NewWorkerPool( + ctx context.Context, + logger *zap.Logger, + rpcProviderSettings *config.RpcProviderSettings, + repository *Repository, +) *WorkerPool { + + w := WorkerPool{ + chInput: make(chan queue.ConsumerMessage), + ctx: ctx, + logger: logger, + rpcProviderSettings: rpcProviderSettings, + repository: repository, + } + + // Spawn worker goroutines + for i := 0; i < numWorkers; i++ { + w.wg.Add(1) + go w.consumerLoop() + } + + return &w +} + +// Push sends a new item to the worker pool. +// +// This function will block until either a worker is available or the context is cancelled. +func (w *WorkerPool) Push(ctx context.Context, msg queue.ConsumerMessage) error { + + select { + case w.chInput <- msg: + return nil + case <-ctx.Done(): + return fmt.Errorf("failed to push message into worker pool due to calcelled context: %w", ctx.Err()) + } +} + +// StopGracefully stops the worker pool gracefully. +// +// This function blocks until the consumer queue is empty. +func (w *WorkerPool) StopGracefully() { + + // Close the producer channel. + // This will stop sending items to the workers. + // After all items are consumed, the workers will exit. + close(w.chInput) + w.chInput = nil + + // Wait for all workers to finish gracefully + w.wg.Wait() +} + +// consumerLoop is the main loop of a worker. +// +// It will consume items from the input channel until the channel is closed or the context is cancelled. +func (w *WorkerPool) consumerLoop() { + for { + select { + case msg, ok := <-w.chInput: + if !ok { + w.wg.Done() + return + } + w.process(msg) + + case <-w.ctx.Done(): + w.wg.Done() + return + } + } +} + +// process consumes a single item from the input channel. +func (w *WorkerPool) process(msg queue.ConsumerMessage) { + + event := msg.Data() + + p := ProcessSourceTxParams{ + VaaId: event.ID, + ChainId: event.ChainID, + Emitter: event.EmitterAddress, + Sequence: event.Sequence, + TxHash: event.TxHash, + } + err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p) + + if err == chains.ErrChainNotSupported { + w.logger.Debug("Skipping VAA - chain not supported", + zap.String("vaaId", event.ID), + ) + } else if err != nil { + w.logger.Error("Failed to upsert source transaction details", + zap.String("vaaId", event.ID), + zap.Error(err), + ) + } else { + w.logger.Debug("Updated source transaction details in the database", + zap.String("id", event.ID), + ) + } + + msg.Done() +}