Add retry logic to the `tx-tracker` service (#480)

### Summary

Tracking issue: https://github.com/wormhole-foundation/wormhole-explorer/issues/415

When the `tx-tracker` service processes a VAA, sometimes the transaction is not finalized (e.g.:for chains with slow finality, VAAs with low consistency level, out-of-sync nodes). This problem was observed on chains like Solana, Ethereum and Polygon, but could have happened on other chains.

This pull request adds the logic needed to wait and retry processing the VAA, hoping that the originating transaction reaches finality.

Also, the `tx-tracker` service now uses a worker pool to process transactions, which will lead to higher throughput. This will benefit the WormholeScan UI, specially in the case when a large number of VAAs are emitted in a short period of time.
This commit is contained in:
agodnic 2023-07-04 15:25:08 -03:00 committed by GitHub
parent b16f1739d0
commit 8ad4d6276f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 702 additions and 540 deletions

View File

@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
)
type algorandTransactionResponse struct {
@ -19,22 +17,25 @@ type algorandTransactionResponse struct {
func fetchAlgorandTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
// Fetch tx data from the Algorand Indexer API
url := fmt.Sprintf("%s/v2/transactions/%s", cfg.AlgorandBaseUrl, txHash)
fmt.Println(url)
body, err := httpGet(ctx, url)
if err != nil {
return nil, fmt.Errorf("HTTP request to Algorand transactions endpoint failed: %w", err)
}
// Decode the response
// Call the transaction endpoint of the Algorand Indexer REST API
var response algorandTransactionResponse
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("failed to decode Algorand transactions response as JSON: %w", err)
{
// Perform the HTTP request
url := fmt.Sprintf("%s/v2/transactions/%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, url)
if err != nil {
return nil, fmt.Errorf("HTTP request to Algorand transactions endpoint failed: %w", err)
}
// Decode the response
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("failed to decode Algorand transactions response as JSON: %w", err)
}
}
// Populate the result struct and return

View File

@ -6,8 +6,6 @@ import (
"fmt"
"strconv"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
)
const (
@ -26,7 +24,8 @@ type aptosTx struct {
func fetchAptosTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -41,14 +40,14 @@ func fetchAptosTx(
{
// Build the URI for the events endpoint
uri := fmt.Sprintf("%s/accounts/%s/events/%s::state::WormholeMessageHandle/event?start=%d&limit=1",
cfg.AptosBaseUrl,
baseUrl,
aptosCoreContractAddress,
aptosCoreContractAddress,
creationNumber,
)
// Query the events endpoint
body, err := httpGet(ctx, uri)
body, err := httpGet(ctx, rateLimiter, uri)
if err != nil {
return nil, fmt.Errorf("failed to query events endpoint: %w", err)
}
@ -67,10 +66,10 @@ func fetchAptosTx(
var tx aptosTx
{
// Build the URI for the events endpoint
uri := fmt.Sprintf("%s/transactions/by_version/%d", cfg.AptosBaseUrl, events[0].Version)
uri := fmt.Sprintf("%s/transactions/by_version/%d", baseUrl, events[0].Version)
// Query the events endpoint
body, err := httpGet(ctx, uri)
body, err := httpGet(ctx, rateLimiter, uri)
if err != nil {
return nil, fmt.Errorf("failed to query transactions endpoint: %w", err)
}

View File

@ -29,21 +29,25 @@ type cosmosTxsResponse struct {
func fetchCosmosTx(
ctx context.Context,
baseUri string,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
// Query the Cosmos transaction endpoint
uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUri, txHash)
body, err := httpGet(ctx, uri)
if err != nil {
return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err)
}
// Deserialize response body
// Call the transaction endpoint of the cosmos REST API
var response cosmosTxsResponse
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("failed to deserialize cosmos tx response: %w", err)
{
// Perform the HTTP request
uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, uri)
if err != nil {
return nil, fmt.Errorf("failed to query cosmos tx endpoint: %w", err)
}
// Deserialize response body
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("failed to deserialize cosmos tx response: %w", err)
}
}
// Find the sender address

View File

@ -4,8 +4,7 @@ import (
"context"
"fmt"
"strings"
"github.com/ethereum/go-ethereum/rpc"
"time"
)
type ethGetTransactionByHashResponse struct {
@ -22,12 +21,13 @@ type ethGetBlockByHashResponse struct {
func fetchEthTx(
ctx context.Context,
txHash string,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
// initialize RPC client
client, err := rpc.DialContext(ctx, baseUrl)
client, err := rpcDialContext(ctx, baseUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize RPC client: %w", err)
}
@ -35,23 +35,27 @@ func fetchEthTx(
// query transaction data
var txReply ethGetTransactionByHashResponse
err = client.CallContext(ctx, &txReply, "eth_getTransactionByHash", "0x"+txHash)
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}
if txReply.BlockHash == "" || txReply.From == "" {
return nil, ErrTransactionNotFound
{
err = client.CallContext(ctx, rateLimiter, &txReply, "eth_getTransactionByHash", "0x"+txHash)
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}
if txReply.BlockHash == "" || txReply.From == "" {
return nil, ErrTransactionNotFound
}
}
// query block data
blkParams := []interface{}{
txReply.BlockHash, // tx hash
false, // include transactions?
}
var blkReply ethGetBlockByHashResponse
err = client.CallContext(ctx, &blkReply, "eth_getBlockByHash", blkParams...)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash: %w", err)
{
blkParams := []interface{}{
txReply.BlockHash, // tx hash
false, // include transactions?
}
err = client.CallContext(ctx, rateLimiter, &blkReply, "eth_getBlockByHash", blkParams...)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash: %w", err)
}
}
// parse transaction timestamp

View File

@ -0,0 +1,118 @@
package chains
import (
"context"
"encoding/hex"
"fmt"
"time"
"github.com/mr-tron/base58"
)
type solanaTransactionSignature struct {
Signature string `json:"signature"`
}
type solanaGetTransactionResponse struct {
BlockTime int64 `json:"blockTime"`
Meta struct {
InnerInstructions []struct {
Instructions []struct {
ParsedInstruction struct {
Type_ string `json:"type"`
Info struct {
Account string `json:"account"`
Amount string `json:"amount"`
Authority string `json:"authority"`
Destination string `json:"destination"`
Source string `json:"source"`
} `json:"info"`
} `json:"parsed"`
} `json:"instructions"`
} `json:"innerInstructions"`
Err []interface{} `json:"err"`
} `json:"meta"`
Transaction struct {
Message struct {
AccountKeys []struct {
Pubkey string `json:"pubkey"`
Signer bool `json:"signer"`
} `json:"accountKeys"`
} `json:"message"`
Signatures []string `json:"signatures"`
} `json:"transaction"`
}
func fetchSolanaTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
// Initialize RPC client
client, err := rpcDialContext(ctx, baseUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize RPC client: %w", err)
}
defer client.Close()
// Decode txHash bytes
// TODO: remove this when the fly fixes all txHash for Solana
h, err := hex.DecodeString(txHash)
if err != nil {
h, err = base58.Decode(txHash)
if err != nil {
return nil, fmt.Errorf("failed to decode from hex txHash=%s: %w", txHash, err)
}
}
// Get transaction signatures for the given account
var sigs []solanaTransactionSignature
{
err = client.CallContext(ctx, rateLimiter, &sigs, "getSignaturesForAddress", base58.Encode(h))
if err != nil {
return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err)
}
if len(sigs) == 0 {
return nil, ErrTransactionNotFound
}
if len(sigs) > 1 {
return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs))
}
}
// Fetch the portal token bridge transaction
var response solanaGetTransactionResponse
{
err = client.CallContext(ctx, rateLimiter, &response, "getTransaction", sigs[0].Signature, "jsonParsed")
if err != nil {
return nil, fmt.Errorf("failed to get tx by signature: %w", err)
}
if len(response.Meta.InnerInstructions) == 0 {
return nil, fmt.Errorf("response.Meta.InnerInstructions is empty")
}
if len(response.Meta.InnerInstructions[0].Instructions) == 0 {
return nil, fmt.Errorf("response.Meta.InnerInstructions[0].Instructions is empty")
}
}
// populate the response object
txDetail := TxDetail{
Timestamp: time.Unix(response.BlockTime, 0).UTC(),
NativeTxHash: sigs[0].Signature,
}
// set sender/receiver
for i := range response.Transaction.Message.AccountKeys {
if response.Transaction.Message.AccountKeys[i].Signer {
txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey
}
}
if txDetail.From == "" {
return nil, fmt.Errorf("failed to find source account")
}
return &txDetail, nil
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
)
type suiGetTransactionBlockResponse struct {
@ -30,12 +29,13 @@ type suiGetTransactionBlockOpts struct {
func fetchSuiTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
// Initialize RPC client
client, err := rpc.DialContext(ctx, cfg.SuiBaseUrl)
client, err := rpc.DialContext(ctx, baseUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize RPC client: %w", err)
}
@ -43,10 +43,18 @@ func fetchSuiTx(
// Query transaction data
var reply suiGetTransactionBlockResponse
opts := suiGetTransactionBlockOpts{ShowInput: true}
err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts)
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
{
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
// Execute the remote procedure call
opts := suiGetTransactionBlockOpts{ShowInput: true}
err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts)
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}
}
// Populate the response struct and return

143
tx-tracker/chains/chains.go Normal file
View File

@ -0,0 +1,143 @@
package chains
import (
"context"
"errors"
"fmt"
"math"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
var (
ErrChainNotSupported = errors.New("chain id not supported")
ErrTransactionNotFound = errors.New("transaction not found")
)
var (
// rateLimitersByChain maps a chain ID to the request rate limiter for that chain.
rateLimitersByChain map[sdk.ChainID]*time.Ticker
// baseUrlsByChain maps a chain ID to the base URL of the RPC/API service for that chain.
baseUrlsByChain map[sdk.ChainID]string
)
type TxDetail struct {
// From is the address that signed the transaction, encoded in the chain's native format.
From string
// Timestamp indicates the time at which the transaction was confirmed.
Timestamp time.Time
// NativeTxHash contains the transaction hash, encoded in the chain's native format.
NativeTxHash string
}
func Initialize(cfg *config.RpcProviderSettings) {
// convertToRateLimiter converts "requests per minute" into the associated *time.Ticker
convertToRateLimiter := func(requestsPerMinute uint16) *time.Ticker {
division := float64(time.Minute) / float64(time.Duration(requestsPerMinute))
roundedUp := math.Ceil(division)
duration := time.Duration(roundedUp)
return time.NewTicker(duration)
}
// Initialize rate limiters for each chain
rateLimitersByChain = make(map[sdk.ChainID]*time.Ticker)
rateLimitersByChain[sdk.ChainIDArbitrum] = convertToRateLimiter(cfg.ArbitrumRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAlgorand] = convertToRateLimiter(cfg.AlgorandRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAptos] = convertToRateLimiter(cfg.AptosRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAvalanche] = convertToRateLimiter(cfg.AvalancheRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDBSC] = convertToRateLimiter(cfg.BscRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDCelo] = convertToRateLimiter(cfg.CeloRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDEthereum] = convertToRateLimiter(cfg.EthereumRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDFantom] = convertToRateLimiter(cfg.FantomRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDKlaytn] = convertToRateLimiter(cfg.KlaytnRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDMoonbeam] = convertToRateLimiter(cfg.MoonbeamRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOasis] = convertToRateLimiter(cfg.OasisRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOptimism] = convertToRateLimiter(cfg.OptimismRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDPolygon] = convertToRateLimiter(cfg.PolygonRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSolana] = convertToRateLimiter(cfg.SolanaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDTerra2] = convertToRateLimiter(cfg.Terra2RequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSui] = convertToRateLimiter(cfg.SuiRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute)
// Initialize the RPC base URLs for each chain
baseUrlsByChain = make(map[sdk.ChainID]string)
baseUrlsByChain[sdk.ChainIDArbitrum] = cfg.ArbitrumBaseUrl
baseUrlsByChain[sdk.ChainIDAlgorand] = cfg.AlgorandBaseUrl
baseUrlsByChain[sdk.ChainIDAptos] = cfg.AptosBaseUrl
baseUrlsByChain[sdk.ChainIDAvalanche] = cfg.AvalancheBaseUrl
baseUrlsByChain[sdk.ChainIDBSC] = cfg.BscBaseUrl
baseUrlsByChain[sdk.ChainIDCelo] = cfg.CeloBaseUrl
baseUrlsByChain[sdk.ChainIDEthereum] = cfg.EthereumBaseUrl
baseUrlsByChain[sdk.ChainIDFantom] = cfg.FantomBaseUrl
baseUrlsByChain[sdk.ChainIDKlaytn] = cfg.KlaytnBaseUrl
baseUrlsByChain[sdk.ChainIDMoonbeam] = cfg.MoonbeamBaseUrl
baseUrlsByChain[sdk.ChainIDOasis] = cfg.OasisBaseUrl
baseUrlsByChain[sdk.ChainIDOptimism] = cfg.OptimismBaseUrl
baseUrlsByChain[sdk.ChainIDPolygon] = cfg.PolygonBaseUrl
baseUrlsByChain[sdk.ChainIDSolana] = cfg.SolanaBaseUrl
baseUrlsByChain[sdk.ChainIDTerra2] = cfg.Terra2BaseUrl
baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl
baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl
}
func FetchTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
chainId sdk.ChainID,
txHash string,
) (*TxDetail, error) {
// Decide which RPC/API service to use based on chain ID
var fetchFunc func(ctx context.Context, rateLimiter *time.Ticker, baseUrl string, txHash string) (*TxDetail, error)
switch chainId {
case sdk.ChainIDSolana:
fetchFunc = fetchSolanaTx
case sdk.ChainIDAlgorand:
fetchFunc = fetchAlgorandTx
case sdk.ChainIDAptos:
fetchFunc = fetchAptosTx
case sdk.ChainIDSui:
fetchFunc = fetchSuiTx
case sdk.ChainIDTerra2,
sdk.ChainIDXpla:
fetchFunc = fetchCosmosTx
case sdk.ChainIDArbitrum,
sdk.ChainIDAvalanche,
sdk.ChainIDBSC,
sdk.ChainIDCelo,
sdk.ChainIDEthereum,
sdk.ChainIDFantom,
sdk.ChainIDKlaytn,
sdk.ChainIDMoonbeam,
sdk.ChainIDOasis,
sdk.ChainIDOptimism,
sdk.ChainIDPolygon:
fetchFunc = fetchEthTx
default:
return nil, ErrChainNotSupported
}
// Get the rate limiter and base URL for the given chain ID
rateLimiter, ok := rateLimitersByChain[chainId]
if !ok {
return nil, fmt.Errorf("found no rate limiter for chain %s", chainId.String())
}
baseUrl, ok := baseUrlsByChain[chainId]
if !ok {
return nil, fmt.Errorf("found no base URL for chain %s", chainId.String())
}
// Get transaction details from the RPC/API service
txDetail, err := fetchFunc(ctx, rateLimiter, baseUrl, txHash)
if err != nil {
return nil, fmt.Errorf("failed to retrieve tx information: %w", err)
}
return txDetail, nil
}

View File

@ -1,131 +0,0 @@
package chains
import (
"context"
"encoding/hex"
"fmt"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mr-tron/base58"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
)
type solanaTransactionSignature struct {
Signature string `json:"signature"`
}
type solanaGetTransactionResponse struct {
BlockTime int64 `json:"blockTime"`
Meta solanaTransactionMeta `json:"meta"`
Transaction solanaTransaction `json:"transaction"`
}
type solanaTransactionMeta struct {
InnerInstructions []solanaInnerInstruction `json:"innerInstructions"`
Err []interface{} `json:"err"`
}
type solanaInnerInstruction struct {
Instructions []solanaInstruction `json:"instructions"`
}
type solanaInstruction struct {
ParsedInstruction solanaParsedInstruction `json:"parsed"`
}
type solanaParsedInstruction struct {
Type_ string `json:"type"`
Info solanaParsedInstructionInfo `json:"info"`
}
type solanaParsedInstructionInfo struct {
Account string `json:"account"`
Amount string `json:"amount"`
Authority string `json:"authority"`
Destination string `json:"destination"`
Source string `json:"source"`
}
type solanaTransaction struct {
Message solanaTransactionMessage `json:"message"`
Signatures []string `json:"signatures"`
}
type solanaTransactionMessage struct {
AccountKeys []solanaAccountKey `json:"accountKeys"`
}
type solanaAccountKey struct {
Pubkey string `json:"pubkey"`
Signer bool `json:"signer"`
}
func fetchSolanaTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
txHash string,
) (*TxDetail, error) {
// Initialize RPC client
client, err := rpc.DialContext(ctx, cfg.SolanaBaseUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize RPC client: %w", err)
}
defer client.Close()
// Decode txHash bytes
// TODO: remove this when the fly fixes all txHash for Solana
h, err := hex.DecodeString(txHash)
if err != nil {
h, err = base58.Decode(txHash)
if err != nil {
return nil, fmt.Errorf("failed to decode from hex txHash=%s: %w", txHash, err)
}
}
// Get transaction signatures for the given account
var sigs []solanaTransactionSignature
err = client.CallContext(ctx, &sigs, "getSignaturesForAddress", base58.Encode(h))
if err != nil {
return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err)
}
if len(sigs) == 0 {
return nil, ErrTransactionNotFound
}
if len(sigs) > 1 {
return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs))
}
// Fetch the portal token bridge transaction
var response solanaGetTransactionResponse
err = client.CallContext(ctx, &response, "getTransaction", sigs[0].Signature, "jsonParsed")
if err != nil {
return nil, fmt.Errorf("failed to get tx by signature: %w", err)
}
if len(response.Meta.InnerInstructions) == 0 {
return nil, fmt.Errorf("response.Meta.InnerInstructions is empty")
}
if len(response.Meta.InnerInstructions[0].Instructions) == 0 {
return nil, fmt.Errorf("response.Meta.InnerInstructions[0].Instructions is empty")
}
// populate the response object
txDetail := TxDetail{
Timestamp: time.Unix(response.BlockTime, 0).UTC(),
NativeTxHash: sigs[0].Signature,
}
// set sender/receiver
for i := range response.Transaction.Message.AccountKeys {
if response.Transaction.Message.AccountKeys[i].Signer {
txDetail.From = response.Transaction.Message.AccountKeys[i].Pubkey
}
}
if txDetail.From == "" {
return nil, fmt.Errorf("failed to find source account")
}
return &txDetail, nil
}

View File

@ -1,242 +0,0 @@
package chains
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
const requestTimeout = 30 * time.Second
var (
ErrChainNotSupported = errors.New("chain id not supported")
ErrTransactionNotFound = errors.New("transaction not found")
)
type TxDetail struct {
// From is the address that signed the transaction, encoded in the chain's native format.
From string
// Timestamp indicates the time at which the transaction was confirmed.
Timestamp time.Time
// NativeTxHash contains the transaction hash, encoded in the chain's native format.
NativeTxHash string
}
var tickers = struct {
algorand *time.Ticker
aptos *time.Ticker
arbitrum *time.Ticker
avalanche *time.Ticker
bsc *time.Ticker
celo *time.Ticker
ethereum *time.Ticker
fantom *time.Ticker
klaytn *time.Ticker
moonbeam *time.Ticker
oasis *time.Ticker
optimism *time.Ticker
polygon *time.Ticker
solana *time.Ticker
sui *time.Ticker
terra2 *time.Ticker
xpla *time.Ticker
}{}
func Initialize(cfg *config.RpcProviderSettings) {
// f converts "requests per minute" into the associated time.Duration
f := func(requestsPerMinute uint16) time.Duration {
division := float64(time.Minute) / float64(time.Duration(requestsPerMinute))
roundedUp := math.Ceil(division)
return time.Duration(roundedUp)
}
// these adapters send 1 request per txHash
tickers.algorand = time.NewTicker(f(cfg.AlgorandRequestsPerMinute))
tickers.sui = time.NewTicker(f(cfg.SuiRequestsPerMinute))
tickers.terra2 = time.NewTicker(f(cfg.Terra2RequestsPerMinute))
tickers.xpla = time.NewTicker(f(cfg.XplaRequestsPerMinute))
// these adapters send 2 requests per txHash
tickers.aptos = time.NewTicker(f(cfg.AptosRequestsPerMinute / 2))
tickers.arbitrum = time.NewTicker(f(cfg.ArbitrumRequestsPerMinute / 2))
tickers.avalanche = time.NewTicker(f(cfg.AvalancheRequestsPerMinute / 2))
tickers.bsc = time.NewTicker(f(cfg.BscRequestsPerMinute / 2))
tickers.celo = time.NewTicker(f(cfg.CeloRequestsPerMinute / 2))
tickers.ethereum = time.NewTicker(f(cfg.EthereumRequestsPerMinute / 2))
tickers.fantom = time.NewTicker(f(cfg.FantomRequestsPerMinute / 2))
tickers.klaytn = time.NewTicker(f(cfg.KlaytnRequestsPerMinute / 2))
tickers.moonbeam = time.NewTicker(f(cfg.MoonbeamRequestsPerMinute / 2))
tickers.oasis = time.NewTicker(f(cfg.OasisRequestsPerMinute / 2))
tickers.optimism = time.NewTicker(f(cfg.OptimismRequestsPerMinute / 2))
tickers.polygon = time.NewTicker(f(cfg.PolygonRequestsPerMinute / 2))
tickers.solana = time.NewTicker(f(cfg.SolanaRequestsPerMinute / 2))
}
func FetchTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
chainId vaa.ChainID,
txHash string,
) (*TxDetail, error) {
var fetchFunc func(context.Context, *config.RpcProviderSettings, string) (*TxDetail, error)
var rateLimiter time.Ticker
// decide which RPC/API service to use based on chain ID
switch chainId {
case vaa.ChainIDSolana:
fetchFunc = fetchSolanaTx
rateLimiter = *tickers.solana
case vaa.ChainIDAlgorand:
fetchFunc = fetchAlgorandTx
rateLimiter = *tickers.algorand
case vaa.ChainIDCelo:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.CeloBaseUrl)
}
rateLimiter = *tickers.celo
case vaa.ChainIDEthereum:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.EthereumBaseUrl)
}
rateLimiter = *tickers.ethereum
case vaa.ChainIDBSC:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.BscBaseUrl)
}
rateLimiter = *tickers.bsc
case vaa.ChainIDPolygon:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.PolygonBaseUrl)
}
rateLimiter = *tickers.polygon
case vaa.ChainIDFantom:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.FantomBaseUrl)
}
rateLimiter = *tickers.fantom
case vaa.ChainIDKlaytn:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.KlaytnBaseUrl)
}
rateLimiter = *tickers.fantom
case vaa.ChainIDArbitrum:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.ArbitrumBaseUrl)
}
rateLimiter = *tickers.arbitrum
case vaa.ChainIDOasis:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.OasisBaseUrl)
}
rateLimiter = *tickers.oasis
case vaa.ChainIDOptimism:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.OptimismBaseUrl)
}
rateLimiter = *tickers.optimism
case vaa.ChainIDAvalanche:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.AvalancheBaseUrl)
}
rateLimiter = *tickers.avalanche
case vaa.ChainIDMoonbeam:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchEthTx(ctx, txHash, cfg.MoonbeamBaseUrl)
}
rateLimiter = *tickers.avalanche
case vaa.ChainIDAptos:
fetchFunc = fetchAptosTx
rateLimiter = *tickers.aptos
case vaa.ChainIDSui:
fetchFunc = fetchSuiTx
rateLimiter = *tickers.sui
case vaa.ChainIDTerra2:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchCosmosTx(ctx, cfg.Terra2BaseUrl, txHash)
}
rateLimiter = *tickers.terra2
case vaa.ChainIDXpla:
fetchFunc = func(ctx context.Context, cfg *config.RpcProviderSettings, txHash string) (*TxDetail, error) {
return fetchCosmosTx(ctx, cfg.XplaBaseUrl, txHash)
}
rateLimiter = *tickers.xpla
default:
return nil, ErrChainNotSupported
}
// wait for rate limit - fail fast if context was cancelled
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-rateLimiter.C:
}
// get transaction details from the RPC/API service
subContext, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
txDetail, err := fetchFunc(subContext, cfg, txHash)
if err != nil {
return nil, fmt.Errorf("failed to retrieve tx information: %w", err)
}
return txDetail, nil
}
// timestampFromHex converts a hex timestamp into a `time.Time` value.
func timestampFromHex(s string) (time.Time, error) {
// remove the leading "0x" or "0X" from the hex string
hexDigits := strings.Replace(s, "0x", "", 1)
hexDigits = strings.Replace(hexDigits, "0X", "", 1)
// parse the hex digits into an integer
epoch, err := strconv.ParseInt(hexDigits, 16, 64)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse hex timestamp: %w", err)
}
// convert the unix epoch into a `time.Time` value
timestamp := time.Unix(epoch, 0).UTC()
return timestamp, nil
}
// httpGet is a helper function that performs an HTTP request.
func httpGet(ctx context.Context, url string) ([]byte, error) {
// Build the HTTP request
request, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
// Send it
var client http.Client
response, err := client.Do(request)
if err != nil {
return nil, fmt.Errorf("failed to query url: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode)
}
// Read the response body and return
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return body, nil
}

110
tx-tracker/chains/util.go Normal file
View File

@ -0,0 +1,110 @@
package chains
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum/rpc"
)
// timestampFromHex converts a hex timestamp into a `time.Time` value.
func timestampFromHex(s string) (time.Time, error) {
// remove the leading "0x" or "0X" from the hex string
hexDigits := strings.Replace(s, "0x", "", 1)
hexDigits = strings.Replace(hexDigits, "0X", "", 1)
// parse the hex digits into an integer
epoch, err := strconv.ParseInt(hexDigits, 16, 64)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse hex timestamp: %w", err)
}
// convert the unix epoch into a `time.Time` value
timestamp := time.Unix(epoch, 0).UTC()
return timestamp, nil
}
// httpGet is a helper function that performs an HTTP request.
func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte, error) {
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
// Build the HTTP request
request, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
// Send it
var client http.Client
response, err := client.Do(request)
if err != nil {
return nil, fmt.Errorf("failed to query url: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode)
}
// Read the response body and return
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return body, nil
}
func waitForRateLimiter(ctx context.Context, t *time.Ticker) bool {
select {
case <-t.C:
return true
case <-ctx.Done():
return false
}
}
// rateLimitedRpcClient is a wrapper around `rpc.Client` that adds rate limits
type rateLimitedRpcClient struct {
client *rpc.Client
}
func rpcDialContext(ctx context.Context, url string) (*rateLimitedRpcClient, error) {
client, err := rpc.DialContext(ctx, url)
if err != nil {
return nil, err
}
tmp := rateLimitedRpcClient{
client: client,
}
return &tmp, nil
}
func (c *rateLimitedRpcClient) CallContext(
ctx context.Context,
rateLimiter *time.Ticker,
result interface{},
method string,
args ...interface{},
) error {
if !waitForRateLimiter(ctx, rateLimiter) {
return ctx.Err()
}
return c.client.CallContext(ctx, result, method, args...)
}
func (c *rateLimitedRpcClient) Close() {
c.client.Close()
}

View File

@ -249,14 +249,6 @@ type consumerParams struct {
// - the channel is closed (i.e.: no more items to process)
func consume(ctx context.Context, params *consumerParams) {
// Initialize the client, which processes source Txs.
client := consumer.New(
nil,
params.rpcProviderSettings,
params.logger,
params.repository,
)
// Main loop: fetch global txs and process them
for {
select {
@ -306,7 +298,7 @@ func consume(ctx context.Context, params *consumerParams) {
Sequence: v.Sequence,
TxHash: *v.TxHash,
}
err := client.ProcessSourceTx(ctx, &p)
err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),

View File

@ -65,7 +65,7 @@ func main() {
// create and start a consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, logger)
repository := consumer.NewRepository(logger, db)
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, logger, repository)
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository)
consumer.Start(rootCtx)
logger.Info("Started wormhole-explorer-tx-tracker")
@ -81,9 +81,9 @@ func main() {
}
// graceful shutdown
logger.Info("Cancelling root context ...")
logger.Info("Cancelling root context...")
rootCtxCancel()
logger.Info("Closing Http server ...")
logger.Info("Closing Http server...")
server.Stop()
logger.Info("Terminated wormhole-explorer-tx-tracker")
}
@ -114,7 +114,14 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons
awsconfig,
cfg.SqsUrl,
sqs.WithMaxMessages(10),
sqs.WithVisibilityTimeout(120),
// We're setting a high visibility timeout to decrease the likelihood of a
// message being processed more than once.
//
// This is particularly relevant for the cases in which we receive a burst
// of traffic (e.g.: dozens of VAAs being emitted in the same minute), and
// also when a we have to retry fetching transaction metadata many times
// (due to finality delay, out-of-sync nodes, etc).
sqs.WithVisibilityTimeout(15*60),
)
return consumer, err
}

View File

@ -4,9 +4,6 @@ import (
"context"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -14,8 +11,8 @@ import (
)
const (
numRetries = 2
retryDelay = 10 * time.Second
maxAttempts = 5
retryDelay = 60 * time.Second
)
// Consumer consumer struct definition.
@ -24,21 +21,26 @@ type Consumer struct {
rpcServiceProviderSettings *config.RpcProviderSettings
logger *zap.Logger
repository *Repository
workerPool *WorkerPool
}
// New creates a new vaa consumer.
func New(
consumeFunc queue.VAAConsumeFunc,
rpcServiceProviderSettings *config.RpcProviderSettings,
ctx context.Context,
logger *zap.Logger,
repository *Repository,
) *Consumer {
workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository)
c := Consumer{
consumeFunc: consumeFunc,
rpcServiceProviderSettings: rpcServiceProviderSettings,
logger: logger,
repository: repository,
workerPool: workerPool,
}
return &c
@ -46,110 +48,40 @@ func New(
// Start consumes messages from VAA queue, parse and store those messages in a repository.
func (c *Consumer) Start(ctx context.Context) {
go func() {
for msg := range c.consumeFunc(ctx) {
event := msg.Data()
// Check if message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with VAA expired", zap.String("id", event.ID))
msg.Failed()
continue
}
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
msg.Done()
continue
}
// Fetch tx details from the corresponding RPC/API, then persist them on MongoDB.
p := ProcessSourceTxParams{
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
}
err := c.ProcessSourceTx(ctx, &p)
if err == chains.ErrChainNotSupported {
c.logger.Debug("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
} else if err != nil {
c.logger.Error("Failed to upsert source transaction details",
zap.String("vaaId", event.ID),
zap.Error(err),
)
} else {
c.logger.Debug("Updated source transaction details in the database",
zap.String("id", event.ID),
)
}
msg.Done()
}
}()
go c.producerLoop(ctx)
}
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessSourceTxParams struct {
ChainId sdk.ChainID
VaaId string
Emitter string
Sequence string
TxHash string
}
func (c *Consumer) producerLoop(ctx context.Context) {
func (c *Consumer) ProcessSourceTx(
ctx context.Context,
params *ProcessSourceTxParams,
) error {
ch := c.consumeFunc(ctx)
// Get transaction details from the emitter blockchain
//
// If the transaction is not found, will retry a few times before giving up.
var txStatus domain.SourceTxStatus
var txDetail *chains.TxDetail
var err error
for attempts := numRetries; attempts > 0; attempts-- {
for msg := range ch {
txDetail, err = chains.FetchTx(ctx, c.rpcServiceProviderSettings, params.ChainId, params.TxHash)
event := msg.Data()
switch {
// If the transaction is not found, retry after a delay
case err == chains.ErrTransactionNotFound:
txStatus = domain.SourceTxStatusInternalError
time.Sleep(retryDelay)
// Check if message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with VAA expired", zap.String("id", event.ID))
msg.Failed()
continue
}
// If the chain ID is not supported, we're done.
case err == chains.ErrChainNotSupported:
return err
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
msg.Done()
continue
}
// If there is an internal error, give up
case err != nil:
c.logger.Error("Failed to fetch source transaction details",
zap.String("vaaId", params.VaaId),
// Send the VAA to the worker pool.
//
// The worker pool is responsible for calling `msg.Done()`
err := c.workerPool.Push(ctx, msg)
if err != nil {
c.logger.Warn("failed to push message into worker pool",
zap.String("vaaId", event.ID),
zap.Error(err),
)
txStatus = domain.SourceTxStatusInternalError
break
// Success
case err == nil:
txStatus = domain.SourceTxStatusConfirmed
break
msg.Failed()
}
}
// Store source transaction details in the database
p := UpsertDocumentParams{
VaaId: params.VaaId,
ChainId: params.ChainId,
TxHash: params.TxHash,
TxDetail: txDetail,
TxStatus: txStatus,
}
return c.repository.UpsertDocument(ctx, &p)
}

View File

@ -0,0 +1,88 @@
package consumer
import (
"context"
"errors"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessSourceTxParams struct {
ChainId sdk.ChainID
VaaId string
Emitter string
Sequence string
TxHash string
}
func ProcessSourceTx(
ctx context.Context,
logger *zap.Logger,
rpcServiceProviderSettings *config.RpcProviderSettings,
repository *Repository,
params *ProcessSourceTxParams,
) error {
// Get transaction details from the emitter blockchain
//
// If the transaction is not found, will retry a few times before giving up.
var txStatus domain.SourceTxStatus
var txDetail *chains.TxDetail
var err error
for attempts := 1; attempts <= maxAttempts; attempts++ {
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
switch {
// If the transaction is not found, retry after a delay
case errors.Is(err, chains.ErrTransactionNotFound):
txStatus = domain.SourceTxStatusInternalError
logger.Warn("transaction not found, will retry after a delay",
zap.String("vaaId", params.VaaId),
zap.Duration("retryDelay", retryDelay),
zap.Int("attempts", attempts),
zap.Int("maxAttempts", maxAttempts),
)
time.Sleep(retryDelay)
continue
// If the chain ID is not supported, we're done.
case errors.Is(err, chains.ErrChainNotSupported):
return err
// If the context was cancelled, do not attempt to save the result on the database
case errors.Is(err, context.Canceled):
return err
// If there is an internal error, give up
case err != nil:
logger.Error("Failed to fetch source transaction details",
zap.String("vaaId", params.VaaId),
zap.Error(err),
)
txStatus = domain.SourceTxStatusInternalError
break
// Success
case err == nil:
txStatus = domain.SourceTxStatusConfirmed
break
}
}
// Store source transaction details in the database
p := UpsertDocumentParams{
VaaId: params.VaaId,
ChainId: params.ChainId,
TxHash: params.TxHash,
TxDetail: txDetail,
TxStatus: txStatus,
}
return repository.UpsertDocument(ctx, &p)
}

View File

@ -0,0 +1,129 @@
package consumer
import (
"context"
"fmt"
"sync"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
"go.uber.org/zap"
)
const numWorkers = 500
// WorkerPool is an abstraction to process VAAs concurrently.
type WorkerPool struct {
wg sync.WaitGroup
chInput chan queue.ConsumerMessage
ctx context.Context
logger *zap.Logger
rpcProviderSettings *config.RpcProviderSettings
repository *Repository
}
// NewWorkerPool creates a new worker pool.
func NewWorkerPool(
ctx context.Context,
logger *zap.Logger,
rpcProviderSettings *config.RpcProviderSettings,
repository *Repository,
) *WorkerPool {
w := WorkerPool{
chInput: make(chan queue.ConsumerMessage),
ctx: ctx,
logger: logger,
rpcProviderSettings: rpcProviderSettings,
repository: repository,
}
// Spawn worker goroutines
for i := 0; i < numWorkers; i++ {
w.wg.Add(1)
go w.consumerLoop()
}
return &w
}
// Push sends a new item to the worker pool.
//
// This function will block until either a worker is available or the context is cancelled.
func (w *WorkerPool) Push(ctx context.Context, msg queue.ConsumerMessage) error {
select {
case w.chInput <- msg:
return nil
case <-ctx.Done():
return fmt.Errorf("failed to push message into worker pool due to calcelled context: %w", ctx.Err())
}
}
// StopGracefully stops the worker pool gracefully.
//
// This function blocks until the consumer queue is empty.
func (w *WorkerPool) StopGracefully() {
// Close the producer channel.
// This will stop sending items to the workers.
// After all items are consumed, the workers will exit.
close(w.chInput)
w.chInput = nil
// Wait for all workers to finish gracefully
w.wg.Wait()
}
// consumerLoop is the main loop of a worker.
//
// It will consume items from the input channel until the channel is closed or the context is cancelled.
func (w *WorkerPool) consumerLoop() {
for {
select {
case msg, ok := <-w.chInput:
if !ok {
w.wg.Done()
return
}
w.process(msg)
case <-w.ctx.Done():
w.wg.Done()
return
}
}
}
// process consumes a single item from the input channel.
func (w *WorkerPool) process(msg queue.ConsumerMessage) {
event := msg.Data()
p := ProcessSourceTxParams{
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
}
err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p)
if err == chains.ErrChainNotSupported {
w.logger.Debug("Skipping VAA - chain not supported",
zap.String("vaaId", event.ID),
)
} else if err != nil {
w.logger.Error("Failed to upsert source transaction details",
zap.String("vaaId", event.ID),
zap.Error(err),
)
} else {
w.logger.Debug("Updated source transaction details in the database",
zap.String("id", event.ID),
)
}
msg.Done()
}