Modify initialize to support fallback URL and rate limit (#1093)

* Modify initialize to support fallback URL and rate limit

* Add fallback and rate limit config for all the chains

* remove unused WormchainTxDetail struct

* Add pool of items to common

* Add rpc pool to tx-tracker

* Rename chain url to baseUrl

* add fallback to wormchain chains

* update tx-tracker fetchone to use rpc pool

* Modify backfiller to use rpc pool

* fix merge to main run files

* Add rpc pool to controller

* remove comments

* fix pool sort function

* Add rpc pool to all the chains

* Fix algorand, aptos, sui integrations

* Fix pool solana

* remove api_sei3 old implementation

* Add metrics to get success/error rpc call

* Add rpc field in call rpc metric

* Fix common go.mod and remove unused if condition

* Update go.mod tx-tracker

* Create rpc count prometheus metrics

* Add fallback to insert originTx if the RPC fails

* Check if the transaction doesn't exist

* Modify in tx-tracker already processed method

* fix wormchain rpc

* Add rpc settings as json file

* Enable overwrite to stress test

* Add rpc provider as k8s secret

* Add metrics to get vaa retry processing
Co-authored-by: ftocal fert1335@gmail.com

* Add worker pool in consumer for tx-tracker

* fix config by env

* Set consumer workers in 1

---------

Co-authored-by: Fernando Torres <fert1335@gmail.com>
This commit is contained in:
walker-16 2024-03-19 15:47:43 -03:00 committed by GitHub
parent f98d8f28ba
commit bc3110f3d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 1828 additions and 690 deletions

View File

@ -24,6 +24,8 @@ require (
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
golang.org/x/net v0.17.0
golang.org/x/time v0.3.0
)
require (
@ -72,6 +74,7 @@ require (
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
@ -88,7 +91,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect

View File

@ -384,6 +384,7 @@ github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75 h1:ZqpS7rAhh
github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=

13
common/pool/config.go Normal file
View File

@ -0,0 +1,13 @@
package pool
// Config is the configuration of a pool item.
type Config struct {
	// Id is the RPC service ID; the callers in this change pass it to HTTP
	// clients as the endpoint base URL.
	Id string
	// Description is a human-readable label for the item (used in metrics).
	Description string
	// Priority is the priority of the item.
	// The lower the value, the higher the priority.
	Priority uint8
	// RequestsPerMinute is the number of requests per minute allowed for
	// this item's rate limiter.
	RequestsPerMinute uint16
}

129
common/pool/pool.go Normal file
View File

@ -0,0 +1,129 @@
package pool
import (
"sort"
"time"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)
// Pool is a pool of items. Items are handed out ordered by rate-limiter
// availability (tokens) and, on ties, by their configured priority.
type Pool struct {
	items []Item
}
// Item defines the item of the pool.
type Item struct {
	// Id is the item ID (the endpoint base URL, per Config.Id).
	Id string
	// Description is a human-readable label for the item.
	Description string
	// priority is the priority of the item.
	// The lower the value, the higher the priority.
	priority uint8
	// rateLimit is the rate limiter for the item.
	rateLimit *rate.Limiter
}
// NewPool builds a pool containing one item per configuration entry.
func NewPool(cfg []Config) *Pool {
	pool := &Pool{}
	for i := range cfg {
		pool.addItem(cfg[i])
	}
	return pool
}
// addItem adds a new item to the pool.
//
// A RequestsPerMinute of zero would make time.Minute/time.Duration(0) panic
// with an integer division by zero, so it is treated as "unlimited"
// (rate.Inf) instead.
func (p *Pool) addItem(cfg Config) {
	limit := rate.Inf
	if cfg.RequestsPerMinute > 0 {
		limit = rate.Every(time.Minute / time.Duration(cfg.RequestsPerMinute))
	}
	p.items = append(p.items, Item{
		Id:          cfg.Id,
		Description: cfg.Description,
		priority:    cfg.Priority,
		rateLimit:   rate.NewLimiter(limit, 1),
	})
}
// GetItem returns the next available item of the pool: the item with the
// most available rate-limiter tokens, breaking ties by priority (lower value
// wins). If the pool is empty, a zero-value Item is returned.
//
// Selecting a single maximum does not require building and sorting a scored
// slice (as GetItems does); a single O(n) pass is equivalent and allocation-free.
func (p *Pool) GetItem() Item {
	// check if there is no item
	if len(p.items) == 0 {
		return Item{}
	}
	now := time.Now()
	best := p.items[0]
	bestScore := best.rateLimit.TokensAt(now)
	for _, candidate := range p.items[1:] {
		score := candidate.rateLimit.TokensAt(now)
		// Higher score first; on equal score, lower priority value first.
		if score > bestScore || (score == bestScore && candidate.priority < best.priority) {
			best = candidate
			bestScore = score
		}
	}
	return best
}
// GetItems returns the list of items sorted by score (available rate-limiter
// tokens, descending) and, on equal score, by priority (ascending).
func (p *Pool) GetItems() []Item {
	if len(p.items) == 0 {
		return []Item{}
	}
	// Score every item once against the same instant.
	type scoredItem struct {
		item  Item
		score float64
	}
	now := time.Now()
	scored := make([]scoredItem, 0, len(p.items))
	for _, it := range p.items {
		scored = append(scored, scoredItem{item: it, score: it.rateLimit.TokensAt(now)})
	}
	// sort by score and priority
	sort.Slice(scored, func(i, j int) bool {
		if scored[i].score == scored[j].score {
			return scored[i].item.priority < scored[j].item.priority
		}
		return scored[i].score > scored[j].score
	})
	// strip the scores and return the ordered items
	items := make([]Item, 0, len(scored))
	for _, s := range scored {
		items = append(items, s.item)
	}
	return items
}
// Wait waits for the rate limiter to allow the next item request.
//
// A zero-value Item (e.g. the one GetItem returns for an empty pool) has a
// nil limiter; Wait returns immediately in that case instead of panicking on
// a nil dereference.
func (i *Item) Wait(ctx context.Context) error {
	if i.rateLimit == nil {
		return nil
	}
	return i.rateLimit.Wait(ctx)
}

31
common/utils/domain.go Normal file
View File

@ -0,0 +1,31 @@
package utils
import (
"regexp"
"strings"
)
// reProtocol matches a leading http:// or https:// prefix. Compiled once at
// package scope instead of on every call.
var reProtocol = regexp.MustCompile(`^(http://|https://)`)

// FindSubstringBeforeDomains finds the substring before specified domains appear
// and also removes the http:// or https:// prefix from the string.
//
// If none of the domains occur — or the domains list is empty — the string is
// returned unchanged apart from the stripped protocol prefix.
func FindSubstringBeforeDomains(s string, domains []string) string {
	// Remove the protocol prefix (http:// or https://).
	s = reProtocol.ReplaceAllString(s, "")
	// Guard the empty list: joining zero domains would build the pattern
	// "()", which matches the empty string at index 0 and would wrongly
	// return "".
	if len(domains) == 0 {
		return s
	}
	// Escape each domain fully (dots AND any other regex metacharacters)
	// and join the alternatives into a single pattern.
	escaped := make([]string, len(domains))
	for i, d := range domains {
		escaped[i] = regexp.QuoteMeta(d)
	}
	reDomain := regexp.MustCompile("(" + strings.Join(escaped, "|") + ")")
	// Cut at the first occurrence of any domain.
	if loc := reDomain.FindStringIndex(s); loc != nil {
		return s[:loc[0]]
	}
	// If no domain is found, return the (protocol-stripped) string.
	return s
}

View File

@ -32,4 +32,6 @@ SUI_URL=
TERRA_URL=
# protocols jobs
PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=

View File

@ -36,4 +36,6 @@ SUI_URL=
TERRA_URL=
# protocols jobs
PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=

View File

@ -32,4 +32,6 @@ SUI_URL=
TERRA_URL=
# protocols jobs
PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=

View File

@ -36,4 +36,6 @@ SUI_URL=
TERRA_URL=
# protocols jobs
PROTOCOLS_STATS_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
PROTOCOLS_ACTIVITY_VERSION=v1
# rpc provider json
RPC_PROVIDER_JSON=

View File

@ -0,0 +1,9 @@
---
# Kubernetes Secret holding the RPC provider configuration; the deployment in
# this change mounts key rpc-provider.json into the tx-tracker pod.
kind: Secret
apiVersion: v1
metadata:
  name: rpc-provider
  namespace: {{ .NAMESPACE }}
type: Opaque
data:
  # RPC_PROVIDER_JSON is supplied as plain JSON and base64-encoded here.
  rpc-provider.json: {{ .RPC_PROVIDER_JSON | b64enc }}

View File

@ -91,7 +91,7 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com
WORMCHAIN_BASE_URL=https://wormchain-rpc.quickapi.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://dimension-lcd.xpla.dev

View File

@ -91,7 +91,7 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com
WORMCHAIN_BASE_URL=https://wormchain-rpc.quickapi.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://dimension-lcd.xpla.dev

View File

@ -40,6 +40,9 @@ spec:
- name: {{ .NAME }}
image: {{ .IMAGE_NAME }}
imagePullPolicy: Always
volumeMounts:
- name: tx-tracker-config
mountPath: /opt/tx-tracker
readinessProbe:
initialDelaySeconds: 30
periodSeconds: 20
@ -91,187 +94,11 @@ spec:
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: ACALA_BASE_URL
value: {{ .ACALA_BASE_URL }}
- name: ACALA_REQUESTS_PER_MINUTE
value: "{{ .ACALA_REQUESTS_PER_MINUTE }}"
- name: ALGORAND_BASE_URL
value: {{ .ALGORAND_BASE_URL }}
- name: ALGORAND_REQUESTS_PER_MINUTE
value: "{{ .ALGORAND_REQUESTS_PER_MINUTE }}"
- name: APTOS_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: aptos-url
- name: APTOS_REQUESTS_PER_MINUTE
value: "{{ .APTOS_REQUESTS_PER_MINUTE }}"
- name: ARBITRUM_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: arbitrum-url
- name: ARBITRUM_REQUESTS_PER_MINUTE
value: "{{ .ARBITRUM_REQUESTS_PER_MINUTE }}"
- name: AVALANCHE_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: avalanche-url
- name: AVALANCHE_REQUESTS_PER_MINUTE
value: "{{ .AVALANCHE_REQUESTS_PER_MINUTE }}"
- name: BASE_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: base-url
- name: BASE_REQUESTS_PER_MINUTE
value: "{{ .BASE_REQUESTS_PER_MINUTE }}"
- name: BSC_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: bsc-url
- name: BSC_REQUESTS_PER_MINUTE
value: "{{ .BSC_REQUESTS_PER_MINUTE }}"
- name: CELO_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: celo-url
- name: CELO_REQUESTS_PER_MINUTE
value: "{{ .CELO_REQUESTS_PER_MINUTE }}"
- name: ETHEREUM_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: ethereum-url
- name: ETHEREUM_REQUESTS_PER_MINUTE
value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}"
- name: EVMOS_BASE_URL
value: {{ .EVMOS_BASE_URL }}
- name: EVMOS_REQUESTS_PER_MINUTE
value: "{{ .EVMOS_REQUESTS_PER_MINUTE }}"
- name: FANTOM_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: fantom-url
- name: FANTOM_REQUESTS_PER_MINUTE
value: "{{ .FANTOM_REQUESTS_PER_MINUTE }}"
- name: INJECTIVE_BASE_URL
value: {{ .INJECTIVE_BASE_URL }}
- name: INJECTIVE_REQUESTS_PER_MINUTE
value: "{{ .INJECTIVE_REQUESTS_PER_MINUTE }}"
- name: KARURA_BASE_URL
value: {{ .KARURA_BASE_URL }}
- name: KARURA_REQUESTS_PER_MINUTE
value: "{{ .KARURA_REQUESTS_PER_MINUTE }}"
- name: KLAYTN_BASE_URL
value: {{ .KLAYTN_BASE_URL }}
- name: KLAYTN_REQUESTS_PER_MINUTE
value: "{{ .KLAYTN_REQUESTS_PER_MINUTE }}"
- name: KUJIRA_BASE_URL
value: {{ .KUJIRA_BASE_URL }}
- name: KUJIRA_REQUESTS_PER_MINUTE
value: "{{ .KUJIRA_REQUESTS_PER_MINUTE }}"
- name: MOONBEAM_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: moonbeam-url
- name: MOONBEAM_REQUESTS_PER_MINUTE
value: "{{ .MOONBEAM_REQUESTS_PER_MINUTE }}"
- name: OASIS_BASE_URL
value: {{ .OASIS_BASE_URL }}
- name: OASIS_REQUESTS_PER_MINUTE
value: "{{ .OASIS_REQUESTS_PER_MINUTE }}"
- name: OPTIMISM_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: optimism-url
- name: OPTIMISM_REQUESTS_PER_MINUTE
value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
- name: OSMOSIS_BASE_URL
value: {{ .OSMOSIS_BASE_URL }}
- name: OSMOSIS_REQUESTS_PER_MINUTE
value: "{{ .OSMOSIS_REQUESTS_PER_MINUTE }}"
- name: POLYGON_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: polygon-url
- name: POLYGON_REQUESTS_PER_MINUTE
value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
- name: SEI_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: sei-url
- name: SEI_REQUESTS_PER_MINUTE
value: "{{ .SEI_REQUESTS_PER_MINUTE }}"
- name: SOLANA_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: solana-url
- name: SOLANA_REQUESTS_PER_MINUTE
value: "{{ .SOLANA_REQUESTS_PER_MINUTE }}"
- name: SUI_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: sui-url
- name: SUI_REQUESTS_PER_MINUTE
value: "{{ .SUI_REQUESTS_PER_MINUTE }}"
- name: TERRA_BASE_URL
value: {{ .TERRA_BASE_URL }}
- name: TERRA_REQUESTS_PER_MINUTE
value: "{{ .TERRA_REQUESTS_PER_MINUTE }}"
- name: TERRA2_BASE_URL
value: {{ .TERRA2_BASE_URL }}
- name: TERRA2_REQUESTS_PER_MINUTE
value: "{{ .TERRA2_REQUESTS_PER_MINUTE }}"
- name: WORMCHAIN_BASE_URL
value: {{ .WORMCHAIN_BASE_URL }}
- name: WORMCHAIN_REQUESTS_PER_MINUTE
value: "{{ .WORMCHAIN_REQUESTS_PER_MINUTE }}"
- name: XPLA_BASE_URL
value: {{ .XPLA_BASE_URL }}
- name: XPLA_REQUESTS_PER_MINUTE
value: "{{ .XPLA_REQUESTS_PER_MINUTE }}"
{{ if eq .P2P_NETWORK "testnet" }}
- name: ARBITRUM_SEPOLIA_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: arbitrum-sepolia-url
- name: ARBITRUM_SEPOLIA_REQUESTS_PER_MINUTE
value: "{{ .ARBITRUM_SEPOLIA_REQUESTS_PER_MINUTE }}"
- name: BASE_SEPOLIA_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: base-sepolia-url
- name: BASE_SEPOLIA_REQUESTS_PER_MINUTE
value: "{{ .BASE_SEPOLIA_REQUESTS_PER_MINUTE }}"
- name: ETHEREUM_SEPOLIA_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: ethereum-sepolia-url
- name: ETHEREUM_SEPOLIA_REQUESTS_PER_MINUTE
value: "{{ .ETHEREUM_SEPOLIA_REQUESTS_PER_MINUTE }}"
- name: OPTIMISM_SEPOLIA_BASE_URL
valueFrom:
secretKeyRef:
name: blockchain
key: optimism-sepolia-url
- name: OPTIMISM_SEPOLIA_REQUESTS_PER_MINUTE
value: "{{ .OPTIMISM_SEPOLIA_REQUESTS_PER_MINUTE }}"
{{ end }}
value: "{{ .METRICS_ENABLED }}"
- name: RPC_PROVIDER_PATH
value: "/opt/tx-tracker/rpc-provider.json"
- name: CONSUMER_WORKERS_SIZE
value: "1"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
@ -279,4 +106,11 @@ spec:
requests:
memory: {{ .RESOURCES_REQUESTS_MEMORY }}
cpu: {{ .RESOURCES_REQUESTS_CPU }}
volumes:
- name: tx-tracker-config
secret:
secretName: rpc-provider
items:
- key: rpc-provider.json
path: rpc-provider.json

View File

@ -4,7 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type algorandTransactionResponse struct {
@ -15,9 +19,42 @@ type algorandTransactionResponse struct {
} `json:"transaction"`
}
// FetchAlgorandTx fetches transaction details from the Algorand Indexer REST
// API, trying each RPC of the pool (sorted by availability and priority)
// until one returns the transaction.
func FetchAlgorandTx(
	ctx context.Context,
	pool *pool.Pool,
	txHash string,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*TxDetail, error) {

	// get rpc sorted by score and priority.
	rpcs := pool.GetItems()
	if len(rpcs) == 0 {
		return nil, ErrChainNotSupported
	}

	// Call the transaction endpoint of the Algorand Indexer REST API
	var txDetail *TxDetail
	var err error
	for _, rpc := range rpcs {
		// Honor the per-RPC rate limiter. A non-nil error means the context
		// was cancelled, so stop instead of hammering the remaining RPCs.
		if waitErr := rpc.Wait(ctx); waitErr != nil {
			return nil, waitErr
		}
		txDetail, err = fetchAlgorandTx(ctx, rpc.Id, txHash)
		if txDetail != nil {
			metrics.IncCallRpcSuccess(uint16(sdk.ChainIDAlgorand), rpc.Description)
			break
		}
		if err != nil {
			metrics.IncCallRpcError(uint16(sdk.ChainIDAlgorand), rpc.Description)
			logger.Debug("Failed to fetch transaction from Algorand indexer",
				zap.String("url", rpc.Id), zap.Error(err))
		}
	}
	return txDetail, err
}
func fetchAlgorandTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -27,7 +64,7 @@ func fetchAlgorandTx(
{
// Perform the HTTP request
url := fmt.Sprintf("%s/v2/transactions/%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, url)
body, err := httpGet(ctx, url)
if err != nil {
return nil, fmt.Errorf("HTTP request to Algorand transactions endpoint failed: %w", err)
}

View File

@ -5,7 +5,11 @@ import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
const (
@ -22,11 +26,12 @@ type aptosTx struct {
Hash string `json:"hash"`
}
func fetchAptosTx(
func FetchAptosTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
pool *pool.Pool,
txHash string,
metrics metrics.Metrics,
logger *zap.Logger,
) (*TxDetail, error) {
// Parse the Aptos event creation number
@ -35,28 +40,30 @@ func fetchAptosTx(
return nil, fmt.Errorf("failed to parse event creation number from Aptos tx hash: %w", err)
}
// get rpc sorted by score and priority.
rpcs := pool.GetItems()
if len(rpcs) == 0 {
return nil, ErrChainNotSupported
}
// Get the event from the Aptos node API.
var events []aptosEvent
{
// Build the URI for the events endpoint
uri := fmt.Sprintf("%s/v1/accounts/%s/events/%s::state::WormholeMessageHandle/event?start=%d&limit=1",
baseUrl,
aptosCoreContractAddress,
aptosCoreContractAddress,
creationNumber,
)
// Query the events endpoint
body, err := httpGet(ctx, rateLimiter, uri)
for _, rpc := range rpcs {
// Wait for the RPC rate limiter
rpc.Wait(ctx)
events, err = fetchAptosAccountEvents(ctx, rpc.Id, aptosCoreContractAddress, creationNumber, 1)
if err != nil {
return nil, fmt.Errorf("failed to query events endpoint: %w", err)
metrics.IncCallRpcError(uint16(sdk.ChainIDAptos), rpc.Description)
logger.Debug("Failed to fetch transaction from Aptos node", zap.String("url", rpc.Id), zap.Error(err))
continue
}
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDAptos), rpc.Description)
break
}
// Deserialize the response
err = json.Unmarshal(body, &events)
if err != nil {
return nil, fmt.Errorf("failed to parse response body from events endpoint: %w", err)
}
// Return an error if the event is not found
if err != nil {
return nil, err
}
if len(events) == 0 {
return nil, ErrTransactionNotFound
@ -64,23 +71,30 @@ func fetchAptosTx(
return nil, fmt.Errorf("expected exactly one event, but got %d", len(events))
}
// Get the transaction
var tx aptosTx
{
// Build the URI for the events endpoint
uri := fmt.Sprintf("%s/v1/transactions/by_version/%d", baseUrl, events[0].Version)
// get rpc sorted by score and priority.
rpcs = pool.GetItems()
if len(rpcs) == 0 {
return nil, ErrChainNotSupported
}
// Query the events endpoint
body, err := httpGet(ctx, rateLimiter, uri)
// Get the transaction from the Aptos node API.
var tx *aptosTx
for _, rpc := range rpcs {
// Wait for the RPC rate limiter
rpc.Wait(ctx)
tx, err = fetchAptosTx(ctx, rpc.Id, events[0].Version)
if err != nil {
return nil, fmt.Errorf("failed to query transactions endpoint: %w", err)
metrics.IncCallRpcError(uint16(sdk.ChainIDAptos), rpc.Description)
logger.Debug("Failed to fetch transaction from Aptos node", zap.String("url", rpc.Id), zap.Error(err))
continue
}
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDAptos), rpc.Description)
break
}
// Deserialize the response
err = json.Unmarshal(body, &tx)
if err != nil {
return nil, fmt.Errorf("failed to parse response body from transactions endpoint: %w", err)
}
// Return an error if the transaction is not found
if tx == nil {
return nil, ErrTransactionNotFound
}
// Build the result struct and return
@ -90,3 +104,51 @@ func fetchAptosTx(
}
return &TxDetail, nil
}
// fetchAptosAccountEvents queries the Aptos node API for the events of a
// given account, returning at most `limit` events starting at `start`.
func fetchAptosAccountEvents(ctx context.Context, baseUrl string, contractAddress string, start uint64, limit uint64) ([]aptosEvent, error) {
	// Build and query the events endpoint.
	endpoint := fmt.Sprintf("%s/v1/accounts/%s/events/%s::state::WormholeMessageHandle/event?start=%d&limit=%d",
		baseUrl,
		contractAddress,
		contractAddress,
		start,
		limit,
	)
	raw, err := httpGet(ctx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to query events endpoint: %w", err)
	}

	// Decode the JSON payload into the event list.
	var events []aptosEvent
	if err := json.Unmarshal(raw, &events); err != nil {
		return nil, fmt.Errorf("failed to parse response body from events endpoint: %w", err)
	}
	return events, nil
}
// fetchAptosTx queries the Aptos node API for the transaction details of a
// given ledger version.
func fetchAptosTx(ctx context.Context, baseUrl string, version uint64) (*aptosTx, error) {
	// Build and query the transactions-by-version endpoint.
	endpoint := fmt.Sprintf("%s/v1/transactions/by_version/%d", baseUrl, version)
	raw, err := httpGet(ctx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to query transactions endpoint: %w", err)
	}

	// Decode the JSON payload into the transaction struct.
	var tx aptosTx
	if err := json.Unmarshal(raw, &tx); err != nil {
		return nil, fmt.Errorf("failed to parse response body from transactions endpoint: %w", err)
	}
	return &tx, nil
}

View File

@ -5,7 +5,11 @@ import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
const (
@ -29,9 +33,44 @@ type cosmosTxsResponse struct {
} `json:"tx_response"`
}
func fetchCosmosTx(
// apiCosmos fetches transactions from Cosmos-SDK based chains; chainId
// identifies which chain the RPC-call metrics are reported against.
type apiCosmos struct {
	chainId sdk.ChainID
}
// FetchCosmosTx fetches transaction details from a Cosmos-SDK node, trying
// each RPC of the pool (sorted by availability and priority) until one
// succeeds.
func (c *apiCosmos) FetchCosmosTx(
	ctx context.Context,
	pool *pool.Pool,
	txHash string,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*TxDetail, error) {
	// get rpc sorted by score and priority.
	rpcs := pool.GetItems()
	if len(rpcs) == 0 {
		return nil, ErrChainNotSupported
	}

	var txDetail *TxDetail
	var err error
	for _, rpc := range rpcs {
		// Honor the per-RPC rate limiter. A non-nil error means the context
		// was cancelled, so stop instead of trying the remaining RPCs.
		if waitErr := rpc.Wait(ctx); waitErr != nil {
			return nil, waitErr
		}
		txDetail, err = c.fetchCosmosTx(ctx, rpc.Id, txHash)
		if err != nil {
			metrics.IncCallRpcError(uint16(c.chainId), rpc.Description)
			logger.Debug("Failed to fetch transaction from cosmos node",
				zap.String("url", rpc.Id), zap.Error(err))
			continue
		}
		metrics.IncCallRpcSuccess(uint16(c.chainId), rpc.Description)
		break
	}
	return txDetail, err
}
func (c *apiCosmos) fetchCosmosTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -41,7 +80,7 @@ func fetchCosmosTx(
{
// Perform the HTTP request
uri := fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, uri)
body, err := httpGet(ctx, uri)
if err != nil {
if strings.Contains(err.Error(), "404") {
return nil, ErrTransactionNotFound

View File

@ -4,7 +4,11 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type ethGetTransactionByHashResponse struct {
@ -14,14 +18,42 @@ type ethGetTransactionByHashResponse struct {
To string `json:"to"`
}
type ethGetBlockByHashResponse struct {
Timestamp string `json:"timestamp"`
Number string `json:"number"`
// apiEvm fetches transactions from EVM-compatible chains; chainId identifies
// which chain the RPC-call metrics are reported against.
type apiEvm struct {
	chainId sdk.ChainID
}
func fetchEthTx(
// FetchEvmTx fetches transaction details from an EVM node, trying each RPC
// of the pool (sorted by availability and priority) until one succeeds.
func (e *apiEvm) FetchEvmTx(
	ctx context.Context,
	pool *pool.Pool,
	txHash string,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*TxDetail, error) {
	// get rpc sorted by score and priority.
	rpcs := pool.GetItems()
	if len(rpcs) == 0 {
		return nil, ErrChainNotSupported
	}

	var txDetail *TxDetail
	var err error
	for _, rpc := range rpcs {
		// Honor the per-RPC rate limiter. A non-nil error means the context
		// was cancelled, so stop instead of trying the remaining RPCs.
		if waitErr := rpc.Wait(ctx); waitErr != nil {
			return nil, waitErr
		}
		txDetail, err = e.fetchEvmTx(ctx, rpc.Id, txHash)
		if err != nil {
			metrics.IncCallRpcError(uint16(e.chainId), rpc.Description)
			logger.Debug("Failed to fetch transaction from evm node",
				zap.String("url", rpc.Id), zap.Error(err))
			continue
		}
		metrics.IncCallRpcSuccess(uint16(e.chainId), rpc.Description)
		break
	}
	return txDetail, err
}
func (e *apiEvm) fetchEvmTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -37,7 +69,7 @@ func fetchEthTx(
// query transaction data
var txReply ethGetTransactionByHashResponse
{
err = client.CallContext(ctx, rateLimiter, &txReply, "eth_getTransactionByHash", nativeTxHash)
err = client.CallContext(ctx, &txReply, "eth_getTransactionByHash", nativeTxHash)
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}

View File

@ -2,9 +2,12 @@ package chains
import (
"context"
"time"
"errors"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type seiTx struct {
@ -30,31 +33,83 @@ func seiTxSearchExtractor(tx *cosmosTxSearchResponse, logs []cosmosLogWrapperRes
}
type apiSei struct {
wormchainUrl string
wormchainRateLimiter *time.Ticker
p2pNetwork string
p2pNetwork string
wormchainPool *pool.Pool
}
func fetchSeiDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*seiTx, error) {
func fetchSeiDetail(ctx context.Context, baseUrl string, sequence, timestamp, srcChannel, dstChannel string) (*seiTx, error) {
params := &cosmosTxSearchParams{Sequence: sequence, Timestamp: timestamp, SrcChannel: srcChannel, DstChannel: dstChannel}
return fetchTxSearch[seiTx](ctx, baseUrl, rateLimiter, params, seiTxSearchExtractor)
return fetchTxSearch[seiTx](ctx, baseUrl, params, seiTxSearchExtractor)
}
func (a *apiSei) fetchSeiTx(
func (a *apiSei) FetchSeiTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
pool *pool.Pool,
txHash string,
metrics metrics.Metrics,
logger *zap.Logger,
) (*TxDetail, error) {
txHash = txHashLowerCaseWith0x(txHash)
wormchainTx, err := fetchWormchainDetail(ctx, a.wormchainUrl, a.wormchainRateLimiter, txHash)
// Get the wormchain rpcs sorted by availability.
wormchainRpcs := a.wormchainPool.GetItems()
if len(wormchainRpcs) == 0 {
return nil, errors.New("wormchain rpc pool is empty")
}
// Fetch the wormchain transaction
var wormchainTx *wormchainTx
var err error
for _, rpc := range wormchainRpcs {
// wait for the rpc to be available
rpc.Wait(ctx)
wormchainTx, err = fetchWormchainDetail(ctx, rpc.Id, txHash)
if err != nil {
metrics.IncCallRpcError(uint16(vaa.ChainIDWormchain), rpc.Description)
logger.Debug("Failed to fetch transaction from wormchain", zap.String("url", rpc.Id), zap.Error(err))
continue
}
metrics.IncCallRpcSuccess(uint16(vaa.ChainIDWormchain), rpc.Description)
break
}
// If the transaction is not found, return an error
if err != nil {
return nil, err
}
seiTx, err := fetchSeiDetail(ctx, baseUrl, rateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
if wormchainTx == nil {
return nil, ErrTransactionNotFound
}
// Get the sei rpcs sorted by availability.
seiRpcs := pool.GetItems()
if len(seiRpcs) == 0 {
return nil, errors.New("sei rpc pool is empty")
}
// Fetch the sei transaction
var seiTx *seiTx
for _, rpc := range seiRpcs {
// wait for the rpc to be available
rpc.Wait(ctx)
seiTx, err = fetchSeiDetail(ctx, rpc.Id, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
if err != nil {
metrics.IncCallRpcError(uint16(vaa.ChainIDSei), rpc.Description)
logger.Debug("Failed to fetch transaction from sei", zap.String("url", rpc.Id), zap.Error(err))
continue
}
metrics.IncCallRpcSuccess(uint16(vaa.ChainIDSei), rpc.Description)
break
}
// If the transaction is not found, return an error
if err != nil {
return nil, err
}
if seiTx == nil {
return nil, ErrTransactionNotFound
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,

View File

@ -7,7 +7,11 @@ import (
"time"
"github.com/mr-tron/base58"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type solanaTransactionSignature struct {
@ -55,9 +59,41 @@ type apiSolana struct {
timestamp *time.Time
}
// FetchSolanaTx fetches transaction details from a Solana node, trying each
// RPC of the pool (sorted by availability and priority) until one returns
// the transaction.
func (a *apiSolana) FetchSolanaTx(
	ctx context.Context,
	pool *pool.Pool,
	txHash string,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*TxDetail, error) {

	// get rpc sorted by score and priority.
	rpcs := pool.GetItems()
	if len(rpcs) == 0 {
		return nil, ErrChainNotSupported
	}

	// Get the transaction from the Solana node API.
	var txDetail *TxDetail
	var err error
	for _, rpc := range rpcs {
		// Honor the per-RPC rate limiter. A non-nil error means the context
		// was cancelled, so stop instead of trying the remaining RPCs.
		if waitErr := rpc.Wait(ctx); waitErr != nil {
			return nil, waitErr
		}
		txDetail, err = a.fetchSolanaTx(ctx, rpc.Id, txHash)
		if txDetail != nil {
			metrics.IncCallRpcSuccess(uint16(sdk.ChainIDSolana), rpc.Description)
			break
		}
		if err != nil {
			metrics.IncCallRpcError(uint16(sdk.ChainIDSolana), rpc.Description)
			logger.Debug("Failed to fetch transaction from Solana node",
				zap.String("url", rpc.Id), zap.Error(err))
		}
	}
	return txDetail, err
}
func (a *apiSolana) fetchSolanaTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -86,7 +122,7 @@ func (a *apiSolana) fetchSolanaTx(
if isNotNativeTxHash {
// Get transaction signatures for the given account
{
err = client.CallContext(ctx, rateLimiter, &sigs, "getSignaturesForAddress", base58.Encode(h))
err = client.CallContext(ctx, &sigs, "getSignaturesForAddress", base58.Encode(h))
if err != nil {
return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err)
}
@ -114,7 +150,7 @@ func (a *apiSolana) fetchSolanaTx(
// Fetch the portal token bridge transaction
var response solanaGetTransactionResponse
{
err = client.CallContext(ctx, rateLimiter, &response, "getTransaction", nativeTxHash,
err = client.CallContext(ctx, &response, "getTransaction", nativeTxHash,
getTransactionConfig{
Encoding: "jsonParsed",
MaxSupportedTransactionVersion: 0,

View File

@ -4,9 +4,11 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"go.uber.org/zap"
)
type suiGetTransactionBlockResponse struct {
@ -28,9 +30,37 @@ type suiGetTransactionBlockOpts struct {
ShowBalanceChanges bool `json:"showBalanceChanges"`
}
// FetchSuiTx fetches transaction details from a SUI node, trying each RPC of
// the pool (sorted by availability and priority) until one succeeds.
//
// NOTE(review): unlike every other chain in this change, this function never
// calls metrics.IncCallRpcSuccess/IncCallRpcError even though it receives a
// metrics parameter — confirm whether that omission is intentional.
func FetchSuiTx(
	ctx context.Context,
	pool *pool.Pool,
	txHash string,
	metrics metrics.Metrics,
	logger *zap.Logger,
) (*TxDetail, error) {

	// get rpc sorted by score and priority.
	rpcs := pool.GetItems()
	if len(rpcs) == 0 {
		return nil, ErrChainNotSupported
	}

	var txDetail *TxDetail
	var err error
	for _, rpc := range rpcs {
		// Honor the per-RPC rate limiter. A non-nil error means the context
		// was cancelled, so stop instead of trying the remaining RPCs.
		if waitErr := rpc.Wait(ctx); waitErr != nil {
			return nil, waitErr
		}
		txDetail, err = fetchSuiTx(ctx, rpc.Id, txHash)
		if err != nil {
			logger.Debug("Failed to fetch transaction from SUI node",
				zap.String("url", rpc.Id), zap.Error(err))
			continue
		}
		return txDetail, nil
	}
	return txDetail, err
}
func fetchSuiTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
@ -45,11 +75,6 @@ func fetchSuiTx(
// Query transaction data
var reply suiGetTransactionBlockResponse
{
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
// Execute the remote procedure call
opts := suiGetTransactionBlockOpts{ShowInput: true}
err = client.CallContext(ctx, &reply, "sui_getTransactionBlock", txHash, opts)

View File

@ -3,22 +3,22 @@ package chains
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type apiWormchain struct {
osmosisUrl string
osmosisRateLimiter *time.Ticker
kujiraUrl string
kujiraRateLimiter *time.Ticker
evmosUrl string
evmosRateLimiter *time.Ticker
p2pNetwork string
p2pNetwork string
evmosPool *pool.Pool
kujiraPool *pool.Pool
osmosisPool *pool.Pool
}
type wormchainTxDetail struct {
@ -66,13 +66,13 @@ type logWrapper struct {
Events []event `json:"events"`
}
type worchainTx struct {
type wormchainTx struct {
srcChannel, dstChannel, sender, receiver, timestamp, sequence string
}
func fetchWormchainDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, txHash string) (*worchainTx, error) {
func fetchWormchainDetail(ctx context.Context, baseUrl string, txHash string) (*wormchainTx, error) {
uri := fmt.Sprintf("%s/tx?hash=%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, uri)
body, err := httpGet(ctx, uri)
if err != nil {
return nil, err
}
@ -121,7 +121,7 @@ func fetchWormchainDetail(ctx context.Context, baseUrl string, rateLimiter *time
}
}
}
return &worchainTx{
return &wormchainTx{
srcChannel: srcChannel,
dstChannel: dstChannel,
sender: sender,
@ -177,7 +177,32 @@ type osmosisTx struct {
txHash string
}
func fetchOsmosisDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*osmosisTx, error) {
func (a *apiWormchain) fetchOsmosisDetail(ctx context.Context, pool *pool.Pool, sequence, timestamp, srcChannel, dstChannel string, metrics metrics.Metrics) (*osmosisTx, error) {
if pool == nil {
return nil, fmt.Errorf("osmosis rpc pool not found")
}
osmosisRpcs := pool.GetItems()
if len(osmosisRpcs) == 0 {
return nil, fmt.Errorf("osmosis rpcs not found")
}
for _, rpc := range osmosisRpcs {
rpc.Wait(ctx)
osmosisTx, err := fetchOsmosisDetail(ctx, rpc.Id, sequence, timestamp, srcChannel, dstChannel)
if osmosisTx != nil {
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDOsmosis), rpc.Description)
return osmosisTx, nil
}
if err != nil {
metrics.IncCallRpcError(uint16(sdk.ChainIDOsmosis), rpc.Description)
}
}
return nil, fmt.Errorf("osmosis tx not found")
}
func fetchOsmosisDetail(ctx context.Context, baseUrl string, sequence, timestamp, srcChannel, dstChannel string) (*osmosisTx, error) {
queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel)
q := osmosisRequest{
@ -193,7 +218,7 @@ func fetchOsmosisDetail(ctx context.Context, baseUrl string, rateLimiter *time.T
},
}
response, err := httpPost(ctx, rateLimiter, baseUrl, q)
response, err := httpPost(ctx, baseUrl, q)
if err != nil {
return nil, err
}
@ -255,7 +280,31 @@ type evmosTx struct {
txHash string
}
func fetchEvmosDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*evmosTx, error) {
func (a *apiWormchain) fetchEvmosDetail(ctx context.Context, pool *pool.Pool, sequence, timestamp, srcChannel, dstChannel string, metrics metrics.Metrics) (*evmosTx, error) {
if pool == nil {
return nil, fmt.Errorf("evmos rpc pool not found")
}
evmosRpcs := pool.GetItems()
if len(evmosRpcs) == 0 {
return nil, fmt.Errorf("evmos rpcs not found")
}
for _, rpc := range evmosRpcs {
rpc.Wait(ctx)
evmosTx, err := fetchEvmosDetail(ctx, rpc.Id, sequence, timestamp, srcChannel, dstChannel)
if evmosTx != nil {
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDEvmos), rpc.Description)
return evmosTx, nil
}
if err != nil {
metrics.IncCallRpcError(uint16(sdk.ChainIDEvmos), rpc.Description)
}
}
return nil, fmt.Errorf("evmos tx not found")
}
func fetchEvmosDetail(ctx context.Context, baseUrl string, sequence, timestamp, srcChannel, dstChannel string) (*evmosTx, error) {
queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel)
q := evmosRequest{
@ -271,7 +320,8 @@ func fetchEvmosDetail(ctx context.Context, baseUrl string, rateLimiter *time.Tic
},
}
response, err := httpPost(ctx, rateLimiter, baseUrl, q)
//response, err := httpPost(ctx, rateLimiter, baseUrl, q)
response, err := httpPost(ctx, baseUrl, q)
if err != nil {
return nil, err
}
@ -333,7 +383,29 @@ type kujiraTx struct {
txHash string
}
func fetchKujiraDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*kujiraTx, error) {
func (a *apiWormchain) fetchKujiraDetail(ctx context.Context, pool *pool.Pool, sequence, timestamp, srcChannel, dstChannel string, metrics metrics.Metrics) (*kujiraTx, error) {
if pool == nil {
return nil, fmt.Errorf("kujira rpc pool not found")
}
kujiraRpcs := pool.GetItems()
if len(kujiraRpcs) == 0 {
return nil, fmt.Errorf("kujira rpcs not found")
}
for _, rpc := range kujiraRpcs {
rpc.Wait(ctx)
kujiraTx, err := fetchKujiraDetail(ctx, rpc.Id, sequence, timestamp, srcChannel, dstChannel)
if kujiraTx != nil {
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDKujira), rpc.Description)
return kujiraTx, nil
}
if err != nil {
metrics.IncCallRpcError(uint16(sdk.ChainIDKujira), rpc.Description)
}
}
return nil, fmt.Errorf("kujira tx not found")
}
func fetchKujiraDetail(ctx context.Context, baseUrl string, sequence, timestamp, srcChannel, dstChannel string) (*kujiraTx, error) {
queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel)
q := kujiraRequest{
@ -349,7 +421,7 @@ func fetchKujiraDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ti
},
}
response, err := httpPost(ctx, rateLimiter, baseUrl, q)
response, err := httpPost(ctx, baseUrl, q)
if err != nil {
return nil, err
}
@ -372,26 +444,51 @@ type WorchainAttributeTxDetail struct {
OriginAddress string `bson:"originAddress"`
}
func (a *apiWormchain) fetchWormchainTx(
func (a *apiWormchain) FetchWormchainTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
wormchainPool *pool.Pool,
txHash string,
metrics metrics.Metrics,
logger *zap.Logger,
) (*TxDetail, error) {
txHash = txHashLowerCaseWith0x(txHash)
wormchainTx, err := fetchWormchainDetail(ctx, baseUrl, rateLimiter, txHash)
// Get the wormchain rpcs sorted by availability.
wormchainRpcs := wormchainPool.GetItems()
if len(wormchainRpcs) == 0 {
return nil, errors.New("wormchain rpc pool is empty")
}
var wormchainTx *wormchainTx
var err error
for _, rpc := range wormchainRpcs {
// wait for the rpc to be available
rpc.Wait(ctx)
wormchainTx, err = fetchWormchainDetail(ctx, rpc.Id, txHash)
if err != nil {
metrics.IncCallRpcError(uint16(sdk.ChainIDWormchain), rpc.Description)
logger.Debug("Failed to fetch transaction from wormchain", zap.String("url", rpc.Id), zap.Error(err))
continue
}
metrics.IncCallRpcSuccess(uint16(sdk.ChainIDWormchain), rpc.Description)
break
}
if err != nil {
return nil, err
}
if wormchainTx == nil {
return nil, errors.New("failed to fetch wormchain transaction details")
}
// Verify if this transaction is from osmosis by wormchain
if a.isOsmosisTx(wormchainTx) {
osmosisTx, err := fetchOsmosisDetail(ctx, a.osmosisUrl, a.osmosisRateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
osmosisTx, err := a.fetchOsmosisDetail(ctx, a.osmosisPool, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel, metrics)
if err != nil {
return nil, err
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
@ -408,10 +505,11 @@ func (a *apiWormchain) fetchWormchainTx(
// Verify if this transaction is from kujira by wormchain
if a.isKujiraTx(wormchainTx) {
kujiraTx, err := fetchKujiraDetail(ctx, a.kujiraUrl, a.kujiraRateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
kujiraTx, err := a.fetchKujiraDetail(ctx, a.kujiraPool, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel, metrics)
if err != nil {
return nil, err
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
@ -428,10 +526,11 @@ func (a *apiWormchain) fetchWormchainTx(
// Verify if this transaction is from evmos by wormchain
if a.isEvmosTx(wormchainTx) {
evmosTx, err := fetchEvmosDetail(ctx, a.evmosUrl, a.evmosRateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
evmosTx, err := a.fetchEvmosDetail(ctx, a.evmosPool, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel, metrics)
if err != nil {
return nil, err
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
@ -452,7 +551,7 @@ func (a *apiWormchain) fetchWormchainTx(
}, nil
}
func (a *apiWormchain) isOsmosisTx(tx *worchainTx) bool {
func (a *apiWormchain) isOsmosisTx(tx *wormchainTx) bool {
if a.p2pNetwork == domain.P2pMainNet {
return tx.srcChannel == "channel-2186" && tx.dstChannel == "channel-3"
}
@ -462,7 +561,7 @@ func (a *apiWormchain) isOsmosisTx(tx *worchainTx) bool {
return false
}
func (a *apiWormchain) isKujiraTx(tx *worchainTx) bool {
func (a *apiWormchain) isKujiraTx(tx *wormchainTx) bool {
if a.p2pNetwork == domain.P2pMainNet {
return tx.srcChannel == "channel-113" && tx.dstChannel == "channel-9"
}
@ -473,7 +572,7 @@ func (a *apiWormchain) isKujiraTx(tx *worchainTx) bool {
return false
}
func (a *apiWormchain) isEvmosTx(tx *worchainTx) bool {
func (a *apiWormchain) isEvmosTx(tx *wormchainTx) bool {
if a.p2pNetwork == domain.P2pMainNet {
return tx.srcChannel == "channel-94" && tx.dstChannel == "channel-5"
}

View File

@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"math"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
var (
@ -16,15 +17,6 @@ var (
ErrTransactionNotFound = errors.New("transaction not found")
)
var (
// rateLimitersByChain maps a chain ID to the request rate limiter for that chain.
rateLimitersByChain map[sdk.ChainID]*time.Ticker
// baseUrlsByChain maps a chain ID to the base URL of the RPC/API service for that chain.
baseUrlsByChain map[sdk.ChainID]string
)
type WormchainTxDetail struct {
}
type TxDetail struct {
// From is the address that signed the transaction, encoded in the chain's native format.
From string
@ -39,115 +31,38 @@ type AttributeTxDetail struct {
Value any
}
func Initialize(cfg *config.RpcProviderSettings, testnetConfig *config.TestnetRpcProviderSettings) {
// convertToRateLimiter converts "requests per minute" into the associated *time.Ticker
convertToRateLimiter := func(requestsPerMinute uint16) *time.Ticker {
division := float64(time.Minute) / float64(time.Duration(requestsPerMinute))
roundedUp := math.Ceil(division)
duration := time.Duration(roundedUp)
return time.NewTicker(duration)
}
// Initialize rate limiters for each chain
rateLimitersByChain = make(map[sdk.ChainID]*time.Ticker)
rateLimitersByChain[sdk.ChainIDAcala] = convertToRateLimiter(cfg.AcalaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDArbitrum] = convertToRateLimiter(cfg.ArbitrumRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAlgorand] = convertToRateLimiter(cfg.AlgorandRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAptos] = convertToRateLimiter(cfg.AptosRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDAvalanche] = convertToRateLimiter(cfg.AvalancheRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDBase] = convertToRateLimiter(cfg.BaseRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDBSC] = convertToRateLimiter(cfg.BscRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDCelo] = convertToRateLimiter(cfg.CeloRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDEthereum] = convertToRateLimiter(cfg.EthereumRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDFantom] = convertToRateLimiter(cfg.FantomRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDInjective] = convertToRateLimiter(cfg.InjectiveRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDKarura] = convertToRateLimiter(cfg.KaruraRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDKlaytn] = convertToRateLimiter(cfg.KlaytnRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDMoonbeam] = convertToRateLimiter(cfg.MoonbeamRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOasis] = convertToRateLimiter(cfg.OasisRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOptimism] = convertToRateLimiter(cfg.OptimismRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDPolygon] = convertToRateLimiter(cfg.PolygonRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSolana] = convertToRateLimiter(cfg.SolanaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDTerra] = convertToRateLimiter(cfg.TerraRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDTerra2] = convertToRateLimiter(cfg.Terra2RequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSui] = convertToRateLimiter(cfg.SuiRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDWormchain] = convertToRateLimiter(cfg.WormchainRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOsmosis] = convertToRateLimiter(cfg.OsmosisRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSei] = convertToRateLimiter(cfg.SeiRequestsPerMinute)
// Initialize the RPC base URLs for each chain
baseUrlsByChain = make(map[sdk.ChainID]string)
baseUrlsByChain[sdk.ChainIDAcala] = cfg.AcalaBaseUrl
baseUrlsByChain[sdk.ChainIDArbitrum] = cfg.ArbitrumBaseUrl
baseUrlsByChain[sdk.ChainIDAlgorand] = cfg.AlgorandBaseUrl
baseUrlsByChain[sdk.ChainIDAptos] = cfg.AptosBaseUrl
baseUrlsByChain[sdk.ChainIDAvalanche] = cfg.AvalancheBaseUrl
baseUrlsByChain[sdk.ChainIDBase] = cfg.BaseBaseUrl
baseUrlsByChain[sdk.ChainIDBSC] = cfg.BscBaseUrl
baseUrlsByChain[sdk.ChainIDCelo] = cfg.CeloBaseUrl
baseUrlsByChain[sdk.ChainIDEthereum] = cfg.EthereumBaseUrl
baseUrlsByChain[sdk.ChainIDFantom] = cfg.FantomBaseUrl
baseUrlsByChain[sdk.ChainIDInjective] = cfg.InjectiveBaseUrl
baseUrlsByChain[sdk.ChainIDKarura] = cfg.KaruraBaseUrl
baseUrlsByChain[sdk.ChainIDKlaytn] = cfg.KlaytnBaseUrl
baseUrlsByChain[sdk.ChainIDMoonbeam] = cfg.MoonbeamBaseUrl
baseUrlsByChain[sdk.ChainIDOasis] = cfg.OasisBaseUrl
baseUrlsByChain[sdk.ChainIDOptimism] = cfg.OptimismBaseUrl
baseUrlsByChain[sdk.ChainIDPolygon] = cfg.PolygonBaseUrl
baseUrlsByChain[sdk.ChainIDSolana] = cfg.SolanaBaseUrl
baseUrlsByChain[sdk.ChainIDTerra] = cfg.TerraBaseUrl
baseUrlsByChain[sdk.ChainIDTerra2] = cfg.Terra2BaseUrl
baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl
baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl
baseUrlsByChain[sdk.ChainIDWormchain] = cfg.WormchainBaseUrl
baseUrlsByChain[sdk.ChainIDSei] = cfg.SeiBaseUrl
if testnetConfig != nil {
rateLimitersByChain[sdk.ChainIDArbitrumSepolia] = convertToRateLimiter(testnetConfig.ArbitrumSepoliaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDBaseSepolia] = convertToRateLimiter(testnetConfig.BaseSepoliaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSepolia] = convertToRateLimiter(testnetConfig.EthereumSepoliaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDOptimismSepolia] = convertToRateLimiter(testnetConfig.OptimismSepoliaRequestsPerMinute)
baseUrlsByChain[sdk.ChainIDArbitrumSepolia] = testnetConfig.ArbitrumSepoliaBaseUrl
baseUrlsByChain[sdk.ChainIDBaseSepolia] = testnetConfig.BaseSepoliaBaseUrl
baseUrlsByChain[sdk.ChainIDSepolia] = testnetConfig.EthereumSepoliaBaseUrl
baseUrlsByChain[sdk.ChainIDOptimismSepolia] = testnetConfig.OptimismSepoliaBaseUrl
}
}
func FetchTx(
ctx context.Context,
cfg *config.RpcProviderSettings,
rpcPool map[sdk.ChainID]*pool.Pool,
chainId sdk.ChainID,
txHash string,
timestamp *time.Time,
p2pNetwork string,
m metrics.Metrics,
logger *zap.Logger,
) (*TxDetail, error) {
// Decide which RPC/API service to use based on chain ID
var fetchFunc func(ctx context.Context, rateLimiter *time.Ticker, baseUrl string, txHash string) (*TxDetail, error)
var fetchFunc func(ctx context.Context, pool *pool.Pool, txHash string, metrics metrics.Metrics, logger *zap.Logger) (*TxDetail, error)
switch chainId {
case sdk.ChainIDSolana:
apiSolana := &apiSolana{
timestamp: timestamp,
}
fetchFunc = apiSolana.fetchSolanaTx
fetchFunc = apiSolana.FetchSolanaTx
case sdk.ChainIDAlgorand:
fetchFunc = fetchAlgorandTx
fetchFunc = FetchAlgorandTx
case sdk.ChainIDAptos:
fetchFunc = fetchAptosTx
fetchFunc = FetchAptosTx
case sdk.ChainIDSui:
fetchFunc = fetchSuiTx
fetchFunc = FetchSuiTx
case sdk.ChainIDInjective,
sdk.ChainIDTerra,
sdk.ChainIDTerra2,
sdk.ChainIDXpla:
fetchFunc = fetchCosmosTx
apiCosmos := &apiCosmos{
chainId: chainId,
}
fetchFunc = apiCosmos.FetchCosmosTx
case sdk.ChainIDAcala,
sdk.ChainIDArbitrum,
sdk.ChainIDArbitrumSepolia,
@ -166,50 +81,34 @@ func FetchTx(
sdk.ChainIDOptimism,
sdk.ChainIDOptimismSepolia,
sdk.ChainIDPolygon:
fetchFunc = fetchEthTx
apiEvm := &apiEvm{
chainId: chainId,
}
fetchFunc = apiEvm.FetchEvmTx
case sdk.ChainIDWormchain:
rateLimiter, ok := rateLimitersByChain[sdk.ChainIDOsmosis]
if !ok {
return nil, errors.New("found no rate limiter for chain osmosis")
}
apiWormchain := &apiWormchain{
osmosisUrl: cfg.OsmosisBaseUrl,
osmosisRateLimiter: rateLimiter,
evmosUrl: cfg.EvmosBaseUrl,
evmosRateLimiter: rateLimiter,
kujiraUrl: cfg.KujiraBaseUrl,
kujiraRateLimiter: rateLimiter,
p2pNetwork: p2pNetwork,
p2pNetwork: p2pNetwork,
evmosPool: rpcPool[sdk.ChainIDEvmos],
kujiraPool: rpcPool[sdk.ChainIDKujira],
osmosisPool: rpcPool[sdk.ChainIDOsmosis],
}
fetchFunc = apiWormchain.fetchWormchainTx
fetchFunc = apiWormchain.FetchWormchainTx
case sdk.ChainIDSei:
rateLimiter, ok := rateLimitersByChain[sdk.ChainIDWormchain]
if !ok {
return nil, errors.New("found no rate limiter for chain osmosis")
}
apiSei := &apiSei{
wormchainRateLimiter: rateLimiter,
wormchainUrl: cfg.WormchainBaseUrl,
p2pNetwork: p2pNetwork,
p2pNetwork: p2pNetwork,
wormchainPool: rpcPool[sdk.ChainIDWormchain],
}
fetchFunc = apiSei.fetchSeiTx
fetchFunc = apiSei.FetchSeiTx
default:
return nil, ErrChainNotSupported
}
// Get the rate limiter and base URL for the given chain ID
rateLimiter, ok := rateLimitersByChain[chainId]
pool, ok := rpcPool[chainId]
if !ok {
return nil, fmt.Errorf("found no rate limiter for chain %s", chainId.String())
}
baseUrl, ok := baseUrlsByChain[chainId]
if !ok {
return nil, fmt.Errorf("found no base URL for chain %s", chainId.String())
return nil, fmt.Errorf("not found rpc pool for chain %s", chainId.String())
}
// Get transaction details from the RPC/API service
txDetail, err := fetchFunc(ctx, rateLimiter, baseUrl, txHash)
txDetail, err := fetchFunc(ctx, pool, txHash, m, logger)
if err != nil {
return nil, fmt.Errorf("failed to retrieve tx information: %w", err)
}

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"
)
type cosmosRequest struct {
@ -69,7 +68,7 @@ type cosmosLogWrapperResponse struct {
type txSearchExtractor[T any] func(tx *cosmosTxSearchResponse, log []cosmosLogWrapperResponse) (T, error)
func fetchTxSearch[T any](ctx context.Context, baseUrl string, rl *time.Ticker, p *cosmosTxSearchParams, extractor txSearchExtractor[*T]) (*T, error) {
func fetchTxSearch[T any](ctx context.Context, baseUrl string, p *cosmosTxSearchParams, extractor txSearchExtractor[*T]) (*T, error) {
queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
query := fmt.Sprintf(queryTemplate, p.Sequence, p.Timestamp, p.SrcChannel, p.DstChannel)
q := cosmosRequest{
@ -84,7 +83,7 @@ func fetchTxSearch[T any](ctx context.Context, baseUrl string, rl *time.Ticker,
Page: "1",
},
}
response, err := httpPost(ctx, rl, baseUrl, q)
response, err := httpPost(ctx, baseUrl, q)
if err != nil {
return nil, err
}

View File

@ -7,38 +7,14 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum/rpc"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// timestampFromHex converts a hex timestamp into a `time.Time` value.
func timestampFromHex(s string) (time.Time, error) {
// remove the leading "0x" or "0X" from the hex string
hexDigits := strings.Replace(s, "0x", "", 1)
hexDigits = strings.Replace(hexDigits, "0X", "", 1)
// parse the hex digits into an integer
epoch, err := strconv.ParseInt(hexDigits, 16, 64)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse hex timestamp: %w", err)
}
// convert the unix epoch into a `time.Time` value
timestamp := time.Unix(epoch, 0).UTC()
return timestamp, nil
}
// httpGet is a helper function that performs an HTTP request.
func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte, error) {
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
func httpGet(ctx context.Context, url string) ([]byte, error) {
// Build the HTTP request
request, err := http.NewRequestWithContext(ctx, "GET", url, nil)
@ -66,12 +42,8 @@ func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte,
}
// httpPost is a helper function that performs an HTTP request.
func httpPost(ctx context.Context, rateLimiter *time.Ticker, url string, body any) ([]byte, error) {
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
// func httpPost(ctx context.Context, rateLimiter *time.Ticker, url string, body any) ([]byte, error) {
func httpPost(ctx context.Context, url string, body any) ([]byte, error) {
b, err := json.Marshal(body)
if err != nil {
@ -104,16 +76,6 @@ func httpPost(ctx context.Context, rateLimiter *time.Ticker, url string, body an
return result, nil
}
func waitForRateLimiter(ctx context.Context, t *time.Ticker) bool {
select {
case <-t.C:
return true
case <-ctx.Done():
return false
}
}
// rateLimitedRpcClient is a wrapper around `rpc.Client` that adds rate limits
type rateLimitedRpcClient struct {
client *rpc.Client
}
@ -133,16 +95,10 @@ func rpcDialContext(ctx context.Context, url string) (*rateLimitedRpcClient, err
func (c *rateLimitedRpcClient) CallContext(
ctx context.Context,
rateLimiter *time.Ticker,
result interface{},
method string,
args ...interface{},
) error {
if !waitForRateLimiter(ctx, rateLimiter) {
return ctx.Err()
}
return c.client.CallContext(ctx, result, method, args...)
}
@ -156,3 +112,31 @@ func txHashLowerCaseWith0x(v string) string {
}
return "0x" + strings.ToLower(v)
}
func FormatTxHashByChain(chainId sdk.ChainID, txHash string) string {
switch chainId {
case sdk.ChainIDAcala,
sdk.ChainIDArbitrum,
sdk.ChainIDArbitrumSepolia,
sdk.ChainIDAvalanche,
sdk.ChainIDBase,
sdk.ChainIDBaseSepolia,
sdk.ChainIDBSC,
sdk.ChainIDCelo,
sdk.ChainIDEthereum,
sdk.ChainIDSepolia,
sdk.ChainIDFantom,
sdk.ChainIDKarura,
sdk.ChainIDKlaytn,
sdk.ChainIDMoonbeam,
sdk.ChainIDOasis,
sdk.ChainIDOptimism,
sdk.ChainIDOptimismSepolia,
sdk.ChainIDPolygon:
return txHashLowerCaseWith0x(txHash)
case sdk.ChainIDSei, sdk.ChainIDWormchain:
return txHashLowerCaseWith0x(txHash)
default:
return txHash
}
}

View File

@ -2,6 +2,7 @@ package backfiller
import (
"context"
"errors"
"fmt"
"log"
"os"
@ -15,7 +16,8 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
@ -101,22 +103,17 @@ func run(getStrategyCallbacksFunc getStrategyCallbacksFunc) {
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
// Load config
cfg, err := config.LoadFromEnv[config.BackfillerSettings]()
cfg, err := config.NewBackfillerSettings()
if err != nil {
log.Fatal("Failed to load config: ", err)
}
var testRpcConfig *config.TestnetRpcProviderSettings
if configuration.IsTestnet(cfg.P2pNetwork) {
testRpcConfig, err = config.LoadFromEnv[config.TestnetRpcProviderSettings]()
if err != nil {
log.Fatal("Error loading testnet rpc config: ", err)
}
// create rpc pool
rpcPool, err := newRpcPool(cfg)
if err != nil {
log.Fatal("Failed to initialize rpc pool: ", zap.Error(err))
}
// Initialize rate limiters
chains.Initialize(&cfg.RpcProviderSettings, testRpcConfig)
// Initialize logger
rootLogger := logger.New("backfiller", logger.WithLevel(cfg.LogLevel))
mainLogger := makeLogger(rootLogger, "main")
@ -175,14 +172,14 @@ func run(getStrategyCallbacksFunc getStrategyCallbacksFunc) {
for i := uint(0); i < cfg.NumWorkers; i++ {
name := fmt.Sprintf("worker-%d", i)
p := consumerParams{
logger: makeLogger(rootLogger, name),
rpcProviderSettings: &cfg.RpcProviderSettings,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
p2pNetwork: cfg.P2pNetwork,
logger: makeLogger(rootLogger, name),
rpcPool: rpcPool,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
p2pNetwork: cfg.P2pNetwork,
}
go consume(rootCtx, &p)
}
@ -258,14 +255,14 @@ func produce(ctx context.Context, params *producerParams) {
// consumerParams contains the parameters for the consumer goroutine.
type consumerParams struct {
logger *zap.Logger
rpcProviderSettings *config.RpcProviderSettings
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
p2pNetwork string
logger *zap.Logger
rpcPool map[sdk.ChainID]*pool.Pool
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
p2pNetwork string
}
// consume reads VAA IDs from a channel, processes them, and updates the database accordingly.
@ -331,7 +328,7 @@ func consume(ctx context.Context, params *consumerParams) {
Overwrite: true, // Overwrite old contents
Metrics: metrics,
}
_, err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p, params.p2pNetwork)
_, err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcPool, params.repository, &p, params.p2pNetwork)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),
@ -358,3 +355,69 @@ func consume(ctx context.Context, params *consumerParams) {
}
}
func newRpcPool(cfg *config.BackfillerSettings) (map[sdk.ChainID]*pool.Pool, error) {
var rpcConfigMap map[sdk.ChainID][]config.RpcConfig
var err error
if cfg.RpcProviderSettingsJson != nil {
rpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
if err != nil {
return nil, err
}
} else if cfg.RpcProviderSettings != nil {
// get rpc settings map
rpcConfigMap, err = cfg.RpcProviderSettings.ToMap()
if err != nil {
return nil, err
}
var testRpcConfig *config.TestnetRpcProviderSettings
if configuration.IsTestnet(cfg.P2pNetwork) {
testRpcConfig, err = config.LoadFromEnv[config.TestnetRpcProviderSettings]()
if err != nil {
log.Fatal("Error loading testnet rpc config: ", err)
}
}
// get rpc testnet settings map
var rpcTestnetMap map[sdk.ChainID][]config.RpcConfig
if testRpcConfig != nil {
rpcTestnetMap, err = cfg.TestnetRpcProviderSettings.ToMap()
if err != nil {
return nil, err
}
}
// merge rpc testnet settings to rpc settings map
if len(rpcTestnetMap) > 0 {
for chainID, rpcConfig := range rpcTestnetMap {
rpcConfigMap[chainID] = append(rpcConfigMap[chainID], rpcConfig...)
}
}
} else {
return nil, errors.New("rpc provider settings not found")
}
domains := []string{".network", ".cloud", ".com", ".io", ".build", ".team", ".dev", ".zone", ".org", ".net", ".in"}
// convert rpc settings map to rpc pool
convertFn := func(rpcConfig []config.RpcConfig) []pool.Config {
poolConfigs := make([]pool.Config, 0, len(rpcConfig))
for _, rpc := range rpcConfig {
poolConfigs = append(poolConfigs, pool.Config{
Id: rpc.Url,
Priority: rpc.Priority,
Description: utils.FindSubstringBeforeDomains(rpc.Url, domains),
RequestsPerMinute: rpc.RequestsPerMinute,
})
}
return poolConfigs
}
// create rpc pool
rpcPool := make(map[sdk.ChainID]*pool.Pool)
for chainID, rpcConfig := range rpcConfigMap {
rpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
}
return rpcPool, nil
}

View File

@ -1,49 +0,0 @@
package fetchone
import (
"context"
"log"
"os"
"strconv"
"time"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func Run() {
// validate commandline arguments
if len(os.Args) != 5 {
log.Fatalf("Usage: ./%s <chain name> <tx hash> <block time> <p2p network>\n", os.Args[0])
}
// load config settings
cfg, err := config.LoadFromEnv[config.RpcProviderSettings]()
if err != nil {
log.Fatalf("Failed to load credentials from environment: %v", err)
}
// get chain ID from args
chainId, err := vaa.ChainIDFromString(os.Args[1])
if err != nil {
log.Fatalf("Failed to convert chain name to chain ID: %v", err)
}
blockTime, err := strconv.ParseInt(os.Args[3], 10, 64)
if err != nil {
log.Fatalf("Failed to convert block time to int64: %v", err)
}
timestamp := time.Unix(blockTime, 0)
// fetch tx data
chains.Initialize(cfg, nil)
txDetail, err := chains.FetchTx(context.Background(), cfg, chainId, os.Args[2], &timestamp, os.Args[4])
if err != nil {
log.Fatalf("Failed to get transaction data: %v", err)
}
// print tx details
log.Printf("tx detail: %+v", txDetail)
}

View File

@ -2,6 +2,7 @@ package service
import (
"context"
"errors"
"log"
"os"
"os/signal"
@ -18,13 +19,15 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -32,19 +35,11 @@ func Run() {
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
// load config
cfg, err := config.LoadFromEnv[config.ServiceSettings]()
cfg, err := config.New()
if err != nil {
log.Fatal("Error loading config: ", err)
}
var testRpcConfig *config.TestnetRpcProviderSettings
if configuration.IsTestnet(cfg.P2pNetwork) {
testRpcConfig, err = config.LoadFromEnv[config.TestnetRpcProviderSettings]()
if err != nil {
log.Fatal("Error loading testnet rpc config: ", err)
}
}
// initialize metrics
metrics := newMetrics(cfg)
@ -53,8 +48,12 @@ func Run() {
logger.Info("Starting wormhole-explorer-tx-tracker ...")
// initialize rate limiters
chains.Initialize(&cfg.RpcProviderSettings, testRpcConfig)
// create rpc pool
// TODO: review: RpcProviderSettings
rpcPool, err := newRpcPool(cfg)
if err != nil {
logger.Fatal("Failed to initialize rpc pool: ", zap.Error(err))
}
// initialize the database client
db, err := dbutil.Connect(rootCtx, logger, cfg.MongodbUri, cfg.MongodbDatabase, false)
@ -67,7 +66,7 @@ func Run() {
vaaRepository := vaa.NewRepository(db.Database, logger)
// create controller
vaaController := vaa.NewController(vaaRepository, repository, &cfg.RpcProviderSettings, cfg.P2pNetwork, logger)
vaaController := vaa.NewController(rpcPool, vaaRepository, repository, cfg.P2pNetwork, logger)
// start serving /health and /ready endpoints
healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
@ -79,12 +78,12 @@ func Run() {
// create and start a pipeline consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger)
vaaConsumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics, cfg.P2pNetwork)
vaaConsumer := consumer.New(vaaConsumeFunc, rpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
vaaConsumer.Start(rootCtx)
// create and start a notification consumer.
notificationConsumeFunc := newNotificationConsumeFunc(rootCtx, cfg, metrics, logger)
notificationConsumer := consumer.New(notificationConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics, cfg.P2pNetwork)
notificationConsumer := consumer.New(notificationConsumeFunc, rpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
notificationConsumer.Start(rootCtx)
logger.Info("Started wormhole-explorer-tx-tracker")
@ -217,3 +216,69 @@ func newMetrics(cfg *config.ServiceSettings) metrics.Metrics {
}
return metrics.NewPrometheusMetrics(cfg.Environment)
}
// newRpcPool creates one rpc pool per chain from the service settings.
// Rpc providers can be configured either via a JSON file (RpcProviderSettingsJson)
// or via environment variables (RpcProviderSettings); the JSON file takes precedence.
// Returns an error (instead of exiting) so the caller decides how to abort.
func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, error) {
	var rpcConfigMap map[sdk.ChainID][]config.RpcConfig
	var err error
	if cfg.RpcProviderSettingsJson != nil {
		rpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
		if err != nil {
			return nil, err
		}
	} else if cfg.RpcProviderSettings != nil {
		// get rpc settings map from the environment-based settings.
		rpcConfigMap, err = cfg.RpcProviderSettings.ToMap()
		if err != nil {
			return nil, err
		}
		// on testnet, load the testnet-only rpc providers and merge them in.
		if configuration.IsTestnet(cfg.P2pNetwork) {
			testRpcConfig, err := config.LoadFromEnv[config.TestnetRpcProviderSettings]()
			if err != nil {
				// propagate instead of log.Fatal: this function already returns an error.
				return nil, err
			}
			// NOTE: use the freshly loaded settings. cfg.TestnetRpcProviderSettings is
			// never populated by config.New, so calling ToMap on that nil embedded
			// pointer would panic (value receiver dereferences the pointer).
			rpcTestnetMap, err := testRpcConfig.ToMap()
			if err != nil {
				return nil, err
			}
			// merge testnet rpc settings into the rpc settings map.
			for chainID, rpcConfig := range rpcTestnetMap {
				rpcConfigMap[chainID] = append(rpcConfigMap[chainID], rpcConfig...)
			}
		}
	} else {
		return nil, errors.New("rpc provider settings not found")
	}

	// domains used to derive a short human-readable provider description from the url.
	domains := []string{".network", ".cloud", ".com", ".io", ".build", ".team", ".dev", ".zone", ".org", ".net", ".in"}

	// convertFn maps a chain's rpc settings to pool configuration entries.
	convertFn := func(rpcConfig []config.RpcConfig) []pool.Config {
		poolConfigs := make([]pool.Config, 0, len(rpcConfig))
		for _, rpc := range rpcConfig {
			poolConfigs = append(poolConfigs, pool.Config{
				Id:                rpc.Url,
				Priority:          rpc.Priority,
				Description:       utils.FindSubstringBeforeDomains(rpc.Url, domains),
				RequestsPerMinute: rpc.RequestsPerMinute,
			})
		}
		return poolConfigs
	}

	// create one rpc pool per chain.
	rpcPool := make(map[sdk.ChainID]*pool.Pool, len(rpcConfigMap))
	for chainID, rpcConfig := range rpcConfigMap {
		rpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
	}
	return rpcPool, nil
}

View File

@ -1,10 +1,17 @@
package config
import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type BackfillingStrategy string
@ -19,10 +26,11 @@ const (
)
type BackfillerSettings struct {
LogLevel string `split_words:"true" default:"INFO"`
NumWorkers uint `split_words:"true" required:"true"`
BulkSize uint `split_words:"true" required:"true"`
P2pNetwork string `split_words:"true" required:"true"`
LogLevel string `split_words:"true" default:"INFO"`
NumWorkers uint `split_words:"true" required:"true"`
BulkSize uint `split_words:"true" required:"true"`
P2pNetwork string `split_words:"true" required:"true"`
RpcProviderPath string `split_words:"true" required:"false"`
// Strategy determines which VAAs will be affected by the backfiller.
Strategy struct {
@ -32,20 +40,42 @@ type BackfillerSettings struct {
}
MongodbSettings
RpcProviderSettings
*RpcProviderSettings `required:"false"`
*TestnetRpcProviderSettings `required:"false"`
*RpcProviderSettingsJson `required:"false"`
}
type ServiceSettings struct {
// MonitoringPort defines the TCP port for the /health and /ready endpoints.
MonitoringPort string `split_words:"true" default:"8000"`
Environment string `split_words:"true" required:"true"`
LogLevel string `split_words:"true" default:"INFO"`
PprofEnabled bool `split_words:"true" default:"false"`
MetricsEnabled bool `split_words:"true" default:"false"`
P2pNetwork string `split_words:"true" required:"true"`
MonitoringPort string `split_words:"true" default:"8000"`
Environment string `split_words:"true" required:"true"`
LogLevel string `split_words:"true" default:"INFO"`
PprofEnabled bool `split_words:"true" default:"false"`
MetricsEnabled bool `split_words:"true" default:"false"`
P2pNetwork string `split_words:"true" required:"true"`
RpcProviderPath string `split_words:"true" required:"false"`
ConsumerWorkersSize int `split_words:"true" default:"10"`
AwsSettings
MongodbSettings
RpcProviderSettings
*RpcProviderSettings `required:"false"`
*TestnetRpcProviderSettings `required:"false"`
*RpcProviderSettingsJson `required:"false"`
}
// RpcProviderSettingsJson is the root document of the rpc provider JSON file
// referenced by the RPC_PROVIDER_PATH environment variable.
type RpcProviderSettingsJson struct {
	RpcProviders []ChainRpcProviderSettings `json:"rpcProviders"`
}

// ChainRpcProviderSettings holds the rpc providers configured for one chain.
type ChainRpcProviderSettings struct {
	// ChainId is the wormhole chain id; ToMap keys the result by this value.
	ChainId uint16 `json:"chainId"`
	// Chain is the chain name (informational; ToMap does not use it for lookup).
	Chain       string        `json:"chain"`
	RpcSettings []RpcSettings `json:"rpcs"`
}

// RpcSettings describes a single rpc endpoint: its url, rate limit and priority.
type RpcSettings struct {
	Url              string `json:"url"`
	RequestPerMinute uint16 `json:"requestPerMinute"`
	Priority         uint8  `json:"priority"`
}
type AwsSettings struct {
@ -63,75 +93,212 @@ type MongodbSettings struct {
}
type RpcProviderSettings struct {
AcalaBaseUrl string `split_words:"true" required:"true"`
AcalaRequestsPerMinute uint16 `split_words:"true" required:"true"`
AlgorandBaseUrl string `split_words:"true" required:"true"`
AlgorandRequestsPerMinute uint16 `split_words:"true" required:"true"`
AptosBaseUrl string `split_words:"true" required:"true"`
AptosRequestsPerMinute uint16 `split_words:"true" required:"true"`
ArbitrumBaseUrl string `split_words:"true" required:"true"`
ArbitrumRequestsPerMinute uint16 `split_words:"true" required:"true"`
AvalancheBaseUrl string `split_words:"true" required:"true"`
AvalancheRequestsPerMinute uint16 `split_words:"true" required:"true"`
BaseBaseUrl string `split_words:"true" required:"true"`
BaseRequestsPerMinute uint16 `split_words:"true" required:"true"`
BscBaseUrl string `split_words:"true" required:"true"`
BscRequestsPerMinute uint16 `split_words:"true" required:"true"`
CeloBaseUrl string `split_words:"true" required:"true"`
CeloRequestsPerMinute uint16 `split_words:"true" required:"true"`
EthereumBaseUrl string `split_words:"true" required:"true"`
EthereumRequestsPerMinute uint16 `split_words:"true" required:"true"`
EvmosBaseUrl string `split_words:"true" required:"true"`
EvmosRequestsPerMinute uint16 `split_words:"true" required:"true"`
FantomBaseUrl string `split_words:"true" required:"true"`
FantomRequestsPerMinute uint16 `split_words:"true" required:"true"`
InjectiveBaseUrl string `split_words:"true" required:"true"`
InjectiveRequestsPerMinute uint16 `split_words:"true" required:"true"`
KaruraBaseUrl string `split_words:"true" required:"true"`
KaruraRequestsPerMinute uint16 `split_words:"true" required:"true"`
KlaytnBaseUrl string `split_words:"true" required:"true"`
KlaytnRequestsPerMinute uint16 `split_words:"true" required:"true"`
KujiraBaseUrl string `split_words:"true" required:"true"`
KujiraRequestsPerMinute uint16 `split_words:"true" required:"true"`
MoonbeamBaseUrl string `split_words:"true" required:"true"`
MoonbeamRequestsPerMinute uint16 `split_words:"true" required:"true"`
OasisBaseUrl string `split_words:"true" required:"true"`
OasisRequestsPerMinute uint16 `split_words:"true" required:"true"`
OptimismBaseUrl string `split_words:"true" required:"true"`
OptimismRequestsPerMinute uint16 `split_words:"true" required:"true"`
OsmosisBaseUrl string `split_words:"true" required:"true"`
OsmosisRequestsPerMinute uint16 `split_words:"true" required:"true"`
PolygonBaseUrl string `split_words:"true" required:"true"`
PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"`
SeiBaseUrl string `split_words:"true" required:"true"`
SeiRequestsPerMinute uint16 `split_words:"true" required:"true"`
SolanaBaseUrl string `split_words:"true" required:"true"`
SolanaRequestsPerMinute uint16 `split_words:"true" required:"true"`
SuiBaseUrl string `split_words:"true" required:"true"`
SuiRequestsPerMinute uint16 `split_words:"true" required:"true"`
TerraBaseUrl string `split_words:"true" required:"true"`
TerraRequestsPerMinute uint16 `split_words:"true" required:"true"`
Terra2BaseUrl string `split_words:"true" required:"true"`
Terra2RequestsPerMinute uint16 `split_words:"true" required:"true"`
XplaBaseUrl string `split_words:"true" required:"true"`
XplaRequestsPerMinute uint16 `split_words:"true" required:"true"`
WormchainBaseUrl string `split_words:"true" required:"true"`
WormchainRequestsPerMinute uint16 `split_words:"true" required:"true"`
AcalaBaseUrl string `split_words:"true" required:"false"`
AcalaRequestsPerMinute uint16 `split_words:"true" required:"false"`
AcalaFallbackUrls string `split_words:"true" required:"false"`
AcalaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
AlgorandBaseUrl string `split_words:"true" required:"false"`
AlgorandRequestsPerMinute uint16 `split_words:"true" required:"false"`
AlgorandFallbackUrls string `split_words:"true" required:"false"`
AlgorandFallbackRequestsPerMinute string `split_words:"true" required:"false"`
AptosBaseUrl string `split_words:"true" required:"false"`
AptosRequestsPerMinute uint16 `split_words:"true" required:"false"`
AptosFallbackUrls string `split_words:"true" required:"false"`
AptosFallbackRequestsPerMinute string `split_words:"true" required:"false"`
ArbitrumBaseUrl string `split_words:"true" required:"false"`
ArbitrumRequestsPerMinute uint16 `split_words:"true" required:"false"`
ArbitrumFallbackUrls string `split_words:"true" required:"false"`
ArbitrumFallbackRequestsPerMinute string `split_words:"true" required:"false"`
AvalancheBaseUrl string `split_words:"true" required:"false"`
AvalancheRequestsPerMinute uint16 `split_words:"true" required:"false"`
AvalancheFallbackUrls string `split_words:"true" required:"false"`
AvalancheFallbackRequestsPerMinute string `split_words:"true" required:"false"`
BaseBaseUrl string `split_words:"true" required:"false"`
BaseRequestsPerMinute uint16 `split_words:"true" required:"false"`
BaseFallbackUrls string `split_words:"true" required:"false"`
BaseFallbackRequestsPerMinute string `split_words:"true" required:"false"`
BscBaseUrl string `split_words:"true" required:"false"`
BscRequestsPerMinute uint16 `split_words:"true" required:"false"`
BscFallbackUrls string `split_words:"true" required:"false"`
BscFallbackRequestsPerMinute string `split_words:"true" required:"false"`
CeloBaseUrl string `split_words:"true" required:"false"`
CeloRequestsPerMinute uint16 `split_words:"true" required:"false"`
CeloFallbackUrls string `split_words:"true" required:"false"`
CeloFallbackRequestsPerMinute string `split_words:"true" required:"false"`
EthereumBaseUrl string `split_words:"true" required:"false"`
EthereumRequestsPerMinute uint16 `split_words:"true" required:"false"`
EthereumFallbackUrls string `split_words:"true" required:"false"`
EthereumFallbackRequestsPerMinute string `split_words:"true" required:"false"`
EvmosBaseUrl string `split_words:"true" required:"false"`
EvmosRequestsPerMinute uint16 `split_words:"true" required:"false"`
EvmosFallbackUrls string `split_words:"true" required:"false"`
EvmosFallbackRequestsPerMinute string `split_words:"true" required:"false"`
FantomBaseUrl string `split_words:"true" required:"false"`
FantomRequestsPerMinute uint16 `split_words:"true" required:"false"`
FantomFallbackUrls string `split_words:"true" required:"false"`
FantomFallbackRequestsPerMinute string `split_words:"true" required:"false"`
InjectiveBaseUrl string `split_words:"true" required:"false"`
InjectiveRequestsPerMinute uint16 `split_words:"true" required:"false"`
InjectiveFallbackUrls string `split_words:"true" required:"false"`
InjectiveFallbackRequestsPerMinute string `split_words:"true" required:"false"`
KaruraBaseUrl string `split_words:"true" required:"false"`
KaruraRequestsPerMinute uint16 `split_words:"true" required:"false"`
KaruraFallbackUrls string `split_words:"true" required:"false"`
KaruraFallbackRequestsPerMinute string `split_words:"true" required:"false"`
KlaytnBaseUrl string `split_words:"true" required:"false"`
KlaytnRequestsPerMinute uint16 `split_words:"true" required:"false"`
KlaytnFallbackUrls string `split_words:"true" required:"false"`
KlaytnFallbackRequestsPerMinute string `split_words:"true" required:"false"`
KujiraBaseUrl string `split_words:"true" required:"false"`
KujiraRequestsPerMinute uint16 `split_words:"true" required:"false"`
KujiraFallbackUrls string `split_words:"true" required:"false"`
KujiraFallbackRequestsPerMinute string `split_words:"true" required:"false"`
MoonbeamBaseUrl string `split_words:"true" required:"false"`
MoonbeamRequestsPerMinute uint16 `split_words:"true" required:"false"`
MoonbeamFallbackUrls string `split_words:"true" required:"false"`
MoonbeamFallbackRequestsPerMinute string `split_words:"true" required:"false"`
OasisBaseUrl string `split_words:"true" required:"false"`
OasisRequestsPerMinute uint16 `split_words:"true" required:"false"`
OasisFallbackUrls string `split_words:"true" required:"false"`
OasisFallbackRequestsPerMinute string `split_words:"true" required:"false"`
OptimismBaseUrl string `split_words:"true" required:"false"`
OptimismRequestsPerMinute uint16 `split_words:"true" required:"false"`
OptimismFallbackUrls string `split_words:"true" required:"false"`
OptimismFallbackRequestsPerMinute string `split_words:"true" required:"false"`
OsmosisBaseUrl string `split_words:"true" required:"false"`
OsmosisRequestsPerMinute uint16 `split_words:"true" required:"false"`
OsmosisFallbackUrls string `split_words:"true" required:"false"`
OsmosisFallbackRequestsPerMinute string `split_words:"true" required:"false"`
PolygonBaseUrl string `split_words:"true" required:"false"`
PolygonRequestsPerMinute uint16 `split_words:"true" required:"false"`
PolygonFallbackUrls string `split_words:"true" required:"false"`
PolygonFallbackRequestsPerMinute string `split_words:"true" required:"false"`
SeiBaseUrl string `split_words:"true" required:"false"`
SeiRequestsPerMinute uint16 `split_words:"true" required:"false"`
SeiFallbackUrls string `split_words:"true" required:"false"`
SeiFallbackRequestsPerMinute string `split_words:"true" required:"false"`
SolanaBaseUrl string `split_words:"true" required:"false"`
SolanaRequestsPerMinute uint16 `split_words:"true" required:"false"`
SolanaFallbackUrls string `split_words:"true" required:"false"`
SolanaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
SuiBaseUrl string `split_words:"true" required:"false"`
SuiRequestsPerMinute uint16 `split_words:"true" required:"false"`
SuiFallbackUrls string `split_words:"true" required:"false"`
SuiFallbackRequestsPerMinute string `split_words:"true" required:"false"`
TerraBaseUrl string `split_words:"true" required:"false"`
TerraRequestsPerMinute uint16 `split_words:"true" required:"false"`
TerraFallbackUrls string `split_words:"true" required:"false"`
TerraFallbackRequestsPerMinute string `split_words:"true" required:"false"`
Terra2BaseUrl string `split_words:"true" required:"false"`
Terra2RequestsPerMinute uint16 `split_words:"true" required:"false"`
Terra2FallbackUrls string `split_words:"true" required:"false"`
Terra2FallbackRequestsPerMinute string `split_words:"true" required:"false"`
XplaBaseUrl string `split_words:"true" required:"false"`
XplaRequestsPerMinute uint16 `split_words:"true" required:"false"`
XplaFallbackUrls string `split_words:"true" required:"false"`
XplaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
WormchainBaseUrl string `split_words:"true" required:"false"`
WormchainRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainFallbackUrls string `split_words:"true" required:"false"`
WormchainFallbackRequestsPerMinute string `split_words:"true" required:"false"`
}
type TestnetRpcProviderSettings struct {
ArbitrumSepoliaBaseUrl string `split_words:"true" required:"true"`
ArbitrumSepoliaRequestsPerMinute uint16 `split_words:"true" required:"true"`
BaseSepoliaBaseUrl string `split_words:"true" required:"true"`
BaseSepoliaRequestsPerMinute uint16 `split_words:"true" required:"true"`
EthereumSepoliaBaseUrl string `split_words:"true" required:"true"`
EthereumSepoliaRequestsPerMinute uint16 `split_words:"true" required:"true"`
OptimismSepoliaBaseUrl string `split_words:"true" required:"true"`
OptimismSepoliaRequestsPerMinute uint16 `split_words:"true" required:"true"`
ArbitrumSepoliaBaseUrl string `split_words:"true" required:"false"`
ArbitrumSepoliaRequestsPerMinute uint16 `split_words:"true" required:"false"`
ArbitrumSepoliaFallbackUrls string `split_words:"true" required:"false"`
ArbitrumSepoliaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
BaseSepoliaBaseUrl string `split_words:"true" required:"false"`
BaseSepoliaRequestsPerMinute uint16 `split_words:"true" required:"false"`
BaseSepoliaFallbackUrls string `split_words:"true" required:"false"`
BaseSepoliaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
EthereumSepoliaBaseUrl string `split_words:"true" required:"false"`
EthereumSepoliaRequestsPerMinute uint16 `split_words:"true" required:"false"`
EthereumSepoliaFallbackUrls string `split_words:"true" required:"false"`
EthereumSepoliaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
OptimismSepoliaBaseUrl string `split_words:"true" required:"false"`
OptimismSepoliaRequestsPerMinute uint16 `split_words:"true" required:"false"`
OptimismSepoliaFallbackUrls string `split_words:"true" required:"false"`
OptimismSepoliaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
}
// NewBackfillerSettings loads the backfiller configuration from the environment
// (optionally from a .env file). When RPC_PROVIDER_PATH is set, rpc providers are
// read from that JSON file; otherwise they are read from environment variables.
// Exactly one of RpcProviderSettingsJson / RpcProviderSettings is left non-nil,
// consistent with New for the service settings.
func NewBackfillerSettings() (*BackfillerSettings, error) {
	_ = godotenv.Load()

	var settings BackfillerSettings
	if err := envconfig.Process("", &settings); err != nil {
		return nil, fmt.Errorf("failed to read config from environment: %w", err)
	}

	if settings.RpcProviderPath != "" {
		// rpc providers come from the configured JSON file.
		rpcJsonFile, err := os.ReadFile(settings.RpcProviderPath)
		if err != nil {
			return nil, fmt.Errorf("failed to read rpc provider settings from file: %w", err)
		}
		var rpcProviderSettingsJson RpcProviderSettingsJson
		if err := json.Unmarshal(rpcJsonFile, &rpcProviderSettingsJson); err != nil {
			return nil, fmt.Errorf("failed to unmarshal rpc provider settings from file: %w", err)
		}
		settings.RpcProviderSettingsJson = &rpcProviderSettingsJson
		// keep a single active source of rpc settings (mirrors New).
		settings.RpcProviderSettings = nil
	} else {
		// rpc providers come from environment variables.
		rpcProviderSettings, err := LoadFromEnv[RpcProviderSettings]()
		if err != nil {
			return nil, err
		}
		settings.RpcProviderSettings = rpcProviderSettings
		settings.RpcProviderSettingsJson = nil
	}
	return &settings, nil
}
// MapRpcProviderToRpcConfig converts the backfiller rpc provider settings to a map
// of RpcConfig keyed by chain id. The JSON file settings take precedence over the
// environment-based settings.
func (s *BackfillerSettings) MapRpcProviderToRpcConfig() (map[sdk.ChainID][]RpcConfig, error) {
	if s.RpcProviderSettingsJson != nil {
		return s.RpcProviderSettingsJson.ToMap()
	}
	if s.RpcProviderSettings == nil {
		// guard: ToMap has a value receiver, so calling it through a nil pointer
		// would panic when neither settings source was configured.
		return nil, errors.New("rpc provider settings not found")
	}
	return s.RpcProviderSettings.ToMap()
}
// New loads the service configuration from the environment (optionally from a
// .env file). When RPC_PROVIDER_PATH is set, rpc providers are read from that
// JSON file and the environment-based rpc settings are disabled; otherwise rpc
// providers are read from environment variables and the JSON settings are disabled.
func New() (*ServiceSettings, error) {
	_ = godotenv.Load()

	var settings ServiceSettings
	if err := envconfig.Process("", &settings); err != nil {
		return nil, fmt.Errorf("failed to read config from environment: %w", err)
	}

	if settings.RpcProviderPath == "" {
		// no JSON file configured: fall back to environment-based rpc settings.
		rpcSettings, err := LoadFromEnv[RpcProviderSettings]()
		if err != nil {
			return nil, err
		}
		settings.RpcProviderSettings = rpcSettings
		settings.RpcProviderSettingsJson = nil
		return &settings, nil
	}

	// load rpc providers from the configured JSON file.
	content, err := os.ReadFile(settings.RpcProviderPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read rpc provider settings from file: %w", err)
	}
	var jsonSettings RpcProviderSettingsJson
	if err := json.Unmarshal(content, &jsonSettings); err != nil {
		return nil, fmt.Errorf("failed to unmarshal rpc provider settings from file: %w", err)
	}
	settings.RpcProviderSettingsJson = &jsonSettings
	settings.RpcProviderSettings = nil
	return &settings, nil
}
func LoadFromEnv[T any]() (*T, error) {
_ = godotenv.Load()
var settings T
@ -143,3 +310,432 @@ func LoadFromEnv[T any]() (*T, error) {
return &settings, nil
}
// RpcConfig defines the configuration for a single RPC provider
type RpcConfig struct {
	// Url is the rpc endpoint.
	Url string
	// Priority ranks the provider within a chain's pool; addRpcConfig assigns
	// 1 to the primary rpc and 2 to every fallback rpc.
	Priority uint8
	// RequestsPerMinute is the rate limit applied to this provider.
	RequestsPerMinute uint16
}
// MapRpcProviderToRpcConfig converts the service rpc provider settings to a map
// of RpcConfig keyed by chain id. The JSON file settings take precedence over the
// environment-based settings.
func (s *ServiceSettings) MapRpcProviderToRpcConfig() (map[sdk.ChainID][]RpcConfig, error) {
	if s.RpcProviderSettingsJson != nil {
		return s.RpcProviderSettingsJson.ToMap()
	}
	if s.RpcProviderSettings == nil {
		// guard: ToMap has a value receiver, so calling it through a nil pointer
		// would panic when neither settings source was configured.
		return nil, errors.New("rpc provider settings not found")
	}
	return s.RpcProviderSettings.ToMap()
}
// ToMap converts the RpcProviderSettingsJson to a map of RpcConfig keyed by chain id.
func (r RpcProviderSettingsJson) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
	rpcs := make(map[sdk.ChainID][]RpcConfig, len(r.RpcProviders))
	for _, provider := range r.RpcProviders {
		// a provider with no rpc settings maps to a nil slice (append semantics).
		var configs []RpcConfig
		for _, rpc := range provider.RpcSettings {
			configs = append(configs, RpcConfig{
				Url:               rpc.Url,
				Priority:          rpc.Priority,
				RequestsPerMinute: rpc.RequestPerMinute,
			})
		}
		rpcs[sdk.ChainID(provider.ChainId)] = configs
	}
	return rpcs, nil
}
// ToMap converts the RpcProviderSettings to a map of RpcConfig keyed by chain id.
// Each chain's settings (primary url/rate limit plus optional fallbacks) are
// normalized through addRpcConfig; the first chain that fails aborts the conversion.
func (r RpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
	// chainSettings pairs a chain id with its raw env-derived rpc settings, so the
	// 27 identical conversion stanzas collapse into one table-driven loop.
	type chainSettings struct {
		chainID                   sdk.ChainID
		baseUrl                   string
		requestsPerMinute         uint16
		fallbackUrls              string
		fallbackRequestsPerMinute string
	}
	chains := []chainSettings{
		{sdk.ChainIDAcala, r.AcalaBaseUrl, r.AcalaRequestsPerMinute, r.AcalaFallbackUrls, r.AcalaFallbackRequestsPerMinute},
		{sdk.ChainIDAlgorand, r.AlgorandBaseUrl, r.AlgorandRequestsPerMinute, r.AlgorandFallbackUrls, r.AlgorandFallbackRequestsPerMinute},
		{sdk.ChainIDAptos, r.AptosBaseUrl, r.AptosRequestsPerMinute, r.AptosFallbackUrls, r.AptosFallbackRequestsPerMinute},
		{sdk.ChainIDArbitrum, r.ArbitrumBaseUrl, r.ArbitrumRequestsPerMinute, r.ArbitrumFallbackUrls, r.ArbitrumFallbackRequestsPerMinute},
		{sdk.ChainIDAvalanche, r.AvalancheBaseUrl, r.AvalancheRequestsPerMinute, r.AvalancheFallbackUrls, r.AvalancheFallbackRequestsPerMinute},
		{sdk.ChainIDBase, r.BaseBaseUrl, r.BaseRequestsPerMinute, r.BaseFallbackUrls, r.BaseFallbackRequestsPerMinute},
		{sdk.ChainIDBSC, r.BscBaseUrl, r.BscRequestsPerMinute, r.BscFallbackUrls, r.BscFallbackRequestsPerMinute},
		{sdk.ChainIDCelo, r.CeloBaseUrl, r.CeloRequestsPerMinute, r.CeloFallbackUrls, r.CeloFallbackRequestsPerMinute},
		{sdk.ChainIDEthereum, r.EthereumBaseUrl, r.EthereumRequestsPerMinute, r.EthereumFallbackUrls, r.EthereumFallbackRequestsPerMinute},
		{sdk.ChainIDEvmos, r.EvmosBaseUrl, r.EvmosRequestsPerMinute, r.EvmosFallbackUrls, r.EvmosFallbackRequestsPerMinute},
		{sdk.ChainIDFantom, r.FantomBaseUrl, r.FantomRequestsPerMinute, r.FantomFallbackUrls, r.FantomFallbackRequestsPerMinute},
		{sdk.ChainIDInjective, r.InjectiveBaseUrl, r.InjectiveRequestsPerMinute, r.InjectiveFallbackUrls, r.InjectiveFallbackRequestsPerMinute},
		{sdk.ChainIDKarura, r.KaruraBaseUrl, r.KaruraRequestsPerMinute, r.KaruraFallbackUrls, r.KaruraFallbackRequestsPerMinute},
		{sdk.ChainIDKlaytn, r.KlaytnBaseUrl, r.KlaytnRequestsPerMinute, r.KlaytnFallbackUrls, r.KlaytnFallbackRequestsPerMinute},
		{sdk.ChainIDKujira, r.KujiraBaseUrl, r.KujiraRequestsPerMinute, r.KujiraFallbackUrls, r.KujiraFallbackRequestsPerMinute},
		{sdk.ChainIDMoonbeam, r.MoonbeamBaseUrl, r.MoonbeamRequestsPerMinute, r.MoonbeamFallbackUrls, r.MoonbeamFallbackRequestsPerMinute},
		{sdk.ChainIDOasis, r.OasisBaseUrl, r.OasisRequestsPerMinute, r.OasisFallbackUrls, r.OasisFallbackRequestsPerMinute},
		{sdk.ChainIDOptimism, r.OptimismBaseUrl, r.OptimismRequestsPerMinute, r.OptimismFallbackUrls, r.OptimismFallbackRequestsPerMinute},
		{sdk.ChainIDOsmosis, r.OsmosisBaseUrl, r.OsmosisRequestsPerMinute, r.OsmosisFallbackUrls, r.OsmosisFallbackRequestsPerMinute},
		{sdk.ChainIDPolygon, r.PolygonBaseUrl, r.PolygonRequestsPerMinute, r.PolygonFallbackUrls, r.PolygonFallbackRequestsPerMinute},
		{sdk.ChainIDSei, r.SeiBaseUrl, r.SeiRequestsPerMinute, r.SeiFallbackUrls, r.SeiFallbackRequestsPerMinute},
		{sdk.ChainIDSolana, r.SolanaBaseUrl, r.SolanaRequestsPerMinute, r.SolanaFallbackUrls, r.SolanaFallbackRequestsPerMinute},
		{sdk.ChainIDSui, r.SuiBaseUrl, r.SuiRequestsPerMinute, r.SuiFallbackUrls, r.SuiFallbackRequestsPerMinute},
		{sdk.ChainIDTerra, r.TerraBaseUrl, r.TerraRequestsPerMinute, r.TerraFallbackUrls, r.TerraFallbackRequestsPerMinute},
		{sdk.ChainIDTerra2, r.Terra2BaseUrl, r.Terra2RequestsPerMinute, r.Terra2FallbackUrls, r.Terra2FallbackRequestsPerMinute},
		{sdk.ChainIDXpla, r.XplaBaseUrl, r.XplaRequestsPerMinute, r.XplaFallbackUrls, r.XplaFallbackRequestsPerMinute},
		{sdk.ChainIDWormchain, r.WormchainBaseUrl, r.WormchainRequestsPerMinute, r.WormchainFallbackUrls, r.WormchainFallbackRequestsPerMinute},
	}

	rpcs := make(map[sdk.ChainID][]RpcConfig, len(chains))
	for _, c := range chains {
		rpcConfigs, err := addRpcConfig(c.baseUrl, c.requestsPerMinute, c.fallbackUrls, c.fallbackRequestsPerMinute)
		if err != nil {
			return nil, err
		}
		rpcs[c.chainID] = rpcConfigs
	}
	return rpcs, nil
}
// ToMap converts the TestnetRpcProviderSettings to a map of RpcConfig keyed by
// chain id. Note: Ethereum Sepolia maps to sdk.ChainIDSepolia.
func (r TestnetRpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
	// table-driven conversion to avoid repeating the same stanza per chain.
	type chainSettings struct {
		chainID                   sdk.ChainID
		baseUrl                   string
		requestsPerMinute         uint16
		fallbackUrls              string
		fallbackRequestsPerMinute string
	}
	chains := []chainSettings{
		{sdk.ChainIDArbitrumSepolia, r.ArbitrumSepoliaBaseUrl, r.ArbitrumSepoliaRequestsPerMinute, r.ArbitrumSepoliaFallbackUrls, r.ArbitrumSepoliaFallbackRequestsPerMinute},
		{sdk.ChainIDBaseSepolia, r.BaseSepoliaBaseUrl, r.BaseSepoliaRequestsPerMinute, r.BaseSepoliaFallbackUrls, r.BaseSepoliaFallbackRequestsPerMinute},
		{sdk.ChainIDSepolia, r.EthereumSepoliaBaseUrl, r.EthereumSepoliaRequestsPerMinute, r.EthereumSepoliaFallbackUrls, r.EthereumSepoliaFallbackRequestsPerMinute},
		{sdk.ChainIDOptimismSepolia, r.OptimismSepoliaBaseUrl, r.OptimismSepoliaRequestsPerMinute, r.OptimismSepoliaFallbackUrls, r.OptimismSepoliaFallbackRequestsPerMinute},
	}

	rpcs := make(map[sdk.ChainID][]RpcConfig, len(chains))
	for _, c := range chains {
		rpcConfigs, err := addRpcConfig(c.baseUrl, c.requestsPerMinute, c.fallbackUrls, c.fallbackRequestsPerMinute)
		if err != nil {
			return nil, err
		}
		rpcs[c.chainID] = rpcConfigs
	}
	return rpcs, nil
}
// addRpcConfig converts one chain's rpc settings — a mandatory primary url and
// rate limit, plus optional comma-separated fallback urls and matching
// comma-separated fallback rate limits — into a list of RpcConfig. The primary
// rpc gets priority 1 and every fallback rpc gets priority 2.
func addRpcConfig(baseUrl string, requestsPerMinute uint16, fallbackUrls string, fallbackRequestsPerMinute string) ([]RpcConfig, error) {
	// the primary rpc url and rate limit are mandatory.
	if baseUrl == "" {
		return nil, errors.New("primary rpc url is empty")
	}
	if requestsPerMinute == 0 {
		return nil, errors.New("primary rpc rate limit is 0")
	}

	// add the primary rpc.
	rpcConfigs := []RpcConfig{{
		Url:               baseUrl,
		Priority:          1,
		RequestsPerMinute: requestsPerMinute,
	}}

	// no fallbacks configured: the primary rpc is the only entry.
	if fallbackUrls == "" {
		return rpcConfigs, nil
	}

	urls := strings.Split(fallbackUrls, ",")
	rateLimits := strings.Split(fallbackRequestsPerMinute, ",")
	// every fallback url must have a matching rate limit.
	if len(urls) != len(rateLimits) {
		return rpcConfigs, errors.New("fallback urls and fallback rate limits are not matched")
	}

	// add the fallback rpcs.
	for i, rateLimit := range rateLimits {
		// bitSize 16 rejects values that would otherwise be silently truncated
		// by the uint16 conversion (the old code parsed with bitSize 64).
		rpm, err := strconv.ParseUint(rateLimit, 10, 16)
		if err != nil {
			return rpcConfigs, err
		}
		rpcConfigs = append(rpcConfigs, RpcConfig{
			Url:               urls[i],
			Priority:          2,
			RequestsPerMinute: uint16(rpm),
		})
	}
	return rpcConfigs, nil
}

View File

@ -5,42 +5,46 @@ import (
"errors"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.ConsumeFunc
rpcProviderSettings *config.RpcProviderSettings
logger *zap.Logger
repository *Repository
metrics metrics.Metrics
p2pNetwork string
consumeFunc queue.ConsumeFunc
rpcpool map[vaa.ChainID]*pool.Pool
logger *zap.Logger
repository *Repository
metrics metrics.Metrics
p2pNetwork string
workersSize int
}
// New creates a new vaa consumer.
func New(
consumeFunc queue.ConsumeFunc,
rpcProviderSettings *config.RpcProviderSettings,
rpcPool map[vaa.ChainID]*pool.Pool,
ctx context.Context,
logger *zap.Logger,
repository *Repository,
metrics metrics.Metrics,
p2pNetwork string,
workersSize int,
) *Consumer {
c := Consumer{
consumeFunc: consumeFunc,
rpcProviderSettings: rpcProviderSettings,
logger: logger,
repository: repository,
metrics: metrics,
p2pNetwork: p2pNetwork,
consumeFunc: consumeFunc,
rpcpool: rpcPool,
logger: logger,
repository: repository,
metrics: metrics,
p2pNetwork: p2pNetwork,
workersSize: workersSize,
}
return &c
@ -48,22 +52,28 @@ func New(
// Start consumes messages from VAA queue, parse and store those messages in a repository.
func (c *Consumer) Start(ctx context.Context) {
go c.producerLoop(ctx)
ch := c.consumeFunc(ctx)
for i := 0; i < c.workersSize; i++ {
go c.producerLoop(ctx, ch)
}
}
func (c *Consumer) producerLoop(ctx context.Context) {
func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage) {
ch := c.consumeFunc(ctx)
for msg := range ch {
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID), zap.String("trackId", msg.Data().TrackID))
switch msg.Data().Type {
case queue.SourceChainEvent:
c.processSourceTx(ctx, msg)
case queue.TargetChainEvent:
c.processTargetTx(ctx, msg)
default:
c.logger.Error("Unknown message type", zap.String("trackId", msg.Data().TrackID), zap.Any("type", msg.Data().Type))
for {
select {
case <-ctx.Done():
return
case msg := <-ch:
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID), zap.String("trackId", msg.Data().TrackID))
switch msg.Data().Type {
case queue.SourceChainEvent:
c.processSourceTx(ctx, msg)
case queue.TargetChainEvent:
c.processTargetTx(ctx, msg)
default:
c.logger.Error("Unknown message type", zap.String("trackId", msg.Data().TrackID), zap.Any("type", msg.Data().Type))
}
}
}
}
@ -101,7 +111,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
Metrics: c.metrics,
Overwrite: false, // avoid processing the same transaction twice
}
_, err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p, c.p2pNetwork)
_, err := ProcessSourceTx(ctx, c.logger, c.rpcpool, c.repository, &p, c.p2pNetwork)
// add vaa processing duration metrics
c.metrics.AddVaaProcessedDuration(uint16(event.ChainID), time.Since(start).Seconds())

View File

@ -66,6 +66,7 @@ type UpsertOriginTxParams struct {
TxDetail *chains.TxDetail
TxStatus domain.SourceTxStatus
Timestamp *time.Time
Processed bool
}
func createChangesDoc(source, _type string, timestamp *time.Time) bson.D {
@ -89,19 +90,21 @@ func (r *Repository) UpsertOriginTx(ctx context.Context, params *UpsertOriginTxP
{Key: "chainId", Value: params.ChainId},
{Key: "status", Value: params.TxStatus},
{Key: "updatedAt", Value: now},
{Key: "processed", Value: params.Processed},
}
if params.TxDetail != nil {
fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash})
fields = append(fields, primitive.E{Key: "from", Value: params.TxDetail.From})
if params.Timestamp != nil {
fields = append(fields, primitive.E{Key: "timestamp", Value: params.Timestamp})
}
if params.TxDetail.Attribute != nil {
fields = append(fields, primitive.E{Key: "attribute", Value: params.TxDetail.Attribute})
}
}
if params.Timestamp != nil {
fields = append(fields, primitive.E{Key: "timestamp", Value: params.Timestamp})
}
update := bson.D{
{
Key: "$set",
@ -136,7 +139,14 @@ func (r *Repository) AlreadyProcessed(ctx context.Context, vaaId string) (bool,
FindOne(ctx, bson.D{
{Key: "_id", Value: vaaId},
{Key: "originTx", Value: bson.D{{Key: "$exists", Value: true}}},
{Key: "$or", Value: bson.A{
bson.D{{Key: "originTx.processed", Value: true}},
bson.D{{Key: "originTx.processed", Value: bson.D{{Key: "$exists", Value: false}}}},
}},
})
// The originTx.processed will be true if the vaa was processed successfully.
// If exists and error getting the transactions from the rpcs, a partial originTx will save in the db and
// the originTx.processed will be false.
var tx GlobalTransaction
err := result.Decode(&tx)

View File

@ -6,9 +6,10 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -37,7 +38,7 @@ type ProcessSourceTxParams struct {
func ProcessSourceTx(
ctx context.Context,
logger *zap.Logger,
rpcServiceProviderSettings *config.RpcProviderSettings,
rpcPool map[vaa.ChainID]*pool.Pool,
repository *Repository,
params *ProcessSourceTxParams,
p2pNetwork string,
@ -53,7 +54,7 @@ func ProcessSourceTx(
processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
if err != nil {
return nil, err
} else if err == nil && processed {
} else if processed {
return nil, ErrAlreadyProcessed
}
}
@ -100,8 +101,12 @@ func ProcessSourceTx(
}
// Get transaction details from the emitter blockchain
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork)
txDetail, err = chains.FetchTx(ctx, rpcPool, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork, params.Metrics, logger)
if err != nil {
errHandleFetchTx := handleFetchTxError(ctx, logger, repository, params, err)
if errHandleFetchTx == nil {
params.Metrics.IncStoreUnprocessedOriginTx(uint16(params.ChainId))
}
return nil, err
}
@ -113,6 +118,7 @@ func ProcessSourceTx(
Timestamp: params.Timestamp,
TxDetail: txDetail,
TxStatus: domain.SourceTxStatusConfirmed,
Processed: true,
}
err = repository.UpsertOriginTx(ctx, &p)
@ -121,3 +127,46 @@ func ProcessSourceTx(
}
return txDetail, nil
}
func handleFetchTxError(
ctx context.Context,
logger *zap.Logger,
repository *Repository,
params *ProcessSourceTxParams,
err error,
) error {
// If the chain is not supported, we don't want to store the unprocessed originTx in the database.
if errors.Is(chains.ErrChainNotSupported, err) {
return nil
}
// if the transactions is solana or aptos, we don't want to store the txHash in the
// unprocessed originTx in the database.
var vaaTxDetail *chains.TxDetail
isSolanaOrAptos := params.ChainId == vaa.ChainIDAptos || params.ChainId == vaa.ChainIDSolana
if !isSolanaOrAptos {
txHash := chains.FormatTxHashByChain(params.ChainId, params.TxHash)
vaaTxDetail = &chains.TxDetail{
NativeTxHash: txHash,
}
}
e := UpsertOriginTxParams{
VaaId: params.VaaId,
TrackID: params.TrackID,
ChainId: params.ChainId,
Timestamp: params.Timestamp,
TxDetail: vaaTxDetail,
TxStatus: domain.SourceTxStatusConfirmed,
Processed: false,
}
errUpsert := repository.UpsertOriginTx(ctx, &e)
if errUpsert != nil {
logger.Error("failed to upsert originTx",
zap.Error(errUpsert),
zap.String("vaaId", params.VaaId))
}
return nil
}

View File

@ -75,7 +75,7 @@ func checkTxShouldBeUpdated(ctx context.Context, tx *TargetTxUpdate, repository
return true, nil
}
// if the transaction was already confirmed, then no update it.
if oldTx.Destination.Status == domain.DstTxStatusConfirmed {
if oldTx != nil && oldTx.Destination.Status == domain.DstTxStatusConfirmed {
return false, errTxFailedCannotBeUpdated
}
return true, nil

View File

@ -111,6 +111,7 @@ require (
golang.org/x/sync v0.2.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

View File

@ -859,6 +859,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -4,7 +4,7 @@ import (
"strconv"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -13,23 +13,23 @@ import (
// Controller definition.
type Controller struct {
logger *zap.Logger
vaaRepository *Repository
repository *consumer.Repository
rpcProviderSettings *config.RpcProviderSettings
metrics metrics.Metrics
p2pNetwork string
logger *zap.Logger
rpcPool map[sdk.ChainID]*pool.Pool
vaaRepository *Repository
repository *consumer.Repository
metrics metrics.Metrics
p2pNetwork string
}
// NewController creates a Controller instance.
func NewController(vaaRepository *Repository, repository *consumer.Repository, rpcProviderSettings *config.RpcProviderSettings, p2pNetwork string, logger *zap.Logger) *Controller {
func NewController(rpcPool map[sdk.ChainID]*pool.Pool, vaaRepository *Repository, repository *consumer.Repository, p2pNetwork string, logger *zap.Logger) *Controller {
return &Controller{
metrics: metrics.NewDummyMetrics(),
vaaRepository: vaaRepository,
repository: repository,
rpcProviderSettings: rpcProviderSettings,
p2pNetwork: p2pNetwork,
logger: logger}
metrics: metrics.NewDummyMetrics(),
rpcPool: rpcPool,
vaaRepository: vaaRepository,
repository: repository,
p2pNetwork: p2pNetwork,
logger: logger}
}
func (c *Controller) Process(ctx *fiber.Ctx) error {
@ -65,7 +65,7 @@ func (c *Controller) Process(ctx *fiber.Ctx) error {
Overwrite: true,
}
result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcProviderSettings, c.repository, p, c.p2pNetwork)
result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcPool, c.repository, p, c.p2pNetwork)
if err != nil {
return err
}

View File

@ -25,3 +25,18 @@ func (d *DummyMetrics) IncVaaWithTxHashFixed(chainID uint16) {}
// AddVaaProcessedDuration is a dummy implementation of AddVaaProcessedDuration.
func (d *DummyMetrics) AddVaaProcessedDuration(chainID uint16, duration float64) {}
// IncCallRpcSuccess is a dummy implementation of IncCallRpcSuccess.
func (d *DummyMetrics) IncCallRpcSuccess(chainID uint16, rpc string) {}
// IncCallRpcError is a dummy implementation of IncCallRpcError.
func (d *DummyMetrics) IncCallRpcError(chainID uint16, rpc string) {}
// IncStoreUnprocessedOriginTx is a dummy implementation of IncStoreUnprocessedOriginTx.
func (d *DummyMetrics) IncStoreUnprocessedOriginTx(chainID uint16) {}
// IncVaaProcessed is a dummy implementation of IncVaaProcessed.
func (d *DummyMetrics) IncVaaProcessed(chainID uint16, retry uint8) {}
// IncVaaFailed is a dummy implementation of IncVaaFailed.
func (d *DummyMetrics) IncVaaFailed(chainID uint16, retry uint8) {}

View File

@ -9,4 +9,9 @@ type Metrics interface {
IncVaaWithoutTxHash(chainID uint16)
IncVaaWithTxHashFixed(chainID uint16)
AddVaaProcessedDuration(chainID uint16, duration float64)
IncCallRpcSuccess(chainID uint16, rpc string)
IncCallRpcError(chainID uint16, rpc string)
IncStoreUnprocessedOriginTx(chainID uint16)
IncVaaProcessed(chainID uint16, retry uint8)
IncVaaFailed(chainID uint16, retry uint8)
}

View File

@ -1,6 +1,8 @@
package metrics
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -8,8 +10,11 @@ import (
// PrometheusMetrics is a Prometheus implementation of Metric interface.
type PrometheusMetrics struct {
vaaTxTrackerCount *prometheus.CounterVec
vaaProcesedDuration *prometheus.HistogramVec
vaaTxTrackerCount *prometheus.CounterVec
vaaProcesedDuration *prometheus.HistogramVec
rpcCallCount *prometheus.CounterVec
storeUnprocessedOriginTx *prometheus.CounterVec
vaaProcessed *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -32,10 +37,39 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
},
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
}, []string{"chain"})
rpcCallCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_call_count_by_chain",
Help: "Total number of rpc calls by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "rpc", "status"})
storeUnprocessedOriginTx := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "store_unprocessed_origin_tx",
Help: "Total number of unprocessed origin tx",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain"})
vaaProcessed := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "vaa_processed",
Help: "Total number of processed vaa with retry context",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "retry", "status"})
return &PrometheusMetrics{
vaaTxTrackerCount: vaaTxTrackerCount,
vaaProcesedDuration: vaaProcesedDuration,
vaaTxTrackerCount: vaaTxTrackerCount,
vaaProcesedDuration: vaaProcesedDuration,
rpcCallCount: rpcCallCount,
storeUnprocessedOriginTx: storeUnprocessedOriginTx,
vaaProcessed: vaaProcessed,
}
}
@ -74,3 +108,33 @@ func (m *PrometheusMetrics) IncVaaWithTxHashFixed(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "vaa_txhash_fixed").Inc()
}
// IncCallRpcSuccess increments the number of successful rpc calls.
func (m *PrometheusMetrics) IncCallRpcSuccess(chainID uint16, rpc string) {
chain := vaa.ChainID(chainID).String()
m.rpcCallCount.WithLabelValues(chain, rpc, "success").Inc()
}
// IncCallRpcError increments the number of failed rpc calls.
func (m *PrometheusMetrics) IncCallRpcError(chainID uint16, rpc string) {
chain := vaa.ChainID(chainID).String()
m.rpcCallCount.WithLabelValues(chain, rpc, "error").Inc()
}
// IncStoreUnprocessedOriginTx increments the number of unprocessed origin tx.
func (m *PrometheusMetrics) IncStoreUnprocessedOriginTx(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.storeUnprocessedOriginTx.WithLabelValues(chain).Inc()
}
// IncVaaProcessed increments the number of processed vaa.
func (m *PrometheusMetrics) IncVaaProcessed(chainID uint16, retry uint8) {
chain := vaa.ChainID(chainID).String()
m.vaaProcessed.WithLabelValues(chain, strconv.Itoa(int(retry)), "success").Inc()
}
// IncVaaFailed increments the number of failed vaa.
func (m *PrometheusMetrics) IncVaaFailed(chainID uint16, retry uint8) {
chain := vaa.ChainID(chainID).String()
m.vaaProcessed.WithLabelValues(chain, strconv.Itoa(int(retry)), "failed").Inc()
}

View File

@ -3,6 +3,7 @@ package queue
import (
"context"
"encoding/json"
"strconv"
"sync"
"time"
@ -96,6 +97,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
}
q.metrics.IncVaaConsumedQueue(uint16(event.ChainID))
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
@ -104,6 +106,8 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
retry: uint8(retry),
metrics: q.metrics,
ctx: ctx,
}
}
@ -126,6 +130,8 @@ type sqsConsumerMessage struct {
id *string
logger *zap.Logger
expiredAt time.Time
retry uint8
metrics metrics.Metrics
ctx context.Context
}
@ -142,13 +148,19 @@ func (m *sqsConsumerMessage) Done() {
zap.Error(err),
)
}
m.metrics.IncVaaProcessed(uint16(m.data.ChainID), m.retry)
m.wg.Done()
}
func (m *sqsConsumerMessage) Failed() {
m.metrics.IncVaaFailed(uint16(m.data.ChainID), m.retry)
m.wg.Done()
}
func (m *sqsConsumerMessage) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
func (m *sqsConsumerMessage) Retry() uint8 {
return m.retry
}

View File

@ -60,6 +60,7 @@ func GetAttributes[T EventAttributes](e *Event) (T, bool) {
// ConsumerMessage defition.
type ConsumerMessage interface {
Retry() uint8
Data() *Event
Done()
Failed()