Add Sei support in tx-tracker (#800)

* Add Sei support in tx-tracker

* Code review updates
ftocal 2023-11-15 12:11:24 -03:00 committed by GitHub
parent e5282ffb16
commit 11b162ca54
18 changed files with 893 additions and 21 deletions

View File

@@ -0,0 +1,9 @@
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: tx-tracker
  namespace: {{ .NAMESPACE }}
data:
  aws-region: {{ .SQS_AWS_REGION }}
  pipeline-sqs-url: {{ .PIPELINE_SQS_URL }}

View File

@@ -7,7 +7,7 @@ RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
PIPELINE_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
AWS_IAM_ROLE=
@@ -75,6 +75,9 @@ OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=12
SEI_BASE_URL=https://rpc.ankr.com/sei
SEI_REQUESTS_PER_MINUTE=12
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=12

View File

@@ -7,7 +7,7 @@ RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m
RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
SQS_URL=
PIPELINE_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
AWS_IAM_ROLE=
@@ -73,6 +73,9 @@ OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai
POLYGON_REQUESTS_PER_MINUTE=12
SEI_BASE_URL=https://sei-a2-rpc.brocha.in
SEI_REQUESTS_PER_MINUTE=12
SOLANA_BASE_URL=https://api.devnet.solana.com
SOLANA_REQUESTS_PER_MINUTE=12

View File

@@ -7,7 +7,7 @@ RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=60m
RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=40m
SQS_URL=
PIPELINE_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
AWS_IAM_ROLE=
@@ -75,6 +75,9 @@ OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=12
SEI_BASE_URL=https://rpc.ankr.com/sei
SEI_REQUESTS_PER_MINUTE=12
SOLANA_BASE_URL=https://api.mainnet-beta.solana.com
SOLANA_REQUESTS_PER_MINUTE=12

View File

@@ -7,7 +7,7 @@ RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m
RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
SQS_URL=
PIPELINE_SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
AWS_IAM_ROLE=
@@ -73,6 +73,9 @@ OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai
POLYGON_REQUESTS_PER_MINUTE=12
SEI_BASE_URL=https://sei-a2-rpc.brocha.in
SEI_REQUESTS_PER_MINUTE=12
SOLANA_BASE_URL=https://api.devnet.solana.com
SOLANA_REQUESTS_PER_MINUTE=12

View File

@@ -56,10 +56,16 @@ spec:
              configMapKeyRef:
                name: config
                key: mongo-database
            - name: SQS_URL
              value: {{ .SQS_URL }}
            - name: PIPELINE_SQS_URL
              valueFrom:
                configMapKeyRef:
                  name: tx-tracker
                  key: pipeline-sqs-url
            - name: AWS_REGION
              value: {{ .SQS_AWS_REGION }}
              valueFrom:
                configMapKeyRef:
                  name: tx-tracker
                  key: aws-region
            - name: P2P_NETWORK
              value: {{ .P2P_NETWORK }}
            - name: METRICS_ENABLED
@@ -144,6 +150,10 @@ spec:
              value: {{ .POLYGON_BASE_URL }}
            - name: POLYGON_REQUESTS_PER_MINUTE
              value: "{{ .POLYGON_REQUESTS_PER_MINUTE }}"
            - name: SEI_BASE_URL
              value: {{ .SEI_BASE_URL }}
            - name: SEI_REQUESTS_PER_MINUTE
              value: "{{ .SEI_REQUESTS_PER_MINUTE }}"
            - name: SOLANA_BASE_URL
              value: {{ .SOLANA_BASE_URL }}
            - name: SOLANA_REQUESTS_PER_MINUTE

View File

@@ -0,0 +1,70 @@
package chains

import (
    "context"
    "time"

    "github.com/wormhole-foundation/wormhole/sdk/vaa"
)

type seiTx struct {
    TxHash string
    Sender string
}

func seiTxSearchExtractor(tx *cosmosTxSearchResponse, logs []cosmosLogWrapperResponse) (*seiTx, error) {
    var sender string
    for _, l := range logs {
        for _, e := range l.Events {
            if e.Type == "message" {
                for _, attr := range e.Attributes {
                    if attr.Key == "sender" {
                        sender = attr.Value
                    }
                }
                break
            }
        }
    }
    return &seiTx{TxHash: tx.Result.Txs[0].Hash, Sender: sender}, nil
}

type apiSei struct {
    wormchainUrl         string
    wormchainRateLimiter *time.Ticker
    p2pNetwork           string
}

func fetchSeiDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*seiTx, error) {
    params := &cosmosTxSearchParams{Sequence: sequence, Timestamp: timestamp, SrcChannel: srcChannel, DstChannel: dstChannel}
    return fetchTxSearch[seiTx](ctx, baseUrl, rateLimiter, params, seiTxSearchExtractor)
}

func (a *apiSei) fetchSeiTx(
    ctx context.Context,
    rateLimiter *time.Ticker,
    baseUrl string,
    txHash string,
) (*TxDetail, error) {
    txHash = txHashLowerCaseWith0x(txHash)
    wormchainTx, err := fetchWormchainDetail(ctx, a.wormchainUrl, a.wormchainRateLimiter, txHash)
    if err != nil {
        return nil, err
    }
    seiTx, err := fetchSeiDetail(ctx, baseUrl, rateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
    if err != nil {
        return nil, err
    }
    return &TxDetail{
        NativeTxHash: txHash,
        From:         wormchainTx.receiver,
        Attribute: &AttributeTxDetail{
            Type: "wormchain-gateway",
            Value: &WorchainAttributeTxDetail{
                OriginChainID: vaa.ChainIDSei,
                OriginTxHash:  seiTx.TxHash,
                OriginAddress: seiTx.Sender,
            },
        },
    }, nil
}
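
For reference, the extractor's input is the JSON-encoded Tendermint log attached to each tx_search result. A minimal sketch of the scan it performs, written as a test against the types added in this commit; the log fixture and its sender address are made up for illustration:

package chains

import (
    "encoding/json"
    "testing"
)

// Hypothetical fixture: a trimmed-down tx_search log entry of the kind
// seiTxSearchExtractor scans. Only the "message" event and its "sender"
// attribute matter here; real logs carry many more events and attributes.
const sampleSeiLog = `[
  {
    "events": [
      {
        "type": "message",
        "attributes": [
          {"key": "action", "value": "/ibc.core.channel.v1.MsgRecvPacket"},
          {"key": "sender", "value": "sei1examplesenderaddress"}
        ]
      }
    ]
  }
]`

func TestSeiSenderExtraction(t *testing.T) {
    var logs []cosmosLogWrapperResponse
    if err := json.Unmarshal([]byte(sampleSeiLog), &logs); err != nil {
        t.Fatal(err)
    }
    // Same scan seiTxSearchExtractor performs: take the "sender"
    // attribute of the first "message" event.
    var sender string
    for _, l := range logs {
        for _, e := range l.Events {
            if e.Type == "message" {
                for _, attr := range e.Attributes {
                    if attr.Key == "sender" {
                        sender = attr.Value
                    }
                }
                break
            }
        }
    }
    if sender != "sei1examplesenderaddress" {
        t.Fatalf("got sender %q", sender)
    }
}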

File diff suppressed because one or more lines are too long

View File

@@ -83,6 +83,7 @@ func Initialize(cfg *config.RpcProviderSettings) {
    rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute)
    rateLimitersByChain[sdk.ChainIDWormchain] = convertToRateLimiter(cfg.WormchainRequestsPerMinute)
    rateLimitersByChain[ChainIDOsmosis] = convertToRateLimiter(cfg.OsmosisRequestsPerMinute)
    rateLimitersByChain[sdk.ChainIDSei] = convertToRateLimiter(cfg.SeiRequestsPerMinute)

    // Initialize the RPC base URLs for each chain
    baseUrlsByChain = make(map[sdk.ChainID]string)
@@ -109,6 +110,7 @@ func Initialize(cfg *config.RpcProviderSettings) {
    baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl
    baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl
    baseUrlsByChain[sdk.ChainIDWormchain] = cfg.WormchainBaseUrl
    baseUrlsByChain[sdk.ChainIDSei] = cfg.SeiBaseUrl
}

func FetchTx(
@@ -165,6 +167,18 @@ func FetchTx(
            p2pNetwork: p2pNetwork,
        }
        fetchFunc = apiWormchain.fetchWormchainTx
    case sdk.ChainIDSei:
        rateLimiter, ok := rateLimitersByChain[sdk.ChainIDWormchain]
        if !ok {
            return nil, errors.New("found no rate limiter for chain wormchain")
        }
        apiSei := &apiSei{
            wormchainRateLimiter: rateLimiter,
            wormchainUrl:         cfg.WormchainBaseUrl,
            p2pNetwork:           p2pNetwork,
        }
        fetchFunc = apiSei.fetchSeiTx
    default:
        return nil, ErrChainNotSupported
    }
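
convertToRateLimiter itself is not part of this diff; a minimal sketch of the likely idea, in which a requests-per-minute budget becomes a ticker, so SEI_REQUESTS_PER_MINUTE=12 yields one RPC call every five seconds:

package main

import (
    "fmt"
    "time"
)

// convertToRateLimiter turns a requests-per-minute budget into a ticker
// that releases one request slot every minute/rpm. This is an illustration
// of the idea, not the repository's implementation.
func convertToRateLimiter(rpm uint16) *time.Ticker {
    return time.NewTicker(time.Minute / time.Duration(rpm))
}

func main() {
    limiter := convertToRateLimiter(12) // one slot every 5s
    defer limiter.Stop()
    for i := 0; i < 3; i++ {
        <-limiter.C
        fmt.Println("request slot", i)
    }
}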

tx-tracker/chains/cosmos.go (new file, 113 lines)
View File

@@ -0,0 +1,113 @@
package chains

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
)

type cosmosRequest struct {
    Jsonrpc string `json:"jsonrpc"`
    ID      int    `json:"id"`
    Method  string `json:"method"`
    Params  struct {
        Query string `json:"query"`
        Page  string `json:"page"`
    } `json:"params"`
}

type cosmosTxSearchParams struct {
    Sequence   string
    Timestamp  string
    SrcChannel string
    DstChannel string
}

type cosmosTxSearchResponse struct {
    Jsonrpc string `json:"jsonrpc"`
    ID      int    `json:"id"`
    Result  struct {
        Txs []struct {
            Hash     string `json:"hash"`
            Height   string `json:"height"`
            Index    int    `json:"index"`
            TxResult struct {
                Code      int    `json:"code"`
                Data      string `json:"data"`
                Log       string `json:"log"`
                Info      string `json:"info"`
                GasWanted string `json:"gas_wanted"`
                GasUsed   string `json:"gas_used"`
                Events    []struct {
                    Type       string `json:"type"`
                    Attributes []struct {
                        Key   string `json:"key"`
                        Value string `json:"value"`
                        Index bool   `json:"index"`
                    } `json:"attributes"`
                } `json:"events"`
                Codespace string `json:"codespace"`
            } `json:"tx_result"`
            Tx string `json:"tx"`
        } `json:"txs"`
        TotalCount string `json:"total_count"`
    } `json:"result"`
}

type cosmosEventResponse struct {
    Type       string `json:"type"`
    Attributes []struct {
        Key   string `json:"key"`
        Value string `json:"value"`
    } `json:"attributes"`
}

type cosmosLogWrapperResponse struct {
    Events []cosmosEventResponse `json:"events"`
}

type txSearchExtractor[T any] func(tx *cosmosTxSearchResponse, log []cosmosLogWrapperResponse) (T, error)

func fetchTxSearch[T any](ctx context.Context, baseUrl string, rl *time.Ticker, p *cosmosTxSearchParams, extractor txSearchExtractor[*T]) (*T, error) {
    queryTemplate := `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
    query := fmt.Sprintf(queryTemplate, p.Sequence, p.Timestamp, p.SrcChannel, p.DstChannel)
    q := cosmosRequest{
        Jsonrpc: "2.0",
        ID:      1,
        Method:  "tx_search",
        Params: struct {
            Query string `json:"query"`
            Page  string `json:"page"`
        }{
            Query: query,
            Page:  "1",
        },
    }
    response, err := httpPost(ctx, rl, baseUrl, q)
    if err != nil {
        return nil, err
    }
    return parseTxSearchResponse[T](response, p, extractor)
}

func parseTxSearchResponse[T any](body []byte, p *cosmosTxSearchParams, extractor txSearchExtractor[*T]) (*T, error) {
    var txSearchResponse cosmosTxSearchResponse
    err := json.Unmarshal(body, &txSearchResponse)
    if err != nil {
        return nil, err
    }
    if len(txSearchResponse.Result.Txs) == 0 {
        return nil, fmt.Errorf("cannot find hash for sequence %s, timestamp %s, srcChannel %s, dstChannel %s", p.Sequence, p.Timestamp, p.SrcChannel, p.DstChannel)
    }
    var log []cosmosLogWrapperResponse
    err = json.Unmarshal([]byte(txSearchResponse.Result.Txs[0].TxResult.Log), &log)
    if err != nil {
        return nil, err
    }
    return extractor(&txSearchResponse, log)
}
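
For context, the request fetchTxSearch sends is a standard Tendermint tx_search JSON-RPC call. A minimal sketch of the body produced for a hypothetical IBC packet; the sequence, timestamp, and channel values below are made up:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Mirror of the query fetchTxSearch builds; the packet values are
    // illustrative, not taken from a real transfer.
    query := fmt.Sprintf(
        "send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'",
        "42", "1700000000000000000", "channel-0", "channel-1",
    )
    body, _ := json.MarshalIndent(map[string]any{
        "jsonrpc": "2.0",
        "id":      1,
        "method":  "tx_search",
        "params":  map[string]string{"query": query, "page": "1"},
    }, "", "  ")
    fmt.Println(string(body)) // POSTed to the chain's RPC base URL
}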

View File

@@ -305,7 +305,7 @@ func consume(ctx context.Context, params *consumerParams) {
                TxHash:    *v.TxHash,
                Overwrite: true, // Overwrite old contents
            }
            err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p, params.p2pNetwork)
            _, err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p, params.p2pNetwork)
            if err != nil {
                params.logger.Error("Failed to track source tx",
                    zap.String("vaaId", globalTx.Id),

View File

@@ -21,6 +21,7 @@ import (
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/http/infrastructure"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/http/vaa"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue"
    "go.uber.org/zap"
@@ -52,17 +53,23 @@ func main() {
        log.Fatal("Failed to initialize MongoDB client: ", err)
    }

    // create repositories
    repository := consumer.NewRepository(logger, db.Database)
    vaaRepository := vaa.NewRepository(db.Database, logger)

    // create controller
    vaaController := vaa.NewController(vaaRepository, repository, &cfg.RpcProviderSettings, cfg.P2pNetwork, logger)

    // start serving /health and /ready endpoints
    healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
    if err != nil {
        logger.Fatal("Failed to create health checks", zap.Error(err))
    }
    server := infrastructure.NewServer(logger, cfg.MonitoringPort, cfg.PprofEnabled, healthChecks...)
    server := infrastructure.NewServer(logger, cfg.MonitoringPort, cfg.PprofEnabled, vaaController, healthChecks...)
    server.Start()

    // create and start a consumer.
    vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger)
    repository := consumer.NewRepository(logger, db.Database)
    consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics, cfg.P2pNetwork)
    consumer.Start(rootCtx)
@@ -116,7 +123,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons
    consumer, err := sqs.NewConsumer(
        awsconfig,
        cfg.SqsUrl,
        cfg.PipelineSqsUrl,
        sqs.WithMaxMessages(10),
        sqs.WithVisibilityTimeout(4*60),
    )
@@ -166,7 +173,7 @@ func makeHealthChecks(
    }

    plugins := []health.Check{
        health.SQS(awsConfig, config.SqsUrl),
        health.SQS(awsConfig, config.PipelineSqsUrl),
        health.Mongo(db),
    }

View File

@@ -53,7 +53,7 @@ type AwsSettings struct {
    AwsAccessKeyID     string `split_words:"true" required:"false"`
    AwsSecretAccessKey string `split_words:"true" required:"false"`
    AwsRegion          string `split_words:"true" required:"true"`
    SqsUrl             string `split_words:"true" required:"true"`
    PipelineSqsUrl     string `split_words:"true" required:"true"`
}

type MongodbSettings struct {
@@ -102,6 +102,8 @@ type RpcProviderSettings struct {
    OsmosisRequestsPerMinute uint16 `split_words:"true" required:"true"`
    PolygonBaseUrl           string `split_words:"true" required:"true"`
    PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"`
    SeiBaseUrl               string `split_words:"true" required:"true"`
    SeiRequestsPerMinute     uint16 `split_words:"true" required:"true"`
    SolanaBaseUrl            string `split_words:"true" required:"true"`
    SolanaRequestsPerMinute  uint16 `split_words:"true" required:"true"`
    SuiBaseUrl               string `split_words:"true" required:"true"`

View File

@@ -84,7 +84,7 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
        TxHash:    event.TxHash,
        Overwrite: false, // avoid processing the same transaction twice
    }
    err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p, c.p2pNetwork)
    _, err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p, c.p2pNetwork)

    // Log a message informing the processing status
    if errors.Is(err, chains.ErrChainNotSupported) {

View File

@@ -45,7 +45,7 @@ func ProcessSourceTx(
    repository *Repository,
    params *ProcessSourceTxParams,
    p2pNetwork string,
) error {
) (*chains.TxDetail, error) {

    if !params.Overwrite {
        // If the message has already been processed, skip it.
@@ -56,9 +56,9 @@ func ProcessSourceTx(
        // we don't want to hit the RPC nodes again for performance reasons.
        processed, err := repository.AlreadyProcessed(ctx, params.VaaId)
        if err != nil {
            return err
            return nil, err
        } else if err == nil && processed {
            return ErrAlreadyProcessed
            return nil, ErrAlreadyProcessed
        }
    }
@@ -80,9 +80,9 @@ func ProcessSourceTx(
            // Keep retrying?
            if params.Timestamp == nil && retries > minRetries {
                return fmt.Errorf("failed to process transaction: %w", err)
                return nil, fmt.Errorf("failed to process transaction: %w", err)
            } else if time.Since(*params.Timestamp) > retryDeadline && retries >= minRetries {
                return fmt.Errorf("failed to process transaction: %w", err)
                return nil, fmt.Errorf("failed to process transaction: %w", err)
            } else {
                logger.Warn("failed to process transaction",
                    zap.String("vaaId", params.VaaId),
@@ -103,5 +103,10 @@ func ProcessSourceTx(
        TxDetail: txDetail,
        TxStatus: domain.SourceTxStatusConfirmed,
    }
    return repository.UpsertDocument(ctx, &p)
    err = repository.UpsertDocument(ctx, &p)
    if err != nil {
        return nil, err
    }

    return txDetail, nil
}

View File

@@ -5,6 +5,7 @@ import (
    "github.com/gofiber/fiber/v2"
    "github.com/gofiber/fiber/v2/middleware/pprof"
    health "github.com/wormhole-foundation/wormhole-explorer/common/health"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/http/vaa"
    "go.uber.org/zap"
)
@@ -14,7 +15,7 @@ type Server struct {
    logger *zap.Logger
}

func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server {
func NewServer(logger *zap.Logger, port string, pprofEnabled bool, vaaController *vaa.Controller, checks ...health.Check) *Server {
    app := fiber.New(fiber.Config{DisableStartupMessage: true})
    prometheus := fiberprometheus.New("wormscan-tx-tracker")
    prometheus.RegisterAt(app, "/metrics")
@@ -30,6 +31,8 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...hea
    api.Get("/health", ctrl.HealthCheck)
    api.Get("/ready", ctrl.ReadyCheck)

    api.Post("/vaa/process", vaaController.Process)

    return &Server{
        app:  app,
        port: port,

View File

@@ -0,0 +1,66 @@
package vaa

import (
    "strconv"

    "github.com/gofiber/fiber/v2"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
    "github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
    sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
    "go.uber.org/zap"
)

// Controller definition.
type Controller struct {
    logger              *zap.Logger
    vaaRepository       *Repository
    repository          *consumer.Repository
    rpcProviderSettings *config.RpcProviderSettings
    p2pNetwork          string
}

// NewController creates a Controller instance.
func NewController(vaaRepository *Repository, repository *consumer.Repository, rpcProviderSettings *config.RpcProviderSettings, p2pNetwork string, logger *zap.Logger) *Controller {
    return &Controller{vaaRepository: vaaRepository, repository: repository, rpcProviderSettings: rpcProviderSettings, p2pNetwork: p2pNetwork, logger: logger}
}

func (c *Controller) Process(ctx *fiber.Ctx) error {
    payload := struct {
        ID string `json:"id"`
    }{}

    if err := ctx.BodyParser(&payload); err != nil {
        return err
    }

    c.logger.Info("Processing VAA from endpoint", zap.String("id", payload.ID))

    v, err := c.vaaRepository.FindById(ctx.Context(), payload.ID)
    if err != nil {
        return err
    }

    vaa, err := sdk.Unmarshal(v.Vaa)
    if err != nil {
        return err
    }

    p := &consumer.ProcessSourceTxParams{
        Timestamp: &vaa.Timestamp,
        VaaId:     vaa.MessageID(),
        ChainId:   vaa.EmitterChain,
        Emitter:   vaa.EmitterAddress.String(),
        Sequence:  strconv.FormatUint(vaa.Sequence, 10),
        TxHash:    v.TxHash,
        Overwrite: true,
    }

    result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcProviderSettings, c.repository, p, c.p2pNetwork)
    if err != nil {
        return err
    }

    return ctx.JSON(struct {
        Result any `json:"result"`
    }{Result: result})
}
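
For reference, the new endpoint re-processes a single VAA on demand. A minimal client sketch; the message ID, host, port, and the /api prefix are placeholders, not values from this diff:

package main

import (
    "bytes"
    "fmt"
    "io"
    "net/http"
)

func main() {
    // "CHAIN/EMITTER/SEQUENCE" stands in for a real VAA message ID; host,
    // port, and the /api prefix are assumptions about a local deployment.
    payload := bytes.NewBufferString(`{"id": "CHAIN/EMITTER/SEQUENCE"}`)
    resp, err := http.Post("http://localhost:8000/api/vaa/process", "application/json", payload)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // On success the handler responds with {"result": <TxDetail>}.
    out, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(out))
}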

View File

@@ -0,0 +1,35 @@
package vaa

import (
    "context"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.uber.org/zap"
)

type Repository struct {
    db     *mongo.Database
    logger *zap.Logger
    vaas   *mongo.Collection
}

type VaaDoc struct {
    ID     string `bson:"_id" json:"id"`
    Vaa    []byte `bson:"vaas" json:"vaa"`
    TxHash string `bson:"txHash" json:"txHash"`
}

// NewRepository creates a new Repository.
func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
    return &Repository{
        db:     db,
        logger: logger.With(zap.String("module", "VaaRepository")),
        vaas:   db.Collection("vaas"),
    }
}

// FindById returns the VAA document with the given id.
func (r *Repository) FindById(ctx context.Context, id string) (*VaaDoc, error) {
    var vaaDoc VaaDoc
    err := r.vaas.FindOne(ctx, bson.M{"_id": id}).Decode(&vaaDoc)
    return &vaaDoc, err
}