Retrieve chain and sender of wormchain originated vaas (#678)

* Add additional information for osmosis transaction through wormchain

Co-authored-by: walker-16 <agpazos85@gmail.com>

* Modify tx-tracker deployment

Co-authored-by: walker-16 <agpazos85@gmail.com>

---------

Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
ftocal 2023-09-04 15:17:23 -03:00 committed by GitHub
parent a525a1f686
commit 5e3adeb4ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 413 additions and 14 deletions

View File

@ -74,14 +74,21 @@ type GlobalTransactionDoc struct {
DestinationTx *DestinationTx `bson:"destinationTx" json:"destinationTx"`
}
// OriginTx representa a origin transaction.
// OriginTx represents a origin transaction.
type OriginTx struct {
TxHash string `bson:"nativeTxHash" json:"txHash"`
From string `bson:"from" json:"from"`
Status string `bson:"status" json:"status"`
Attribute *AttributeDoc `bson:"attribute" json:"attribute"`
}
// DestinationTx representa a destination transaction.
// AttributeDoc represents a custom attribute for a origin transaction.
type AttributeDoc struct {
Type string `bson:"type" json:"type"`
Value map[string]any `bson:"value" json:"value"`
}
// DestinationTx represents a destination transaction.
type DestinationTx struct {
ChainID sdk.ChainID `bson:"chainId" json:"chainId"`
Status string `bson:"status" json:"status"`

View File

@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
AWS_IAM_ROLE=
METRICS_ENABLED=true
@ -62,6 +63,9 @@ OASIS_REQUESTS_PER_MINUTE=12
OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism
OPTIMISM_REQUESTS_PER_MINUTE=12
OSMOSIS_BASE_URL=https://rpc.osmosis.zone
OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=12
@ -77,5 +81,8 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://dimension-lcd.xpla.dev
XPLA_REQUESTS_PER_MINUTE=12

View File

@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
AWS_IAM_ROLE=
METRICS_ENABLED=true
@ -60,6 +61,9 @@ OASIS_REQUESTS_PER_MINUTE=12
OPTIMISM_BASE_URL=https://goerli.optimism.io
OPTIMISM_REQUESTS_PER_MINUTE=12
OSMOSIS_BASE_URL=https://rpc.testnet.osmosis.zone
OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai
POLYGON_REQUESTS_PER_MINUTE=12
@ -75,5 +79,8 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://pisco-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain-testnet.jumpisolated.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://cube-lcd.xpla.dev:443
XPLA_REQUESTS_PER_MINUTE=12

View File

@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=40m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
AWS_IAM_ROLE=
METRICS_ENABLED=true
@ -62,6 +63,9 @@ OASIS_REQUESTS_PER_MINUTE=12
OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism
OPTIMISM_REQUESTS_PER_MINUTE=12
OSMOSIS_BASE_URL=https://rpc.osmosis.zone
OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon
POLYGON_REQUESTS_PER_MINUTE=12
@ -77,5 +81,8 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://phoenix-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://dimension-lcd.xpla.dev
XPLA_REQUESTS_PER_MINUTE=12

View File

@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=testnet
AWS_IAM_ROLE=
METRICS_ENABLED=true
@ -60,6 +61,9 @@ OASIS_REQUESTS_PER_MINUTE=12
OPTIMISM_BASE_URL=https://goerli.optimism.io
OPTIMISM_REQUESTS_PER_MINUTE=12
OSMOSIS_BASE_URL=https://rpc.testnet.osmosis.zone
OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai
POLYGON_REQUESTS_PER_MINUTE=12
@ -75,5 +79,8 @@ TERRA_REQUESTS_PER_MINUTE=12
TERRA2_BASE_URL=https://pisco-lcd.terra.dev
TERRA2_REQUESTS_PER_MINUTE=12
WORMCHAIN_BASE_URL=https://wormchain-testnet.jumpisolated.com
WORMCHAIN_REQUESTS_PER_MINUTE=12
XPLA_BASE_URL=https://cube-lcd.xpla.dev:443
XPLA_REQUESTS_PER_MINUTE=12

View File

@ -60,6 +60,8 @@ spec:
value: {{ .SQS_URL }}
- name: AWS_REGION
value: {{ .SQS_AWS_REGION }}
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: ACALA_BASE_URL
@ -126,6 +128,10 @@ spec:
value: {{ .OPTIMISM_BASE_URL }}
- name: OPTIMISM_REQUESTS_PER_MINUTE
value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}"
- name: OSMOSIS_BASE_URL
value: {{ .OSMOSIS_BASE_URL }}
- name: OSMOSIS_REQUESTS_PER_MINUTE
value: "{{ .OSMOSIS_REQUESTS_PER_MINUTE }}"
- name: POLYGON_BASE_URL
value: {{ .POLYGON_BASE_URL }}
- name: POLYGON_REQUESTS_PER_MINUTE
@ -146,6 +152,10 @@ spec:
value: {{ .TERRA2_BASE_URL }}
- name: TERRA2_REQUESTS_PER_MINUTE
value: "{{ .TERRA2_REQUESTS_PER_MINUTE }}"
- name: WORMCHAIN_BASE_URL
value: {{ .WORMCHAIN_BASE_URL }}
- name: WORMCHAIN_REQUESTS_PER_MINUTE
value: "{{ .WORMCHAIN_REQUESTS_PER_MINUTE }}"
- name: XPLA_BASE_URL
value: {{ .XPLA_BASE_URL }}
- name: XPLA_REQUESTS_PER_MINUTE

View File

@ -0,0 +1,264 @@
package chains
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type apiWormchain struct {
osmosisUrl string
osmosisRateLimiter *time.Ticker
p2pNetwork string
}
type wormchainTxDetail struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Result struct {
Hash string `json:"hash"`
Height string `json:"height"`
Index int `json:"index"`
TxResult struct {
Code int `json:"code"`
Data string `json:"data"`
Log string `json:"log"`
Info string `json:"info"`
GasWanted string `json:"gas_wanted"`
GasUsed string `json:"gas_used"`
Events []struct {
Type string `json:"type"`
Attributes []struct {
Key string `json:"key"`
Value string `json:"value"`
Index bool `json:"index"`
} `json:"attributes"`
} `json:"events"`
Codespace string `json:"codespace"`
} `json:"tx_result"`
Tx string `json:"tx"`
} `json:"result"`
}
type event struct {
Type string `json:"type"`
Attributes []struct {
Key string `json:"key"`
Value string `json:"value"`
} `json:"attributes"`
}
type packetData struct {
Sender string `json:"sender"`
Receiver string `json:"receiver"`
}
type logWrapper struct {
Events []event `json:"events"`
}
type worchainTx struct {
srcChannel, dstChannel, sender, receiver, timestamp, sequence string
}
func fetchWormchainDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, txHash string) (*worchainTx, error) {
uri := fmt.Sprintf("%s/tx?hash=%s", baseUrl, txHash)
body, err := httpGet(ctx, rateLimiter, uri)
if err != nil {
return nil, err
}
var tx wormchainTxDetail
err = json.Unmarshal(body, &tx)
if err != nil {
return nil, err
}
var log []logWrapper
err = json.Unmarshal([]byte(tx.Result.TxResult.Log), &log)
if err != nil {
return nil, err
}
var srcChannel, dstChannel, sender, receiver, timestamp, sequence string
for _, l := range log {
for _, e := range l.Events {
if e.Type == "recv_packet" {
for _, attr := range e.Attributes {
if attr.Key == "packet_src_channel" {
srcChannel = attr.Value
}
if attr.Key == "packet_dst_channel" {
dstChannel = attr.Value
}
if attr.Key == "packet_timeout_timestamp" {
timestamp = attr.Value
}
if attr.Key == "packet_sequence" {
sequence = attr.Value
}
if attr.Key == "packet_data" {
var pd packetData
err = json.Unmarshal([]byte(attr.Value), &pd)
if err != nil {
return nil, err
}
sender = pd.Sender
receiver = pd.Receiver
}
}
}
}
}
return &worchainTx{
srcChannel: srcChannel,
dstChannel: dstChannel,
sender: sender,
receiver: receiver,
timestamp: timestamp,
sequence: sequence,
}, nil
}
const queryTemplate = `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'`
type osmosisRequest struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params struct {
Query string `json:"query"`
Page string `json:"page"`
} `json:"params"`
}
type osmosisResponse struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Result struct {
Txs []struct {
Hash string `json:"hash"`
Height string `json:"height"`
Index int `json:"index"`
TxResult struct {
Code int `json:"code"`
Data string `json:"data"`
Log string `json:"log"`
Info string `json:"info"`
GasWanted string `json:"gas_wanted"`
GasUsed string `json:"gas_used"`
Events []struct {
Type string `json:"type"`
Attributes []struct {
Key string `json:"key"`
Value string `json:"value"`
Index bool `json:"index"`
} `json:"attributes"`
} `json:"events"`
Codespace string `json:"codespace"`
} `json:"tx_result"`
Tx string `json:"tx"`
} `json:"txs"`
TotalCount string `json:"total_count"`
} `json:"result"`
}
type osmosisTx struct {
txHash string
}
type WorchainAttributeTxDetail struct {
OriginChainID sdk.ChainID `bson:"originChainId"`
OriginTxHash string `bson:"originTxHash"`
OriginAddress string `bson:"originAddress"`
}
func fetchOsmosisDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*osmosisTx, error) {
query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel)
q := osmosisRequest{
Jsonrpc: "2.0",
ID: 1,
Method: "tx_search",
Params: struct {
Query string `json:"query"`
Page string `json:"page"`
}{
Query: query,
Page: "1",
},
}
response, err := httpPost(ctx, rateLimiter, baseUrl, q)
if err != nil {
return nil, err
}
var oReponse osmosisResponse
err = json.Unmarshal(response, &oReponse)
if err != nil {
return nil, err
}
if len(oReponse.Result.Txs) == 0 {
return nil, fmt.Errorf("can not found hash for sequence %s, timestamp %s, srcChannel %s, dstChannel %s", sequence, timestamp, srcChannel, dstChannel)
}
return &osmosisTx{txHash: oReponse.Result.Txs[0].Hash}, nil
}
func (a *apiWormchain) fetchWormchainTx(
ctx context.Context,
rateLimiter *time.Ticker,
baseUrl string,
txHash string,
) (*TxDetail, error) {
txHash = txHashLowerCaseWith0x(txHash)
wormchainTx, err := fetchWormchainDetail(ctx, baseUrl, rateLimiter, txHash)
if err != nil {
return nil, err
}
// Verify if this transaction is from osmosis by wormchain
if a.isOsmosisTx(wormchainTx) {
osmosisTx, err := fetchOsmosisDetail(ctx, a.osmosisUrl, a.osmosisRateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel)
if err != nil {
return nil, err
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
Attribute: &AttributeTxDetail{
Type: "wormchain-gateway",
Value: &WorchainAttributeTxDetail{
OriginChainID: ChainIDOsmosis,
OriginTxHash: osmosisTx.txHash,
OriginAddress: wormchainTx.sender,
},
},
}, nil
}
return &TxDetail{
NativeTxHash: txHash,
From: wormchainTx.receiver,
}, nil
}
func (a *apiWormchain) isOsmosisTx(tx *worchainTx) bool {
if a.p2pNetwork == domain.P2pMainNet {
return tx.srcChannel == "channel-2186" && tx.dstChannel == "channel-3"
}
if a.p2pNetwork == domain.P2pTestNet {
return tx.srcChannel == "channel-486" && tx.dstChannel == "channel-4"
}
return false
}

View File

@ -23,11 +23,23 @@ var (
baseUrlsByChain map[sdk.ChainID]string
)
// WARNING: The following chain IDs are not supported by the wormhole-sdk:
const ChainIDOsmosis sdk.ChainID = 20
type WormchainTxDetail struct {
}
type TxDetail struct {
// From is the address that signed the transaction, encoded in the chain's native format.
From string
// NativeTxHash contains the transaction hash, encoded in the chain's native format.
NativeTxHash string
// Attribute contains the specific information of the transaction.
Attribute *AttributeTxDetail
}
type AttributeTxDetail struct {
Type string
Value any
}
func Initialize(cfg *config.RpcProviderSettings) {
@ -67,6 +79,8 @@ func Initialize(cfg *config.RpcProviderSettings) {
rateLimitersByChain[sdk.ChainIDTerra2] = convertToRateLimiter(cfg.Terra2RequestsPerMinute)
rateLimitersByChain[sdk.ChainIDSui] = convertToRateLimiter(cfg.SuiRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute)
rateLimitersByChain[sdk.ChainIDWormchain] = convertToRateLimiter(cfg.WormchainRequestsPerMinute)
rateLimitersByChain[ChainIDOsmosis] = convertToRateLimiter(cfg.OsmosisRequestsPerMinute)
// Initialize the RPC base URLs for each chain
baseUrlsByChain = make(map[sdk.ChainID]string)
@ -92,6 +106,7 @@ func Initialize(cfg *config.RpcProviderSettings) {
baseUrlsByChain[sdk.ChainIDTerra2] = cfg.Terra2BaseUrl
baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl
baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl
baseUrlsByChain[sdk.ChainIDWormchain] = cfg.WormchainBaseUrl
}
func FetchTx(
@ -99,6 +114,7 @@ func FetchTx(
cfg *config.RpcProviderSettings,
chainId sdk.ChainID,
txHash string,
p2pNetwork string,
) (*TxDetail, error) {
// Decide which RPC/API service to use based on chain ID
@ -132,6 +148,17 @@ func FetchTx(
sdk.ChainIDOptimism,
sdk.ChainIDPolygon:
fetchFunc = fetchEthTx
case sdk.ChainIDWormchain:
rateLimiter, ok := rateLimitersByChain[ChainIDOsmosis]
if !ok {
return nil, errors.New("found no rate limiter for chain osmosis")
}
apiWormchain := &apiWormchain{
osmosisUrl: cfg.OsmosisBaseUrl,
osmosisRateLimiter: rateLimiter,
p2pNetwork: p2pNetwork,
}
fetchFunc = apiWormchain.fetchWormchainTx
default:
return nil, ErrChainNotSupported
}

View File

@ -1,9 +1,11 @@
package chains
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"strconv"
"strings"
@ -56,13 +58,52 @@ func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte,
}
// Read the response body and return
body, err := ioutil.ReadAll(response.Body)
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return body, nil
}
// httpPost is a helper function that performs an HTTP request.
func httpPost(ctx context.Context, rateLimiter *time.Ticker, url string, body any) ([]byte, error) {
// Wait for the rate limiter
if !waitForRateLimiter(ctx, rateLimiter) {
return nil, ctx.Err()
}
b, err := json.Marshal(body)
if err != nil {
return nil, err
}
// Build the HTTP request
request, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
request.Header.Set("Content-Type", "application/json")
// Send it
var client http.Client
response, err := client.Do(request)
if err != nil {
return nil, fmt.Errorf("failed to query url: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode)
}
// Read the response body and return
result, err := io.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return result, nil
}
func waitForRateLimiter(ctx context.Context, t *time.Ticker) bool {
select {
case <-t.C:
@ -108,3 +149,10 @@ func (c *rateLimitedRpcClient) CallContext(
func (c *rateLimitedRpcClient) Close() {
c.client.Close()
}
func txHashLowerCaseWith0x(v string) string {
if strings.HasPrefix(v, "0x") {
return strings.ToLower(v)
}
return "0x" + strings.ToLower(v)
}

View File

@ -109,6 +109,7 @@ func main() {
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
p2pNetwork: cfg.P2pNetwork,
}
go consume(rootCtx, &p)
}
@ -242,6 +243,7 @@ type consumerParams struct {
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
p2pNetwork string
}
// consume reads VAA IDs from a channel, processes them, and updates the database accordingly.
@ -303,7 +305,7 @@ func consume(ctx context.Context, params *consumerParams) {
TxHash: *v.TxHash,
Overwrite: true, // Overwrite old contents
}
err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p)
err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p, params.p2pNetwork)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),

View File

@ -13,8 +13,8 @@ import (
func main() {
// validate commandline arguments
if len(os.Args) != 3 {
log.Fatalf("Usage: ./%s <chain name> <tx hash>\n", os.Args[0])
if len(os.Args) != 4 {
log.Fatalf("Usage: ./%s <chain name> <tx hash> <p2p network>\n", os.Args[0])
}
// load config settings
@ -31,7 +31,7 @@ func main() {
// fetch tx data
chains.Initialize(cfg)
txDetail, err := chains.FetchTx(context.Background(), cfg, chainId, os.Args[2])
txDetail, err := chains.FetchTx(context.Background(), cfg, chainId, os.Args[2], os.Args[3])
if err != nil {
log.Fatalf("Failed to get transaction data: %v", err)
}

View File

@ -63,7 +63,7 @@ func main() {
// create and start a consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger)
repository := consumer.NewRepository(logger, db.Database)
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics)
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics, cfg.P2pNetwork)
consumer.Start(rootCtx)
logger.Info("Started wormhole-explorer-tx-tracker")

View File

@ -22,6 +22,7 @@ type BackfillerSettings struct {
LogLevel string `split_words:"true" default:"INFO"`
NumWorkers uint `split_words:"true" required:"true"`
BulkSize uint `split_words:"true" required:"true"`
P2pNetwork string `split_words:"true" required:"true"`
// Strategy determines which VAAs will be affected by the backfiller.
Strategy struct {
@ -41,6 +42,7 @@ type ServiceSettings struct {
LogLevel string `split_words:"true" default:"INFO"`
PprofEnabled bool `split_words:"true" default:"false"`
MetricsEnabled bool `split_words:"true" default:"false"`
P2pNetwork string `split_words:"true" required:"true"`
AwsSettings
MongodbSettings
RpcProviderSettings
@ -92,6 +94,8 @@ type RpcProviderSettings struct {
OasisRequestsPerMinute uint16 `split_words:"true" required:"true"`
OptimismBaseUrl string `split_words:"true" required:"true"`
OptimismRequestsPerMinute uint16 `split_words:"true" required:"true"`
OsmosisBaseUrl string `split_words:"true" required:"true"`
OsmosisRequestsPerMinute uint16 `split_words:"true" required:"true"`
PolygonBaseUrl string `split_words:"true" required:"true"`
PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"`
SolanaBaseUrl string `split_words:"true" required:"true"`
@ -104,6 +108,8 @@ type RpcProviderSettings struct {
Terra2RequestsPerMinute uint16 `split_words:"true" required:"true"`
XplaBaseUrl string `split_words:"true" required:"true"`
XplaRequestsPerMinute uint16 `split_words:"true" required:"true"`
WormchainBaseUrl string `split_words:"true" required:"true"`
WormchainRequestsPerMinute uint16 `split_words:"true" required:"true"`
}
func LoadFromEnv[T any]() (*T, error) {

View File

@ -19,6 +19,7 @@ type Consumer struct {
logger *zap.Logger
repository *Repository
metrics metrics.Metrics
p2pNetwork string
}
// New creates a new vaa consumer.
@ -29,6 +30,7 @@ func New(
logger *zap.Logger,
repository *Repository,
metrics metrics.Metrics,
p2pNetwork string,
) *Consumer {
c := Consumer{
@ -37,6 +39,7 @@ func New(
logger: logger,
repository: repository,
metrics: metrics,
p2pNetwork: p2pNetwork,
}
return &c
@ -81,7 +84,7 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
TxHash: event.TxHash,
Overwrite: false, // avoid processing the same transaction twice
}
err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p)
err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p, c.p2pNetwork)
// Log a message informing the processing status
if errors.Is(err, chains.ErrChainNotSupported) {

View File

@ -44,6 +44,7 @@ func ProcessSourceTx(
rpcServiceProviderSettings *config.RpcProviderSettings,
repository *Repository,
params *ProcessSourceTxParams,
p2pNetwork string,
) error {
if !params.Overwrite {
@ -72,7 +73,7 @@ func ProcessSourceTx(
for retries := 0; ; retries++ {
// Get transaction details from the emitter blockchain
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash)
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash, p2pNetwork)
if err == nil {
break
}

View File

@ -53,6 +53,9 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP
if params.TxDetail != nil {
fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash})
fields = append(fields, primitive.E{Key: "from", Value: params.TxDetail.From})
if params.TxDetail.Attribute != nil {
fields = append(fields, primitive.E{Key: "attribute", Value: params.TxDetail.Attribute})
}
}
update := bson.D{