Add wormchain injective integration (#1260)

* Add wormchain injective integration

* Add metrics and logging for unknown wormchain transactions

* Add wormchain rpc config
Co-authored-by: ftocal <fert1335@gmail.com>

* Add evmos, kujira, osmosis to wormchainRpcProviders
Co-authored-by: ftocal <fert1335@gmail.com>

* Add backfiller for tx-tracker with emitterChainId, emitterAddress and time-range parameters

Co-authored-by: walker-16 <agpazos85@gmail.com>

---------

Co-authored-by: Fernando Torres <fert1335@gmail.com>
walker-16, 2024-04-05 15:24:21 -03:00, committed via GitHub
parent aabad70e4d
commit 04da0b3e4d
19 changed files with 784 additions and 634 deletions

.gitignore (vendored)

@@ -17,4 +17,5 @@ serviceAccountKey.json
bigtableAccountKey.json
tsconfig.tsbuildinfo
serviceAccount.json
-.run/
+.run/
+__debug_*


@@ -30,8 +30,8 @@ type VaaDoc struct {
Timestamp *time.Time `bson:"timestamp"`
UpdatedAt *time.Time `bson:"updatedAt"`
TxHash string `bson:"txHash"`
-	Version  uint16 `bson:"version"`
-	Revision uint16 `bson:"revision"`
+	Version  int    `bson:"version"`
+	Revision int    `bson:"revision"`
}
// VaaQuery is a query for VAA.


@@ -0,0 +1,73 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .NAME }}-backfiller
namespace: {{ .NAMESPACE }}
data:
rpcProviders.json: |-
---
apiVersion: batch/v1
kind: Job
metadata:
name: {{ .NAME }}-backfiller
namespace: {{ .NAMESPACE }}
spec:
template:
metadata:
labels:
app: {{ .NAME }}-backfiller
spec:
restartPolicy: Never
terminationGracePeriodSeconds: 40
containers:
- name: {{ .NAME }}-backfiller
image: {{ .IMAGE_NAME }}
imagePullPolicy: Always
env:
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb
key: mongo-uri
- name: MONGODB_DATABASE
valueFrom:
configMapKeyRef:
name: config
key: mongo-database
command: ["/tx-tracker"]
args:
- backfiller
- vaas
- --mongo-uri
- "$(MONGODB_URI)"
- --mongo-database
- "$(MONGODB_DATABASE)"
- --rpc-providers-path
- "/opt/tx-tracker/config/rpcProviders.json"
- --p2p-network
- "{{ .P2P_NETWORK }}"
- --start-time
- "2023-04-19T00:00:00Z"
- --end-time
- "2023-04-21T00:00:00Z"
- --emitter-chain
- "1"
- --page-size
- "1000"
- --requests-per-minute
- "12"
- --num-workers
- "10"
- --overwrite
- --disable-db-upsert
volumeMounts:
- name: config-volume
mountPath: /opt/tx-tracker/config
volumes:
- name: config-volume
configMap:
name: {{ .NAME }}-backfiller
items:
- key: rpcProviders.json
path: rpcProviders.json
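
Note: the ConfigMap above ships an empty rpcProviders.json. A minimal sketch of what that file could contain, based on the rpcProviders / wormchainRpcProviders keys and the ChainRpcProviderSettings fields used later in this commit — the inner key names (chainId, rpcSettings, url, priority, requestPerMinute) are assumptions inferred from the Go field names, and the URLs are placeholders; chain id 1 is Solana and 19 is Injective in the Wormhole SDK:

{
  "rpcProviders": [
    {
      "chainId": 1,
      "rpcSettings": [
        { "url": "https://solana-rpc.example.com", "priority": 1, "requestPerMinute": 12 }
      ]
    }
  ],
  "wormchainRpcProviders": [
    {
      "chainId": 19,
      "rpcSettings": [
        { "url": "https://injective-rpc.example.com", "priority": 1, "requestPerMinute": 12 }
      ]
    }
  ]
}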


@@ -173,8 +173,8 @@ func publishVaa(ctx context.Context, push topic.PushFunc, queue chan *repository
Timestamp: vaa.Timestamp,
UpdatedAt: vaa.UpdatedAt,
TxHash: vaa.TxHash,
-		Version:  vaa.Version,
-		Revision: vaa.Revision,
+		Version:  uint16(vaa.Version),
+		Revision: uint16(vaa.Revision),
}); err != nil {
logger.Error("Failed to push vaa", zap.Error(err))
} else {


@@ -15,10 +15,11 @@ import (
)
type apiWormchain struct {
-	p2pNetwork  string
-	evmosPool   *pool.Pool
-	kujiraPool  *pool.Pool
-	osmosisPool *pool.Pool
+	p2pNetwork    string
+	evmosPool     *pool.Pool
+	kujiraPool    *pool.Pool
+	osmosisPool   *pool.Pool
+	injectivePool *pool.Pool
}
type wormchainTxDetail struct {
@@ -438,6 +439,110 @@ func fetchKujiraDetail(ctx context.Context, baseUrl string, sequence, timestamp,
return &kujiraTx{txHash: strings.ToLower(kReponse.Result.Txs[0].Hash)}, nil
}
type injectiveRequest struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params struct {
Query string `json:"query"`
Page string `json:"page"`
} `json:"params"`
}
type injectiveResponse struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Result struct {
Txs []struct {
Hash string `json:"hash"`
Height string `json:"height"`
Index int `json:"index"`
TxResult struct {
Code int `json:"code"`
Data string `json:"data"`
Log string `json:"log"`
Info string `json:"info"`
GasWanted string `json:"gas_wanted"`
GasUsed string `json:"gas_used"`
Events []struct {
Type string `json:"type"`
Attributes []struct {
Key string `json:"key"`
Value string `json:"value"`
Index bool `json:"index"`
} `json:"attributes"`
} `json:"events"`
Codespace string `json:"codespace"`
} `json:"tx_result"`
Tx string `json:"tx"`
} `json:"txs"`
TotalCount string `json:"total_count"`
} `json:"result"`
}
type injectiveTx struct {
txHash string
}
func (a *apiWormchain) fetchInjectiveDetail(ctx context.Context, pool *pool.Pool, sequence, timestamp, srcChannel, dstChannel string, metrics metrics.Metrics) (*injectiveTx, error) {
if pool == nil {
return nil, fmt.Errorf("injective rpc pool not found")
}
injectiveRpcs := pool.GetItems()
if len(injectiveRpcs) == 0 {
return nil, fmt.Errorf("injective rpcs not found")
}
	for _, rpc := range injectiveRpcs {
		rpc.Wait(ctx)
		injectiveTx, err := fetchInjectiveDetail(ctx, rpc.Id, sequence, timestamp, srcChannel, dstChannel)
		if injectiveTx != nil {
			fmt.Printf("Successfully fetched transaction from injective: %s\n", rpc.Id)
			metrics.IncCallRpcSuccess(uint16(sdk.ChainIDInjective), rpc.Description)
			return injectiveTx, nil
		}
		fmt.Printf("Failed to fetch transaction from injective: %s\n", rpc.Id)
		if err != nil {
			metrics.IncCallRpcError(uint16(sdk.ChainIDInjective), rpc.Description)
		}
	}
return nil, fmt.Errorf("injective tx not found")
}
func fetchInjectiveDetail(ctx context.Context, baseUrl string, sequence, timestamp, srcChannel, dstChannel string) (*injectiveTx, error) {
queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel)
q := injectiveRequest{
Jsonrpc: "2.0",
ID: 1,
Method: "tx_search",
Params: struct {
Query string `json:"query"`
Page string `json:"page"`
}{
Query: query,
Page: "1",
},
}
response, err := httpPost(ctx, baseUrl, q)
if err != nil {
return nil, err
}
	var iResponse injectiveResponse
	err = json.Unmarshal(response, &iResponse)
	if err != nil {
		return nil, err
	}
	if len(iResponse.Result.Txs) == 0 {
		return nil, fmt.Errorf("cannot find hash for sequence %s, timestamp %s, srcChannel %s, dstChannel %s", sequence, timestamp, srcChannel, dstChannel)
	}
	return &injectiveTx{txHash: strings.ToLower(iResponse.Result.Txs[0].Hash)}, nil
}
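
For reference, the JSON-RPC body this helper POSTs to one of the pooled injective RPC endpoints would look roughly like this, given the injectiveRequest struct above (the sequence and timeout-timestamp values are illustrative; the channel pair is the mainnet mapping used by isInjectiveTx below):

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tx_search",
  "params": {
    "query": "send_packet.packet_sequence='12345' AND send_packet.packet_timeout_timestamp='1681948800000000000' AND send_packet.packet_src_channel='channel-183' AND send_packet.packet_dst_channel='channel-13'",
    "page": "1"
  }
}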
type WorchainAttributeTxDetail struct {
OriginChainID sdk.ChainID `bson:"originChainId"`
OriginTxHash string `bson:"originTxHash"`
@@ -545,6 +650,34 @@ func (a *apiWormchain) FetchWormchainTx(
}, nil
}
// Verify if this transaction originated from injective via wormchain
if a.isInjectiveTx(wormchainTx) {
injectiveTx, err := a.fetchInjectiveDetail(ctx, a.injectivePool, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel, metrics)
if err != nil {
return nil, err
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
Attribute: &AttributeTxDetail{
Type: "wormchain-gateway",
Value: &WorchainAttributeTxDetail{
OriginChainID: sdk.ChainIDInjective,
OriginTxHash: injectiveTx.txHash,
OriginAddress: wormchainTx.sender,
},
},
}, nil
}
// If the transaction is not from any known cosmos chain, increment the unknown wormchain transaction metric.
metrics.IncWormchainUnknown(wormchainTx.srcChannel, wormchainTx.dstChannel)
logger.Debug("Unknown wormchain transaction",
zap.String("srcChannel", wormchainTx.srcChannel),
zap.String("dstChannel", wormchainTx.dstChannel),
zap.String("txHash", txHash))
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
@@ -582,3 +715,14 @@ func (a *apiWormchain) isEvmosTx(tx *wormchainTx) bool {
// }
return false
}
func (a *apiWormchain) isInjectiveTx(tx *wormchainTx) bool {
if a.p2pNetwork == domain.P2pMainNet {
return tx.srcChannel == "channel-183" && tx.dstChannel == "channel-13"
}
// Pending get channels for testnet
// if a.p2pNetwork == domain.P2pTestNet {
// return tx.srcChannel == "" && tx.dstChannel == ""
// }
return false
}
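
A minimal in-package test sketch for the mainnet mapping above (hypothetical, not part of the commit; it assumes apiWormchain lives in the chains package, as the chains.FetchTx call elsewhere in this diff suggests, and that only srcChannel/dstChannel need to be populated, which matches the check):

package chains

import (
	"testing"

	"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)

func TestIsInjectiveTxMainnet(t *testing.T) {
	a := &apiWormchain{p2pNetwork: domain.P2pMainNet}
	// The mainnet injective->wormchain channel pair should match.
	if !a.isInjectiveTx(&wormchainTx{srcChannel: "channel-183", dstChannel: "channel-13"}) {
		t.Error("expected channel-183 -> channel-13 to be classified as injective")
	}
	// Any other pair should fall through to the unknown-wormchain path.
	if a.isInjectiveTx(&wormchainTx{srcChannel: "channel-183", dstChannel: "channel-99"}) {
		t.Error("did not expect a match for an unrelated destination channel")
	}
}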


@@ -34,6 +34,7 @@ type AttributeTxDetail struct {
func FetchTx(
ctx context.Context,
rpcPool map[sdk.ChainID]*pool.Pool,
wormchainRpcPool map[sdk.ChainID]*pool.Pool,
chainId sdk.ChainID,
txHash string,
timestamp *time.Time,
@@ -87,10 +88,11 @@ func FetchTx(
fetchFunc = apiEvm.FetchEvmTx
case sdk.ChainIDWormchain:
apiWormchain := &apiWormchain{
-			p2pNetwork:  p2pNetwork,
-			evmosPool:   rpcPool[sdk.ChainIDEvmos],
-			kujiraPool:  rpcPool[sdk.ChainIDKujira],
-			osmosisPool: rpcPool[sdk.ChainIDOsmosis],
+			p2pNetwork:    p2pNetwork,
+			evmosPool:     wormchainRpcPool[sdk.ChainIDEvmos],
+			kujiraPool:    wormchainRpcPool[sdk.ChainIDKujira],
+			osmosisPool:   wormchainRpcPool[sdk.ChainIDOsmosis],
+			injectivePool: wormchainRpcPool[sdk.ChainIDInjective],
}
fetchFunc = apiWormchain.FetchWormchainTx
case sdk.ChainIDSei:


@@ -1,423 +0,0 @@
package backfiller
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/configuration"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
func makeLogger(logger *zap.Logger, name string) *zap.Logger {
rightPadding := fmt.Sprintf("%-10s", name)
l := logger.Named(rightPadding)
return l
}
type getStrategyCallbacksFunc func(logger *zap.Logger, cfg *config.BackfillerSettings, r *consumer.Repository) (*strategyCallbacks, error)
func RunByTimeRange(after, before string) {
timestampAfter, err := time.Parse(time.RFC3339, after)
if err != nil {
log.Fatal("Failed to parse timestampAfter: ", err)
}
timestampBefore, err := time.Parse(time.RFC3339, before)
if err != nil {
log.Fatal("Failed to parse timestampBefore: ", err)
}
callback := func(logger *zap.Logger, cfg *config.BackfillerSettings, r *consumer.Repository) (*strategyCallbacks, error) {
cb := strategyCallbacks{
countFn: func(ctx context.Context) (uint64, error) {
return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore)
},
iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) {
return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore)
},
}
return &cb, nil
}
run(callback)
}
func RunForIncompletes() {
callback := func(logger *zap.Logger, cfg *config.BackfillerSettings, r *consumer.Repository) (*strategyCallbacks, error) {
cb := strategyCallbacks{
countFn: r.CountIncompleteDocuments,
iteratorFn: r.GetIncompleteDocuments,
}
return &cb, nil
}
run(callback)
}
func RunByVaas(emitterChainID uint16, emitterAddress string, sequence string) {
chainID := sdk.ChainID(emitterChainID)
if !domain.ChainIdIsValid(chainID) {
log.Fatalf("Invalid chain ID [%d]", emitterChainID)
}
callback := func(logger *zap.Logger, cfg *config.BackfillerSettings, r *consumer.Repository) (*strategyCallbacks, error) {
cb := strategyCallbacks{
countFn: func(ctx context.Context) (uint64, error) {
return r.CountDocumentsByVaas(ctx, chainID, emitterAddress, sequence)
},
iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) {
return r.GetDocumentsByVaas(ctx, lastId, lastTimestamp, limit, chainID, emitterAddress, sequence)
},
}
return &cb, nil
}
run(callback)
}
func run(getStrategyCallbacksFunc getStrategyCallbacksFunc) {
// Create the top-level context
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
// Load config
cfg, err := config.NewBackfillerSettings()
if err != nil {
log.Fatal("Failed to load config: ", err)
}
// create rpc pool
rpcPool, err := newRpcPool(cfg)
if err != nil {
log.Fatal("Failed to initialize rpc pool: ", zap.Error(err))
}
// Initialize logger
rootLogger := logger.New("backfiller", logger.WithLevel(cfg.LogLevel))
mainLogger := makeLogger(rootLogger, "main")
mainLogger.Info("Starting")
// Spawn a goroutine that will call `cancelFunc` if a signal is received.
go func() {
l := makeLogger(rootLogger, "watcher")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
l.Info("Closing due to cancelled context")
case <-sigterm:
l.Info("Cancelling root context")
rootCtxCancel()
}
}()
// Initialize the database client
db, err := dbutil.Connect(rootCtx, mainLogger, cfg.MongodbUri, cfg.MongodbDatabase, false)
if err != nil {
log.Fatal("Failed to initialize MongoDB client: ", err)
}
repository := consumer.NewRepository(rootLogger, db.Database)
strategyCallbacks, err := getStrategyCallbacksFunc(mainLogger, cfg, repository)
if err != nil {
log.Fatal("Failed to parse strategy callbacks: ", err)
}
// Count the number of documents to process
totalDocuments, err := strategyCallbacks.countFn(rootCtx)
if err != nil {
log.Fatal("Closing - failed to count number of global transactions: ", err)
}
mainLogger.Info("Starting", zap.Uint64("documentsToProcess", totalDocuments))
// Spawn the producer goroutine.
//
// The producer sends tasks to the workers via a buffered channel.
queue := make(chan consumer.GlobalTransaction, cfg.BulkSize)
p := producerParams{
logger: makeLogger(rootLogger, "producer"),
repository: repository,
queueTx: queue,
bulkSize: cfg.BulkSize,
strategyCallbacks: strategyCallbacks,
}
go produce(rootCtx, &p)
// Spawn a goroutine for each worker
var wg sync.WaitGroup
var processedDocuments atomic.Uint64
wg.Add(int(cfg.NumWorkers))
for i := uint(0); i < cfg.NumWorkers; i++ {
name := fmt.Sprintf("worker-%d", i)
p := consumerParams{
logger: makeLogger(rootLogger, name),
rpcPool: rpcPool,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
p2pNetwork: cfg.P2pNetwork,
}
go consume(rootCtx, &p)
}
// Wait for all workers to finish before closing
mainLogger.Info("Waiting for all workers to finish...")
wg.Wait()
mainLogger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
mainLogger.Info("Closing main goroutine")
}
type strategyCallbacks struct {
countFn func(ctx context.Context) (uint64, error)
iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error)
}
// producerParams contains the parameters for the producer goroutine.
type producerParams struct {
logger *zap.Logger
repository *consumer.Repository
queueTx chan<- consumer.GlobalTransaction
bulkSize uint
strategyCallbacks *strategyCallbacks
}
// produce reads VAA IDs from the database, and sends them through a channel for the workers to consume.
//
// The function will return when:
// - the context is cancelled
// - a fatal error is encountered
// - there are no more items to process
func produce(ctx context.Context, params *producerParams) {
defer close(params.queueTx)
// Producer main loop
var lastId = ""
var lastTimestamp *time.Time
for {
// Get a batch of VAA IDs from the database
globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, lastId, lastTimestamp, params.bulkSize)
if err != nil {
params.logger.Error("Closing: failed to read from cursor", zap.Error(err))
return
}
// If there are no more documents to process, close the goroutine
if len(globalTxs) == 0 {
params.logger.Info("Closing: no documents left in database")
return
}
// Enqueue the VAA IDs, and update the pagination cursor
params.logger.Debug("queueing batch for consumers", zap.Int("numElements", len(globalTxs)))
for _, globalTx := range globalTxs {
select {
case params.queueTx <- globalTx:
if len(globalTx.Vaas) != 0 {
lastId = globalTx.Id
lastTimestamp = globalTx.Vaas[0].Timestamp
}
case <-ctx.Done():
params.logger.Info("Closing: context was cancelled")
return
}
}
}
}
// consumerParams contains the parameters for the consumer goroutine.
type consumerParams struct {
logger *zap.Logger
rpcPool map[sdk.ChainID]*pool.Pool
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
p2pNetwork string
}
// consume reads VAA IDs from a channel, processes them, and updates the database accordingly.
//
// The function will return when:
// - the context is cancelled
// - a fatal error is encountered
// - the channel is closed (i.e.: no more items to process)
func consume(ctx context.Context, params *consumerParams) {
metrics := metrics.NewDummyMetrics()
// Main loop: fetch global txs and process them
for {
select {
// Try to pop a globalTransaction from the queue
case globalTx, ok := <-params.queueRx:
// If the channel was closed, exit immediately
if !ok {
params.logger.Debug("Closing, channel was closed")
params.wg.Done()
return
}
// Sanity check
if len(globalTx.Vaas) != 1 {
params.logger.Warn("globalTransaction doesn't match exactly one VAA, skipping",
zap.String("vaaId", globalTx.Id),
zap.Int("matches", len(globalTx.Vaas)),
)
params.processedDocuments.Add(1)
continue
}
if globalTx.Vaas[0].TxHash == nil {
params.logger.Warn("VAA doesn't have a TxHash, skipping",
zap.String("vaaId", globalTx.Id),
)
params.processedDocuments.Add(1)
continue
}
params.logger.Debug("Processing source tx",
zap.String("vaaId", globalTx.Id),
zap.String("txid", *globalTx.Vaas[0].TxHash),
)
// Process the transaction
//
// This involves:
// 1. Querying an API/RPC service for the source tx details
// 2. Persisting source tx details in the database.
v := globalTx.Vaas[0]
p := consumer.ProcessSourceTxParams{
TrackID: "backfiller",
Timestamp: v.Timestamp,
VaaId: v.ID,
ChainId: v.EmitterChain,
Emitter: v.EmitterAddr,
Sequence: v.Sequence,
TxHash: *v.TxHash,
Overwrite: true, // Overwrite old contents
Metrics: metrics,
}
_, err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcPool, params.repository, &p, params.p2pNetwork)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),
zap.Error(err),
)
params.processedDocuments.Add(1)
continue
}
params.processedDocuments.Add(1)
params.logger.Debug("Updated source tx",
zap.String("vaaId", globalTx.Id),
zap.String("txid", *globalTx.Vaas[0].TxHash),
zap.String("progress", fmt.Sprintf("%d/%d", params.processedDocuments.Load(), params.totalDocuments)),
)
// If the context was cancelled, exit immediately
case <-ctx.Done():
params.logger.Info("Closing due to cancelled context")
params.wg.Done()
return
}
}
}
func newRpcPool(cfg *config.BackfillerSettings) (map[sdk.ChainID]*pool.Pool, error) {
var rpcConfigMap map[sdk.ChainID][]config.RpcConfig
var err error
if cfg.RpcProviderSettingsJson != nil {
rpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
if err != nil {
return nil, err
}
} else if cfg.RpcProviderSettings != nil {
// get rpc settings map
rpcConfigMap, err = cfg.RpcProviderSettings.ToMap()
if err != nil {
return nil, err
}
var testRpcConfig *config.TestnetRpcProviderSettings
if configuration.IsTestnet(cfg.P2pNetwork) {
testRpcConfig, err = config.LoadFromEnv[config.TestnetRpcProviderSettings]()
if err != nil {
log.Fatal("Error loading testnet rpc config: ", err)
}
}
// get rpc testnet settings map
var rpcTestnetMap map[sdk.ChainID][]config.RpcConfig
if testRpcConfig != nil {
rpcTestnetMap, err = cfg.TestnetRpcProviderSettings.ToMap()
if err != nil {
return nil, err
}
}
// merge rpc testnet settings to rpc settings map
if len(rpcTestnetMap) > 0 {
for chainID, rpcConfig := range rpcTestnetMap {
rpcConfigMap[chainID] = append(rpcConfigMap[chainID], rpcConfig...)
}
}
} else {
return nil, errors.New("rpc provider settings not found")
}
domains := []string{".network", ".cloud", ".com", ".io", ".build", ".team", ".dev", ".zone", ".org", ".net", ".in"}
// convert rpc settings map to rpc pool
convertFn := func(rpcConfig []config.RpcConfig) []pool.Config {
poolConfigs := make([]pool.Config, 0, len(rpcConfig))
for _, rpc := range rpcConfig {
poolConfigs = append(poolConfigs, pool.Config{
Id: rpc.Url,
Priority: rpc.Priority,
Description: utils.FindSubstringBeforeDomains(rpc.Url, domains),
RequestsPerMinute: rpc.RequestsPerMinute,
})
}
return poolConfigs
}
// create rpc pool
rpcPool := make(map[sdk.ChainID]*pool.Pool)
for chainID, rpcConfig := range rpcConfigMap {
rpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
}
return rpcPool, nil
}


@@ -0,0 +1,296 @@
package backfiller
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/ratelimit"
"go.uber.org/zap"
)
type VaasBackfiller struct {
P2pNetwork string
LogLevel string
MongoURI string
MongoDatabase string
RequestsPerMinute int64
StartTime string
EndTime string
EmitterChainID *sdk.ChainID
EmitterAddress *string
Overwrite bool
DisableDBUpsert bool
PageSize int64
NumWorkers int
RpcProvidersPath string
}
type vaasBackfillerParams struct {
logger *zap.Logger
rpcPool map[sdk.ChainID]*pool.Pool
wormchainRpcPool map[sdk.ChainID]*pool.Pool
repository *consumer.Repository
queue chan *repository.VaaDoc
wg *sync.WaitGroup
processedDocumentsSuccess *atomic.Uint64
processedDocumentsWithError *atomic.Uint64
p2pNetwork string
overwrite bool
disableDBUpsert bool
limiter ratelimit.Limiter
}
func RunByVaas(backfillerConfig *VaasBackfiller) {
ctx := context.Background()
// Load config
cfg, err := config.NewRpcProviderSettingJson(backfillerConfig.RpcProvidersPath)
if err != nil {
log.Fatal("Failed to load config: ", err)
}
// create rpc pool
rpcPool, wormchainRpcPool, err := newRpcPool(cfg)
if err != nil {
log.Fatal("Failed to initialize rpc pool: ", zap.Error(err))
}
logger := logger.New("wormhole-explorer-tx-tracker", logger.WithLevel(backfillerConfig.LogLevel))
logger.Info("Starting wormhole-explorer-tx-tracker as vaas backfiller ...")
startTime, err := time.Parse(time.RFC3339, backfillerConfig.StartTime)
if err != nil {
logger.Fatal("failed to parse start time", zap.Error(err))
}
endTime := time.Now()
if backfillerConfig.EndTime != "" {
endTime, err = time.Parse(time.RFC3339, backfillerConfig.EndTime)
if err != nil {
logger.Fatal("Failed to parse end time", zap.Error(err))
}
}
if startTime.After(endTime) {
logger.Fatal("Start time should be before end time",
zap.String("start_time", startTime.Format(time.RFC3339)),
zap.String("end_time", endTime.Format(time.RFC3339)))
}
// set up DB connection
db, err := dbutil.Connect(ctx, logger, backfillerConfig.MongoURI, backfillerConfig.MongoDatabase, false)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// create a vaa repository.
vaaRepository := repository.NewVaaRepository(db.Database, logger)
// create a consumer repository.
globalTrxRepository := consumer.NewRepository(logger, db.Database)
query := repository.VaaQuery{
StartTime: &startTime,
EndTime: &endTime,
EmitterChainID: backfillerConfig.EmitterChainID,
EmitterAddress: backfillerConfig.EmitterAddress,
}
limiter := ratelimit.New(int(backfillerConfig.RequestsPerMinute), ratelimit.Per(time.Minute))
pagination := repository.Pagination{
Page: 0,
PageSize: backfillerConfig.PageSize,
SortAsc: true,
}
queue := make(chan *repository.VaaDoc, 5*backfillerConfig.PageSize)
var quantityProduced, quantityConsumedWithError, quantityConsumedSuccess atomic.Uint64
go getVaas(ctx, logger, pagination, query, vaaRepository, queue, &quantityProduced)
var wg sync.WaitGroup
wg.Add(backfillerConfig.NumWorkers)
for i := 0; i < backfillerConfig.NumWorkers; i++ {
p := vaasBackfillerParams{
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
rpcPool: rpcPool,
queue: queue,
wormchainRpcPool: wormchainRpcPool,
repository: globalTrxRepository,
p2pNetwork: backfillerConfig.P2pNetwork,
limiter: limiter,
overwrite: backfillerConfig.Overwrite,
disableDBUpsert: backfillerConfig.DisableDBUpsert,
processedDocumentsSuccess: &quantityConsumedSuccess,
processedDocumentsWithError: &quantityConsumedWithError,
}
go processVaa(ctx, &p)
}
logger.Info("Waiting for all workers to finish...")
wg.Wait()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Finish wormhole-explorer-tx-tracker as vaas backfiller",
zap.Uint64("produced", quantityProduced.Load()),
zap.Uint64("consumer_success", quantityConsumedSuccess.Load()),
zap.Uint64("consumed_error", quantityConsumedWithError.Load()))
}
func getVaas(ctx context.Context, logger *zap.Logger, pagination repository.Pagination, query repository.VaaQuery,
vaaRepository *repository.VaaRepository, queue chan *repository.VaaDoc, quantityProduced *atomic.Uint64) {
defer close(queue)
for {
logger.Info("Processing page", zap.Any("pagination", pagination), zap.Any("query", query))
vaas, err := vaaRepository.FindPage(ctx, query, pagination)
if err != nil {
logger.Error("Failed to get vaas", zap.Error(err))
break
}
if len(vaas) == 0 {
logger.Info("Empty page", zap.Int64("page", pagination.Page))
break
}
for _, vaa := range vaas {
queue <- vaa
quantityProduced.Add(1)
}
pagination.Page++
}
for {
select {
case <-time.After(10 * time.Second):
if len(queue) == 0 {
logger.Info("Closing, queue is empty")
return
}
case <-ctx.Done():
logger.Info("Closing due to cancelled context")
return
}
}
}
func processVaa(ctx context.Context, params *vaasBackfillerParams) {
// Main loop: fetch global txs and process them
metrics := metrics.NewDummyMetrics()
defer params.wg.Done()
for {
select {
// Try to pop a VAA document from the queue
case v, ok := <-params.queue:
// If the channel was closed, exit immediately
if !ok {
params.logger.Info("Closing, channel was closed")
return
}
params.limiter.Take()
p := consumer.ProcessSourceTxParams{
TrackID: "backfiller",
Timestamp: v.Timestamp,
VaaId: v.ID,
ChainId: sdk.ChainID(v.ChainID),
Emitter: v.EmitterAddress,
Sequence: v.Sequence,
TxHash: v.TxHash,
Overwrite: params.overwrite,
Metrics: metrics,
DisableDBUpsert: params.disableDBUpsert,
}
_, err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcPool, params.wormchainRpcPool, params.repository, &p, params.p2pNetwork)
if err != nil {
if errors.Is(err, consumer.ErrAlreadyProcessed) {
params.logger.Info("Source tx was already processed", zap.String("vaaId", v.ID))
params.processedDocumentsSuccess.Add(1)
continue
}
params.logger.Error("Failed to process source tx",
zap.String("vaaId", v.ID),
zap.Error(err),
)
params.processedDocumentsWithError.Add(1)
continue
} else {
params.processedDocumentsSuccess.Add(1)
params.logger.Info("Processed source tx", zap.String("vaaId", v.ID))
}
// If the context was cancelled, exit immediately
case <-ctx.Done():
params.logger.Info("Closing due to cancelled context")
return
}
}
}
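
Worth noting: all workers share the single go.uber.org/ratelimit limiter created in RunByVaas, so --requests-per-minute caps the aggregate request rate rather than a per-worker rate. A standalone sketch of how Take() paces callers:

package main

import (
	"fmt"
	"time"

	"go.uber.org/ratelimit"
)

func main() {
	// 12 permits per minute: Take() blocks so successive calls land ~5s apart.
	limiter := ratelimit.New(12, ratelimit.Per(time.Minute))
	prev := time.Now()
	for i := 0; i < 3; i++ {
		now := limiter.Take()
		fmt.Printf("call %d, waited %v\n", i, now.Sub(prev).Round(time.Millisecond))
		prev = now
	}
}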
func newRpcPool(cfg *config.RpcProviderSettingsJson) (map[sdk.ChainID]*pool.Pool, map[sdk.ChainID]*pool.Pool, error) {
if cfg == nil {
return nil, nil, errors.New("rpc provider settings is nil")
}
rpcConfigMap, err := cfg.ToMap()
if err != nil {
return nil, nil, err
}
wormchainRpcConfigMap, err := cfg.WormchainToMap()
if err != nil {
return nil, nil, err
}
domains := []string{".network", ".cloud", ".com", ".io", ".build", ".team", ".dev", ".zone", ".org", ".net", ".in"}
// convert rpc settings map to rpc pool
convertFn := func(rpcConfig []config.RpcConfig) []pool.Config {
poolConfigs := make([]pool.Config, 0, len(rpcConfig))
for _, rpc := range rpcConfig {
poolConfigs = append(poolConfigs, pool.Config{
Id: rpc.Url,
Priority: rpc.Priority,
Description: utils.FindSubstringBeforeDomains(rpc.Url, domains),
RequestsPerMinute: rpc.RequestsPerMinute,
})
}
return poolConfigs
}
// create rpc pool
rpcPool := make(map[sdk.ChainID]*pool.Pool)
for chainID, rpcConfig := range rpcConfigMap {
rpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
}
// create wormchain rpc pool
wormchainRpcPool := make(map[sdk.ChainID]*pool.Pool)
for chainID, rpcConfig := range wormchainRpcConfigMap {
wormchainRpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
}
return rpcPool, wormchainRpcPool, nil
}


@@ -4,6 +4,7 @@ import (
"github.com/spf13/cobra"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/cmd/backfiller"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/cmd/service"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func main() {
@@ -42,61 +43,66 @@ func addBackfiller(parent *cobra.Command) {
Use: "backfiller",
}
addBackfillerByTimeRange(backfiller)
addBackfillerForIncompletes(backfiller)
addBackfillerByVaas(backfiller)
parent.AddCommand(backfiller)
}
func addBackfillerByTimeRange(parent *cobra.Command) {
var before, after string
timeRange := &cobra.Command{
Use: "time-range",
Short: "Run backfiller for a time range",
Run: func(_ *cobra.Command, _ []string) {
backfiller.RunByTimeRange(after, before)
},
}
// before flag
timeRange.Flags().StringVar(&before, "before", "", "before timestamp in RFC3339 format")
timeRange.MarkFlagRequired("before")
// after flag
timeRange.Flags().StringVar(&after, "after", "", "after timestamp in RFC3339 format")
timeRange.MarkFlagRequired("after")
parent.AddCommand(timeRange)
}
func addBackfillerForIncompletes(parent *cobra.Command) {
incompletes := &cobra.Command{
Use: "incompletes",
Short: "Run backfiller for source tx incompletes",
Run: func(_ *cobra.Command, _ []string) {
backfiller.RunForIncompletes()
},
}
parent.AddCommand(incompletes)
}
func addBackfillerByVaas(parent *cobra.Command) {
-	var emitterAddress, sequence string
+	var mongoUri, mongoDb, logLevel, startTime, endTime, p2pNetwork, emitterAddress, rpcProvidersPath string
+	var numWorkers int
	var emitterChainID uint16
+	var pageSize, requestsPerMinute int64
+	var overwrite, disableDBUpsert bool
	vaas := &cobra.Command{
		Use:   "vaas",
		Short: "Run backfiller for vaas",
		Run: func(_ *cobra.Command, _ []string) {
-			backfiller.RunByVaas(emitterChainID, emitterAddress, sequence)
+			cfg := &backfiller.VaasBackfiller{
+				LogLevel:          logLevel,
+				P2pNetwork:        p2pNetwork,
+				MongoURI:          mongoUri,
+				MongoDatabase:     mongoDb,
+				RequestsPerMinute: requestsPerMinute,
+				StartTime:         startTime,
+				EndTime:           endTime,
+				PageSize:          pageSize,
+				NumWorkers:        numWorkers,
+				Overwrite:         overwrite,
+				DisableDBUpsert:   disableDBUpsert,
+				RpcProvidersPath:  rpcProvidersPath,
+			}
+			if emitterChainID != 0 {
+				eci := sdk.ChainID(emitterChainID)
+				cfg.EmitterChainID = &eci
+			}
+			if emitterAddress != "" {
+				cfg.EmitterAddress = &emitterAddress
+			}
+			backfiller.RunByVaas(cfg)
		},
	}
-	// emitter-chain flag
+	vaas.Flags().StringVar(&logLevel, "log-level", "INFO", "log level")
+	vaas.Flags().StringVar(&p2pNetwork, "p2p-network", "", "P2P network to use")
+	vaas.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
+	vaas.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
+	vaas.Flags().StringVar(&startTime, "start-time", "1970-01-01T00:00:00Z", "minimum VAA timestamp to process")
+	vaas.Flags().StringVar(&endTime, "end-time", "", "maximum VAA timestamp to process (default now)")
+	vaas.Flags().Int64Var(&pageSize, "page-size", 100, "number of documents retrieved at a time")
+	vaas.Flags().Int64Var(&requestsPerMinute, "requests-per-minute", 12, "maximum number of requests per minute to process VAA documents")
+	vaas.Flags().IntVar(&numWorkers, "num-workers", 1, "number of workers to process VAA documents concurrently")
	vaas.Flags().Uint16Var(&emitterChainID, "emitter-chain", 0, "emitter chain id")
-	vaas.MarkFlagRequired("emitter-chain")
-	// emitter-address flag
	vaas.Flags().StringVar(&emitterAddress, "emitter-address", "", "emitter address")
+	vaas.Flags().BoolVar(&overwrite, "overwrite", false, "overwrite existing data")
+	vaas.Flags().BoolVar(&disableDBUpsert, "disable-db-upsert", false, "disable db upsert")
+	vaas.Flags().StringVar(&rpcProvidersPath, "rpc-providers-path", "", "path to rpc providers file")
-	// sequence flag
-	vaas.Flags().StringVar(&sequence, "sequence", "", "sequence")
+	vaas.MarkFlagRequired("mongo-uri")
+	vaas.MarkFlagRequired("p2p-network")
+	vaas.MarkFlagRequired("mongo-database")
+	vaas.MarkFlagRequired("start-time")
+	vaas.MarkFlagRequired("rpc-providers-path")
parent.AddCommand(vaas)
}
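
Assembled from the flags above, a local invocation of the new subcommand could look like this (the URIs and network name are placeholders; mongo-uri, p2p-network, mongo-database, start-time and rpc-providers-path are the flags marked required):

/tx-tracker backfiller vaas \
  --mongo-uri "mongodb://localhost:27017" \
  --mongo-database wormhole \
  --p2p-network mainnet \
  --rpc-providers-path /opt/tx-tracker/config/rpcProviders.json \
  --start-time "2023-04-19T00:00:00Z" \
  --end-time "2023-04-21T00:00:00Z" \
  --emitter-chain 1 \
  --page-size 1000 \
  --requests-per-minute 12 \
  --num-workers 10 \
  --overwrite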


@@ -49,7 +49,7 @@ func Run() {
logger.Info("Starting wormhole-explorer-tx-tracker ...")
// create rpc pool
-	rpcPool, err := newRpcPool(cfg)
+	rpcPool, wormchainRpcPool, err := newRpcPool(cfg)
if err != nil {
logger.Fatal("Failed to initialize rpc pool: ", zap.Error(err))
}
@@ -65,7 +65,7 @@ func Run() {
vaaRepository := vaa.NewRepository(db.Database, logger)
// create controller
-	vaaController := vaa.NewController(rpcPool, vaaRepository, repository, cfg.P2pNetwork, logger)
+	vaaController := vaa.NewController(rpcPool, wormchainRpcPool, vaaRepository, repository, cfg.P2pNetwork, logger)
// start serving /health and /ready endpoints
healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
@@ -77,12 +77,12 @@ func Run() {
// create and start a pipeline consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger)
-	vaaConsumer := consumer.New(vaaConsumeFunc, rpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
+	vaaConsumer := consumer.New(vaaConsumeFunc, rpcPool, wormchainRpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
vaaConsumer.Start(rootCtx)
// create and start a notification consumer.
notificationConsumeFunc := newNotificationConsumeFunc(rootCtx, cfg, metrics, logger)
-	notificationConsumer := consumer.New(notificationConsumeFunc, rpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
+	notificationConsumer := consumer.New(notificationConsumeFunc, rpcPool, wormchainRpcPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkersSize)
notificationConsumer.Start(rootCtx)
logger.Info("Started wormhole-explorer-tx-tracker")
@@ -216,19 +216,20 @@ func newMetrics(cfg *config.ServiceSettings) metrics.Metrics {
return metrics.NewPrometheusMetrics(cfg.Environment)
}
-func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, error) {
+func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, map[sdk.ChainID]*pool.Pool, error) {
	var rpcConfigMap map[sdk.ChainID][]config.RpcConfig
+	var wormchainRpcConfigMap map[sdk.ChainID][]config.RpcConfig
	var err error
	if cfg.RpcProviderSettingsJson != nil {
-		rpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
+		rpcConfigMap, wormchainRpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
		if err != nil {
-			return nil, err
+			return nil, nil, err
		}
	} else if cfg.RpcProviderSettings != nil {
		// get rpc settings map
-		rpcConfigMap, err = cfg.RpcProviderSettings.ToMap()
+		rpcConfigMap, wormchainRpcConfigMap, err = cfg.MapRpcProviderToRpcConfig()
		if err != nil {
-			return nil, err
+			return nil, nil, err
		}
		var testRpcConfig *config.TestnetRpcProviderSettings
@@ -244,7 +245,7 @@ func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, error)
if testRpcConfig != nil {
rpcTestnetMap, err = cfg.TestnetRpcProviderSettings.ToMap()
if err != nil {
-				return nil, err
+				return nil, nil, err
}
}
@@ -255,7 +256,7 @@ func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, error)
}
}
} else {
-		return nil, errors.New("rpc provider settings not found")
+		return nil, nil, errors.New("rpc provider settings not found")
}
domains := []string{".network", ".cloud", ".com", ".io", ".build", ".team", ".dev", ".zone", ".org", ".net", ".in"}
@@ -279,5 +280,11 @@ func newRpcPool(cfg *config.ServiceSettings) (map[sdk.ChainID]*pool.Pool, error)
rpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
}
-	return rpcPool, nil
+	// create wormchain rpc pool
+	wormchainRpcPool := make(map[sdk.ChainID]*pool.Pool)
+	for chainID, rpcConfig := range wormchainRpcConfigMap {
+		wormchainRpcPool[chainID] = pool.NewPool(convertFn(rpcConfig))
+	}
+	return rpcPool, wormchainRpcPool, nil
}


@@ -14,37 +14,6 @@ import (
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
-type BackfillingStrategy string
-const (
-	// StrategyReprocessAll will reprocess documents in the `globalTransactions`
-	// collection that don't have the `sourceTx` field set, or that have the
-	// `sourceTx.status` field set to "internalError".
-	BackfillerStrategyReprocessFailed BackfillingStrategy = "reprocess_failed"
-	// BackfillerStrategyTimeRange will reprocess all VAAs that have a timestamp between the specified range.
-	BackfillerStrategyTimeRange BackfillingStrategy = "time_range"
-)
-type BackfillerSettings struct {
-	LogLevel        string `split_words:"true" default:"INFO"`
-	NumWorkers      uint   `split_words:"true" required:"true"`
-	BulkSize        uint   `split_words:"true" required:"true"`
-	P2pNetwork      string `split_words:"true" required:"true"`
-	RpcProviderPath string `split_words:"true" required:"false"`
-	// Strategy determines which VAAs will be affected by the backfiller.
-	Strategy struct {
-		Name            BackfillingStrategy `split_words:"true" required:"true"`
-		TimestampAfter  string              `split_words:"true" required:"false"`
-		TimestampBefore string              `split_words:"true" required:"false"`
-	}
-	MongodbSettings
-	*RpcProviderSettings        `required:"false"`
-	*TestnetRpcProviderSettings `required:"false"`
-	*RpcProviderSettingsJson    `required:"false"`
-}
type ServiceSettings struct {
// MonitoringPort defines the TCP port for the /health and /ready endpoints.
MonitoringPort string `split_words:"true" default:"8000"`
@@ -58,12 +27,14 @@ type ServiceSettings struct {
AwsSettings
MongodbSettings
*RpcProviderSettings `required:"false"`
*WormchainProviderSettings `required:"false"`
*TestnetRpcProviderSettings `required:"false"`
*RpcProviderSettingsJson `required:"false"`
}
type RpcProviderSettingsJson struct {
-	RpcProviders []ChainRpcProviderSettings `json:"rpcProviders"`
+	RpcProviders          []ChainRpcProviderSettings `json:"rpcProviders"`
+	WormchainRpcProviders []ChainRpcProviderSettings `json:"wormchainRpcProviders"`
}
type ChainRpcProviderSettings struct {
@@ -129,10 +100,6 @@ type RpcProviderSettings struct {
EthereumRequestsPerMinute uint16 `split_words:"true" required:"false"`
EthereumFallbackUrls string `split_words:"true" required:"false"`
EthereumFallbackRequestsPerMinute string `split_words:"true" required:"false"`
-	EvmosBaseUrl                   string `split_words:"true" required:"false"`
-	EvmosRequestsPerMinute         uint16 `split_words:"true" required:"false"`
-	EvmosFallbackUrls              string `split_words:"true" required:"false"`
-	EvmosFallbackRequestsPerMinute string `split_words:"true" required:"false"`
FantomBaseUrl string `split_words:"true" required:"false"`
FantomRequestsPerMinute uint16 `split_words:"true" required:"false"`
FantomFallbackUrls string `split_words:"true" required:"false"`
@@ -149,10 +116,6 @@ type RpcProviderSettings struct {
KlaytnRequestsPerMinute uint16 `split_words:"true" required:"false"`
KlaytnFallbackUrls string `split_words:"true" required:"false"`
KlaytnFallbackRequestsPerMinute string `split_words:"true" required:"false"`
-	KujiraBaseUrl                   string `split_words:"true" required:"false"`
-	KujiraRequestsPerMinute         uint16 `split_words:"true" required:"false"`
-	KujiraFallbackUrls              string `split_words:"true" required:"false"`
-	KujiraFallbackRequestsPerMinute string `split_words:"true" required:"false"`
MoonbeamBaseUrl string `split_words:"true" required:"false"`
MoonbeamRequestsPerMinute uint16 `split_words:"true" required:"false"`
MoonbeamFallbackUrls string `split_words:"true" required:"false"`
@@ -165,10 +128,6 @@ type RpcProviderSettings struct {
OptimismRequestsPerMinute uint16 `split_words:"true" required:"false"`
OptimismFallbackUrls string `split_words:"true" required:"false"`
OptimismFallbackRequestsPerMinute string `split_words:"true" required:"false"`
-	OsmosisBaseUrl                   string `split_words:"true" required:"false"`
-	OsmosisRequestsPerMinute         uint16 `split_words:"true" required:"false"`
-	OsmosisFallbackUrls              string `split_words:"true" required:"false"`
-	OsmosisFallbackRequestsPerMinute string `split_words:"true" required:"false"`
PolygonBaseUrl string `split_words:"true" required:"false"`
PolygonRequestsPerMinute uint16 `split_words:"true" required:"false"`
PolygonFallbackUrls string `split_words:"true" required:"false"`
@@ -201,6 +160,26 @@ type RpcProviderSettings struct {
WormchainRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainFallbackUrls string `split_words:"true" required:"false"`
WormchainFallbackRequestsPerMinute string `split_words:"true" required:"false"`
WormchainProviderSettings
}
type WormchainProviderSettings struct {
WormchainEvmosBaseUrl string `split_words:"true" required:"false"`
WormchainEvmosRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainEvmosFallbackUrls string `split_words:"true" required:"false"`
WormchainEvmosFallbackRequestsPerMinute string `split_words:"true" required:"false"`
WormchainKujiraBaseUrl string `split_words:"true" required:"false"`
WormchainKujiraRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainKujiraFallbackUrls string `split_words:"true" required:"false"`
WormchainKujiraFallbackRequestsPerMinute string `split_words:"true" required:"false"`
WormchainOsmosisBaseUrl string `split_words:"true" required:"false"`
WormchainOsmosisRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainOsmosisFallbackUrls string `split_words:"true" required:"false"`
WormchainOsmosisFallbackRequestsPerMinute string `split_words:"true" required:"false"`
WormchainInjectiveBaseUrl string `split_words:"true" required:"false"`
WormchainInjectiveRequestsPerMinute uint16 `split_words:"true" required:"false"`
WormchainInjectiveFallbackUrls string `split_words:"true" required:"false"`
WormchainInjectiveFallbackRequestsPerMinute string `split_words:"true" required:"false"`
}
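
Since these fields carry envconfig's split_words tag, they should map to upper-snake-case environment variables. A sketch of the injective entries (names derived mechanically from the field names; the values are placeholders, and the comma-separated fallback format mirrors the existing *FallbackUrls settings):

WORMCHAIN_INJECTIVE_BASE_URL=https://injective-rpc.example.com
WORMCHAIN_INJECTIVE_REQUESTS_PER_MINUTE=12
WORMCHAIN_INJECTIVE_FALLBACK_URLS=https://injective-backup-1.example.com,https://injective-backup-2.example.com
WORMCHAIN_INJECTIVE_FALLBACK_REQUESTS_PER_MINUTE=12,12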
type TestnetRpcProviderSettings struct {
@@ -222,45 +201,19 @@ type TestnetRpcProviderSettings struct {
OptimismSepoliaFallbackRequestsPerMinute string `split_words:"true" required:"false"`
}
-func NewBackfillerSettings() (*BackfillerSettings, error) {
-	_ = godotenv.Load()
-	var settings BackfillerSettings
-	err := envconfig.Process("", &settings)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read config from environment: %w", err)
-	}
-	if settings.RpcProviderPath != "" {
-		rpcJsonFile, err := os.ReadFile(settings.RpcProviderPath)
-		if err != nil {
-			return nil, fmt.Errorf("failed to read rpc provider settings from file: %w", err)
-		}
-		var rpcProviderSettingsJson RpcProviderSettingsJson
-		err = json.Unmarshal(rpcJsonFile, &rpcProviderSettingsJson)
-		if err != nil {
-			return nil, fmt.Errorf("failed to unmarshal rpc provider settings from file: %w", err)
-		}
-		settings.RpcProviderSettingsJson = &rpcProviderSettingsJson
-	} else {
-		rpcProviderSettings, err := LoadFromEnv[RpcProviderSettings]()
-		if err != nil {
-			return nil, err
-		}
-		settings.RpcProviderSettings = rpcProviderSettings
-	}
-	return &settings, nil
-}
-// MapRpcProviderToRpcConfig converts the RpcProviderSettings to a map of RpcConfig
-func (s *BackfillerSettings) MapRpcProviderToRpcConfig() (map[sdk.ChainID][]RpcConfig, error) {
-	if s.RpcProviderSettingsJson != nil {
-		return s.RpcProviderSettingsJson.ToMap()
-	}
-	return s.RpcProviderSettings.ToMap()
-}
+func NewRpcProviderSettingJson(path string) (*RpcProviderSettingsJson, error) {
+	rpcJsonFile, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read rpc provider settings from file: %w", err)
+	}
+	var rpcProviderSettingsJson RpcProviderSettingsJson
+	err = json.Unmarshal(rpcJsonFile, &rpcProviderSettingsJson)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal rpc provider settings from file: %w", err)
+	}
+	return &rpcProviderSettingsJson, nil
+}
func New() (*ServiceSettings, error) {
@@ -319,11 +272,30 @@ type RpcConfig struct {
}
// MapRpcProviderToRpcConfig converts the RpcProviderSettings to a map of RpcConfig
-func (s *ServiceSettings) MapRpcProviderToRpcConfig() (map[sdk.ChainID][]RpcConfig, error) {
+func (s *ServiceSettings) MapRpcProviderToRpcConfig() (map[sdk.ChainID][]RpcConfig, map[sdk.ChainID][]RpcConfig, error) {
	if s.RpcProviderSettingsJson != nil {
-		return s.RpcProviderSettingsJson.ToMap()
+		rpcPoolConfig, err := s.RpcProviderSettingsJson.ToMap()
+		if err != nil {
+			return nil, nil, err
+		}
+		wormchainRpcPoolConfig, err := s.RpcProviderSettingsJson.WormchainToMap()
+		if err != nil {
+			return nil, nil, err
+		}
+		return rpcPoolConfig, wormchainRpcPoolConfig, nil
	}
-	return s.RpcProviderSettings.ToMap()
+	if s.RpcProviderSettings != nil {
+		rpcPoolConfig, err := s.RpcProviderSettings.ToMap()
+		if err != nil {
+			return nil, nil, err
+		}
+		wormchainRpcPoolConfig, err := s.RpcProviderSettings.WormchainProviderSettings.ToMap()
+		if err != nil {
+			return nil, nil, err
+		}
+		return rpcPoolConfig, wormchainRpcPoolConfig, nil
+	}
+	return nil, nil, errors.New("rpc provider settings not found")
}
// ToMap converts the RpcProviderSettingsJson to a map of RpcConfig
@@ -344,6 +316,74 @@ func (r RpcProviderSettingsJson) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
return rpcs, nil
}
func (r RpcProviderSettingsJson) WormchainToMap() (map[sdk.ChainID][]RpcConfig, error) {
rpcs := make(map[sdk.ChainID][]RpcConfig)
for _, rpcProvider := range r.WormchainRpcProviders {
chainID := sdk.ChainID(rpcProvider.ChainId)
var rpcConfigs []RpcConfig
for _, rpcSetting := range rpcProvider.RpcSettings {
rpcConfigs = append(rpcConfigs, RpcConfig{
Url: rpcSetting.Url,
Priority: rpcSetting.Priority,
RequestsPerMinute: rpcSetting.RequestPerMinute,
})
}
rpcs[chainID] = rpcConfigs
}
return rpcs, nil
}
func (w WormchainProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
rpcs := make(map[sdk.ChainID][]RpcConfig)
// add wormchain rpcs
wormchainRpcConfigs, err := addRpcConfig(
w.WormchainInjectiveBaseUrl,
w.WormchainInjectiveRequestsPerMinute,
w.WormchainInjectiveFallbackUrls,
w.WormchainInjectiveFallbackRequestsPerMinute)
if err != nil {
return nil, err
}
rpcs[sdk.ChainIDInjective] = wormchainRpcConfigs
// add evmos rpcs
evmosRpcConfigs, err := addRpcConfig(
w.WormchainEvmosBaseUrl,
w.WormchainEvmosRequestsPerMinute,
w.WormchainEvmosFallbackUrls,
w.WormchainEvmosFallbackRequestsPerMinute)
if err != nil {
return nil, err
}
rpcs[sdk.ChainIDEvmos] = evmosRpcConfigs
// add kujira rpcs
kujiraRpcConfigs, err := addRpcConfig(
w.WormchainKujiraBaseUrl,
w.WormchainKujiraRequestsPerMinute,
w.WormchainKujiraFallbackUrls,
w.WormchainKujiraFallbackRequestsPerMinute)
if err != nil {
return nil, err
}
rpcs[sdk.ChainIDKujira] = kujiraRpcConfigs
// add osmosis rpcs
osmosisRpcConfigs, err := addRpcConfig(
w.WormchainOsmosisBaseUrl,
w.WormchainOsmosisRequestsPerMinute,
w.WormchainOsmosisFallbackUrls,
w.WormchainOsmosisFallbackRequestsPerMinute)
if err != nil {
return nil, err
}
rpcs[sdk.ChainIDOsmosis] = osmosisRpcConfigs
return rpcs, nil
}
// ToMap converts the RpcProviderSettings to a map of RpcConfig
func (r RpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
rpcs := make(map[sdk.ChainID][]RpcConfig)
@@ -447,17 +487,6 @@ func (r RpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
}
rpcs[sdk.ChainIDEthereum] = ethereumRpcConfigs
-	// add evmos rpcs
-	evmosRpcConfigs, err := addRpcConfig(
-		r.EvmosBaseUrl,
-		r.EvmosRequestsPerMinute,
-		r.EvmosFallbackUrls,
-		r.EvmosFallbackRequestsPerMinute)
-	if err != nil {
-		return nil, err
-	}
-	rpcs[sdk.ChainIDEvmos] = evmosRpcConfigs
// add fantom rpcs
fantomRpcConfigs, err := addRpcConfig(
r.FantomBaseUrl,
@@ -502,17 +531,6 @@ func (r RpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
}
rpcs[sdk.ChainIDKlaytn] = klaytnRpcConfigs
-	// add kujira rpcs
-	kujiraRpcConfigs, err := addRpcConfig(
-		r.KujiraBaseUrl,
-		r.KujiraRequestsPerMinute,
-		r.KujiraFallbackUrls,
-		r.KujiraFallbackRequestsPerMinute)
-	if err != nil {
-		return nil, err
-	}
-	rpcs[sdk.ChainIDKujira] = kujiraRpcConfigs
// add moonbeam rpcs
moonbeamRpcConfigs, err := addRpcConfig(
r.MoonbeamBaseUrl,
@@ -546,17 +564,6 @@ func (r RpcProviderSettings) ToMap() (map[sdk.ChainID][]RpcConfig, error) {
}
rpcs[sdk.ChainIDOptimism] = optimismRpcConfigs
-	// add osmosis rpcs
-	osmosisRpcConfigs, err := addRpcConfig(
-		r.OsmosisBaseUrl,
-		r.OsmosisRequestsPerMinute,
-		r.OsmosisFallbackUrls,
-		r.OsmosisFallbackRequestsPerMinute)
-	if err != nil {
-		return nil, err
-	}
-	rpcs[sdk.ChainIDOsmosis] = osmosisRpcConfigs
// add polygon rpcs
polygonRpcConfigs, err := addRpcConfig(
r.PolygonBaseUrl,


@@ -16,19 +16,21 @@ import (
// Consumer consumer struct definition.
type Consumer struct {
-	consumeFunc queue.ConsumeFunc
-	rpcpool     map[vaa.ChainID]*pool.Pool
-	logger      *zap.Logger
-	repository  *Repository
-	metrics     metrics.Metrics
-	p2pNetwork  string
-	workersSize int
+	consumeFunc      queue.ConsumeFunc
+	rpcpool          map[vaa.ChainID]*pool.Pool
+	wormchainRpcPool map[vaa.ChainID]*pool.Pool
+	logger           *zap.Logger
+	repository       *Repository
+	metrics          metrics.Metrics
+	p2pNetwork       string
+	workersSize      int
}
// New creates a new vaa consumer.
func New(
consumeFunc queue.ConsumeFunc,
rpcPool map[vaa.ChainID]*pool.Pool,
wormchainRpcPool map[vaa.ChainID]*pool.Pool,
ctx context.Context,
logger *zap.Logger,
repository *Repository,
@@ -38,13 +40,14 @@ func New(
) *Consumer {
c := Consumer{
-		consumeFunc: consumeFunc,
-		rpcpool:     rpcPool,
-		logger:      logger,
-		repository:  repository,
-		metrics:     metrics,
-		p2pNetwork:  p2pNetwork,
-		workersSize: workersSize,
+		consumeFunc:      consumeFunc,
+		rpcpool:          rpcPool,
+		wormchainRpcPool: wormchainRpcPool,
+		logger:           logger,
+		repository:       repository,
+		metrics:          metrics,
+		p2pNetwork:       p2pNetwork,
+		workersSize:      workersSize,
}
return &c
@@ -111,7 +114,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
Metrics: c.metrics,
Overwrite: false, // avoid processing the same transaction twice
}
-	_, err := ProcessSourceTx(ctx, c.logger, c.rpcpool, c.repository, &p, c.p2pNetwork)
+	_, err := ProcessSourceTx(ctx, c.logger, c.rpcpool, c.wormchainRpcPool, c.repository, &p, c.p2pNetwork)
// add vaa processing duration metrics
c.metrics.AddVaaProcessedDuration(uint16(event.ChainID), time.Since(start).Seconds())


@@ -31,14 +31,16 @@ type ProcessSourceTxParams struct {
// the schema changed).
// In the context of the service, you usually don't want to overwrite existing data
// to avoid processing the same VAA twice, which would result in performance degradation.
-	Overwrite bool
-	Metrics   metrics.Metrics
+	Overwrite       bool
+	Metrics         metrics.Metrics
+	DisableDBUpsert bool
}
func ProcessSourceTx(
ctx context.Context,
logger *zap.Logger,
rpcPool map[vaa.ChainID]*pool.Pool,
wormchainRpcPool map[vaa.ChainID]*pool.Pool,
repository *Repository,
params *ProcessSourceTxParams,
p2pNetwork string,
@@ -101,7 +103,7 @@ func ProcessSourceTx(
}
// Get transaction details from the emitter blockchain
-	txDetail, err = chains.FetchTx(ctx, rpcPool, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork, params.Metrics, logger)
+	txDetail, err = chains.FetchTx(ctx, rpcPool, wormchainRpcPool, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork, params.Metrics, logger)
if err != nil {
errHandleFetchTx := handleFetchTxError(ctx, logger, repository, params, err)
if errHandleFetchTx == nil {
@@ -110,6 +112,11 @@ func ProcessSourceTx(
return nil, err
}
// If disableDBUpsert is set to true, we don't want to store the source transaction details in the database.
if params.DisableDBUpsert {
return txDetail, nil
}
// Store source transaction details in the database
p := UpsertOriginTxParams{
VaaId: params.VaaId,


@@ -17,6 +17,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/wormhole-foundation/wormhole-explorer/api v0.0.0-20240228181628-161878b15b41
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/ratelimit v0.2.0
)
require (
@@ -24,6 +25,7 @@ require (
filippo.io/edwards25519 v1.0.0 // indirect
github.com/algorand/go-algorand-sdk v1.23.0 // indirect
github.com/algorand/go-codec/codec v1.1.8 // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 // indirect


@@ -65,6 +65,7 @@ github.com/algorand/go-algorand-sdk v1.23.0/go.mod h1:7i2peZBcE48kfoxNZnLA+mklKh
github.com/algorand/go-codec v1.1.8/go.mod h1:XhzVs6VVyWMLu6cApb9/192gBjGRVGm5cX5j203Heg4=
github.com/algorand/go-codec/codec v1.1.8 h1:lsFuhcOH2LiEhpBH3BVUUkdevVmwCRyvb7FCAAPeY6U=
github.com/algorand/go-codec/codec v1.1.8/go.mod h1:tQ3zAJ6ijTps6V+wp8KsGDnPC2uhHVC7ANyrtkIY0bA=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
@@ -633,6 +634,7 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=


@@ -13,23 +13,25 @@ import (
// Controller definition.
type Controller struct {
-	logger        *zap.Logger
-	rpcPool       map[sdk.ChainID]*pool.Pool
-	vaaRepository *Repository
-	repository    *consumer.Repository
-	metrics       metrics.Metrics
-	p2pNetwork    string
+	logger           *zap.Logger
+	rpcPool          map[sdk.ChainID]*pool.Pool
+	wormchainRpcPool map[sdk.ChainID]*pool.Pool
+	vaaRepository    *Repository
+	repository       *consumer.Repository
+	metrics          metrics.Metrics
+	p2pNetwork       string
}
// NewController creates a Controller instance.
-func NewController(rpcPool map[sdk.ChainID]*pool.Pool, vaaRepository *Repository, repository *consumer.Repository, p2pNetwork string, logger *zap.Logger) *Controller {
+func NewController(rpcPool map[sdk.ChainID]*pool.Pool, wormchainRpcPool map[sdk.ChainID]*pool.Pool, vaaRepository *Repository, repository *consumer.Repository, p2pNetwork string, logger *zap.Logger) *Controller {
return &Controller{
-		metrics:       metrics.NewDummyMetrics(),
-		rpcPool:       rpcPool,
-		vaaRepository: vaaRepository,
-		repository:    repository,
-		p2pNetwork:    p2pNetwork,
-		logger:        logger}
+		metrics:          metrics.NewDummyMetrics(),
+		rpcPool:          rpcPool,
+		wormchainRpcPool: wormchainRpcPool,
+		vaaRepository:    vaaRepository,
+		repository:       repository,
+		p2pNetwork:       p2pNetwork,
+		logger:           logger}
}
func (c *Controller) Process(ctx *fiber.Ctx) error {
@@ -65,7 +67,7 @@ func (c *Controller) Process(ctx *fiber.Ctx) error {
Overwrite: true,
}
-	result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcPool, c.repository, p, c.p2pNetwork)
+	result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcPool, c.wormchainRpcPool, c.repository, p, c.p2pNetwork)
if err != nil {
return err
}


@@ -43,3 +43,6 @@ func (d *DummyMetrics) IncVaaProcessed(chainID uint16, retry uint8) {}
// IncVaaFailed is a dummy implementation of IncVaaFailed.
func (d *DummyMetrics) IncVaaFailed(chainID uint16, retry uint8) {}
// IncWormchainUnknown is a dummy implementation of IncWormchainUnknown.
func (d *DummyMetrics) IncWormchainUnknown(srcChannel string, dstChannel string) {}


@@ -15,4 +15,5 @@ type Metrics interface {
IncStoreUnprocessedOriginTx(chainID uint16)
IncVaaProcessed(chainID uint16, retry uint8)
IncVaaFailed(chainID uint16, retry uint8)
IncWormchainUnknown(srcChannel string, dstChannel string)
}


@@ -15,6 +15,7 @@ type PrometheusMetrics struct {
rpcCallCount *prometheus.CounterVec
storeUnprocessedOriginTx *prometheus.CounterVec
vaaProcessed *prometheus.CounterVec
wormchainUnknown *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@@ -64,12 +65,23 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
"service": serviceName,
},
}, []string{"chain", "retry", "status"})
wormchainUnknown := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormchain_unknown",
Help: "Total number of wormchain transactions with unknown source chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"srcChannel", "dstChannel"})
return &PrometheusMetrics{
vaaTxTrackerCount: vaaTxTrackerCount,
vaaProcesedDuration: vaaProcesedDuration,
rpcCallCount: rpcCallCount,
storeUnprocessedOriginTx: storeUnprocessedOriginTx,
vaaProcessed: vaaProcessed,
wormchainUnknown: wormchainUnknown,
}
}
@@ -140,3 +152,8 @@ func (m *PrometheusMetrics) IncVaaFailed(chainID uint16, retry uint8) {
chain := vaa.ChainID(chainID).String()
m.vaaProcessed.WithLabelValues(chain, strconv.Itoa(int(retry)), "failed").Inc()
}
// IncWormchainUnknown increments the count of wormchain transactions whose source chain is unknown.
func (m *PrometheusMetrics) IncWormchainUnknown(srcChannel string, dstChannel string) {
m.wormchainUnknown.WithLabelValues(srcChannel, dstChannel).Inc()
}
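
Once scraped, the new counter can be broken down by channel pair to spot gateway routes the tracker does not recognize yet, e.g. with a PromQL query along these lines (label names as defined in the CounterVec above):

sum by (srcChannel, dstChannel) (rate(wormchain_unknown[5m]))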