Improve tx tracker logic without internal retries (#890)

* Improve tx tracker logic without internal retries

Co-authored-by: walker-16 <agpazos85@gmail.com>

* Add message attribute in pipeline sns

Co-authored-by: walker-16 <agpazos85@gmail.com>

---------

Co-authored-by: walker-16 <agpazos85@gmail.com>
ftocal 2023-12-12 15:46:00 -03:00 committed by GitHub
parent 04674745c2
commit 7992f76e56
10 changed files with 67 additions and 73 deletions

View File

@@ -1,7 +1,7 @@
ENVIRONMENT=production-mainnet
NAMESPACE=wormscan
NAME=wormscan-tx-tracker
REPLICAS=2
REPLICAS=3
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m

View File

@@ -1,8 +1,8 @@
ENVIRONMENT=production-testnet
NAMESPACE=wormscan-testnet
NAME=wormscan-tx-tracker
REPLICAS=1
IMAGE_NAME=
REPLICAS=3
IMAGE_NAME=698505565001.dkr.ecr.us-east-2.amazonaws.com/wormscan-tx-tracker:0.0.26r3
RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m
RESOURCES_REQUESTS_MEMORY=15Mi
@@ -21,25 +21,25 @@ ALGORAND_BASE_URL=https://testnet-idx.algonode.cloud
ALGORAND_REQUESTS_PER_MINUTE=12
APTOS_BASE_URL=https://fullnode.testnet.aptoslabs.com
APTOS_REQUESTS_PER_MINUTE=12
APTOS_REQUESTS_PER_MINUTE=120
ARBITRUM_BASE_URL=https://goerli-rollup.arbitrum.io/rpc
ARBITRUM_REQUESTS_PER_MINUTE=12
AVALANCHE_BASE_URL=https://rpc.ankr.com/avalanche_fuji
AVALANCHE_REQUESTS_PER_MINUTE=12
AVALANCHE_REQUESTS_PER_MINUTE=120
BASE_BASE_URL=https://base-goerli.public.blastapi.io
BASE_REQUESTS_PER_MINUTE=12
BASE_REQUESTS_PER_MINUTE=120
BSC_BASE_URL=https://data-seed-prebsc-1-s1.binance.org:8545
BSC_REQUESTS_PER_MINUTE=12
BSC_REQUESTS_PER_MINUTE=120
CELO_BASE_URL=https://alfajores-forno.celo-testnet.org
CELO_REQUESTS_PER_MINUTE=12
ETHEREUM_BASE_URL=https://rpc.ankr.com/eth_goerli
ETHEREUM_REQUESTS_PER_MINUTE=12
ETHEREUM_REQUESTS_PER_MINUTE=120
EVMOS_BASE_URL=https://evmos-testnet-rpc.polkachu.com
EVMOS_REQUESTS_PER_MINUTE=12
@@ -66,19 +66,19 @@ OASIS_BASE_URL=https://testnet.emerald.oasis.dev
OASIS_REQUESTS_PER_MINUTE=12
OPTIMISM_BASE_URL=https://goerli.optimism.io
OPTIMISM_REQUESTS_PER_MINUTE=12
OPTIMISM_REQUESTS_PER_MINUTE=120
OSMOSIS_BASE_URL=https://rpc.testnet.osmosis.zone
OSMOSIS_REQUESTS_PER_MINUTE=12
POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai
POLYGON_REQUESTS_PER_MINUTE=12
POLYGON_REQUESTS_PER_MINUTE=120
SEI_BASE_URL=https://sei-a2-rpc.brocha.in
SEI_REQUESTS_PER_MINUTE=12
SOLANA_BASE_URL=https://api.devnet.solana.com
SOLANA_REQUESTS_PER_MINUTE=12
SOLANA_REQUESTS_PER_MINUTE=120
SUI_BASE_URL=https://fullnode.testnet.sui.io:443
SUI_REQUESTS_PER_MINUTE=12

View File

@@ -1,7 +1,7 @@
ENVIRONMENT=staging-testnet
NAMESPACE=wormscan-testnet
NAME=wormscan-tx-tracker
REPLICAS=1
REPLICAS=2
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=30Mi
RESOURCES_LIMITS_CPU=20m

View File

@@ -45,7 +45,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-retryablehttp v0.5.1 // indirect
github.com/holiman/uint256 v1.2.1 // indirect
github.com/klauspost/compress v1.16.3 // indirect

View File

@@ -183,8 +183,9 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-retryablehttp v0.5.1 h1:Vsx5XKPqPs3M6sM4U4GWyUqFS8aBiL9U5gkgvpkg4SE=
github.com/hashicorp/go-retryablehttp v0.5.1/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=

View File

@@ -2,9 +2,11 @@ package sns
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
)
// Producer represents SNS producer.
@@ -21,13 +23,20 @@ func NewProducer(awsConfig aws.Config, url string) (*Producer, error) {
}
// SendMessage sends messages to SQS.
func (p *Producer) SendMessage(ctx context.Context, groupID, deduplicationID, body string) error {
func (p *Producer) SendMessage(ctx context.Context, chainId uint16, groupID, deduplicationID, body string) error {
attrs := map[string]types.MessageAttributeValue{
"chainId": {
DataType: aws.String("String"),
StringValue: aws.String(fmt.Sprintf("%d", chainId)),
},
}
_, err := p.api.Publish(ctx,
&aws_sns.PublishInput{
MessageGroupId: aws.String(groupID),
MessageDeduplicationId: aws.String(deduplicationID),
Message: aws.String(body),
TopicArn: aws.String(p.url),
MessageAttributes: attrs,
})
return err
}
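Note on the hunk above: the commit only attaches a chainId message attribute to every published message; nothing in this diff consumes it. A common reason to add such an attribute is SNS subscription filtering, so per-chain consumers receive only the chains they care about. The sketch below shows that with aws-sdk-go-v2; the subscription ARN, the chosen chain IDs, and the filtering intent itself are assumptions for illustration, not part of this commit.

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
)

func main() {
	ctx := context.Background()

	// Standard AWS config resolution (region, credentials, etc.).
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := aws_sns.NewFromConfig(cfg)

	// Hypothetical subscription ARN; not taken from this repository.
	subscriptionArn := "arn:aws:sns:us-east-2:123456789012:example-topic.fifo:example-subscription-id"

	// Deliver only messages whose chainId attribute is "1" (Solana) or "2" (Ethereum).
	_, err = client.SetSubscriptionAttributes(ctx, &aws_sns.SetSubscriptionAttributesInput{
		SubscriptionArn: aws.String(subscriptionArn),
		AttributeName:   aws.String("FilterPolicy"),
		AttributeValue:  aws.String(`{"chainId": ["1", "2"]}`),
	})
	if err != nil {
		log.Fatal(err)
	}
}
```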

View File

@@ -3,7 +3,6 @@ package topic
import (
"context"
"encoding/json"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert"
@@ -38,16 +37,15 @@ func (s *SNS) Publish(ctx context.Context, message *Event) error {
return err
}
groupID := fmt.Sprintf("%d/%s", message.ChainID, message.EmitterAddress)
s.logger.Debug("Publishing message", zap.String("groupID", groupID))
err = s.producer.SendMessage(ctx, groupID, message.ID, string(body))
s.logger.Debug("Publishing message", zap.String("groupID", message.ID))
err = s.producer.SendMessage(ctx, message.ChainID, message.ID, message.ID, string(body))
if err == nil {
s.metrics.IncVaaSendNotification(message.ChainID)
} else {
// Alert error pushing event.
alertContext := alert.AlertContext{
Details: map[string]string{
"groupID": groupID,
"groupID": message.ID,
"messageID": message.ID,
},
Error: err,

View File

@@ -146,7 +146,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings, sqsUrl str
awsconfig,
sqsUrl,
sqs.WithMaxMessages(10),
sqs.WithVisibilityTimeout(4*60),
sqs.WithVisibilityTimeout(60),
)
return consumer, err
}
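Context for the change above: lowering the visibility timeout from 4*60 to 60 seconds pairs with removing the in-process retry loop. A message that is never acknowledged presumably becomes visible again and is redelivered by SQS a minute later instead of being retried inside the worker. The repo's sqs package is not shown in this diff, so the following is only a generic functional-options sketch with assumed names, not the actual implementation of sqs.WithVisibilityTimeout.

```go
package sqs

// consumerOptions collects the tunables accepted by a NewConsumer-style constructor.
// All names here are assumptions for illustration; the real package may differ.
type consumerOptions struct {
	maxMessages       int32 // messages fetched per ReceiveMessage call
	visibilityTimeout int32 // seconds a delivered message stays hidden from other consumers
}

// Option mutates consumerOptions.
type Option func(*consumerOptions)

// WithMaxMessages caps the batch size of each receive call.
func WithMaxMessages(n int32) Option {
	return func(o *consumerOptions) { o.maxMessages = n }
}

// WithVisibilityTimeout controls how long SQS hides a message after delivery.
// If the consumer never deletes the message (e.g. processing failed), SQS makes
// it visible again after this many seconds, which is what now stands in for the
// removed internal retry loop.
func WithVisibilityTimeout(seconds int32) Option {
	return func(o *consumerOptions) { o.visibilityTimeout = seconds }
}
```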

View File

@@ -56,24 +56,24 @@ func (c *Consumer) producerLoop(ctx context.Context) {
ch := c.consumeFunc(ctx)
for msg := range ch {
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID))
c.logger.Debug("Received message", zap.String("vaaId", msg.Data().ID), zap.String("trackId", msg.Data().TrackID))
c.process(ctx, msg)
}
}
func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
defer msg.Done()
event := msg.Data()
// Do not process messages from PythNet
if event.ChainID == sdk.ChainIDPythNet {
c.logger.Debug("Skipping expired PythNet message", zap.String("trackId", event.TrackID), zap.String("vaaId", event.ID))
msg.Done()
c.logger.Debug("Skipping pythNet message", zap.String("trackId", event.TrackID), zap.String("vaaId", event.ID))
return
}
if event.ChainID == sdk.ChainIDNear {
msg.Done()
c.logger.Warn("Skipping vaa from near", zap.String("trackId", event.TrackID), zap.String("vaaId", event.ID))
return
}
@@ -102,18 +102,21 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
elapsedLog := zap.Uint64("elapsedTime", uint64(time.Since(start).Milliseconds()))
// Log a message informing the processing status
if errors.Is(err, chains.ErrChainNotSupported) {
msg.Done()
c.logger.Info("Skipping VAA - chain not supported",
zap.String("trackId", event.TrackID),
zap.String("vaaId", event.ID),
elapsedLog,
)
} else if errors.Is(err, ErrAlreadyProcessed) {
msg.Done()
c.logger.Warn("Message already processed - skipping",
zap.String("trackId", event.TrackID),
zap.String("vaaId", event.ID),
elapsedLog,
)
} else if err != nil {
msg.Failed()
c.logger.Error("Failed to process originTx",
zap.String("trackId", event.TrackID),
zap.String("vaaId", event.ID),
@@ -121,6 +124,7 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) {
elapsedLog,
)
} else {
msg.Done()
c.logger.Info("Transaction processed successfully",
zap.String("trackId", event.TrackID),
zap.String("id", event.ID),

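The hunk above is the core of "without internal retries": the blanket defer msg.Done() is gone, and each outcome acknowledges explicitly, with msg.Failed() reserved for genuine processing errors, presumably so SQS redelivers them after the now shorter visibility timeout. A condensed sketch of that rule follows; the Ack interface and the error values are stand-ins defined for the sketch, not the repo's actual queue.ConsumerMessage, chains.ErrChainNotSupported, or ErrAlreadyProcessed.

```go
package consumer

import "errors"

// Ack is the minimal slice of a consumer-message interface this sketch needs.
type Ack interface {
	Done()   // acknowledge: the message is deleted from the queue
	Failed() // do not acknowledge: the message reappears after the visibility timeout
}

// Stand-ins for chains.ErrChainNotSupported and ErrAlreadyProcessed.
var (
	errChainNotSupported = errors.New("chain not supported")
	errAlreadyProcessed  = errors.New("already processed")
)

// settle applies the commit's acknowledgement rule: successes and permanent
// skips are settled immediately; only transient failures go back to the queue.
func settle(msg Ack, err error) {
	switch {
	case err == nil,
		errors.Is(err, errChainNotSupported),
		errors.Is(err, errAlreadyProcessed):
		msg.Done()
	default:
		msg.Failed()
	}
}
```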
View File

@@ -3,7 +3,6 @@ package consumer
import (
"context"
"errors"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
@@ -73,62 +72,45 @@ func ProcessSourceTx(
// 2. A minimum number of attempts have been made.
var txDetail *chains.TxDetail
var err error
for retries := 0; ; retries++ {
if params.TxHash == "" {
// add metrics for vaa without txHash
params.Metrics.IncVaaWithoutTxHash(uint16(params.ChainId))
v, err := repository.GetVaaIdTxHash(ctx, params.VaaId)
if err != nil {
logger.Error("failed to find vaaIdTxHash",
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaId),
zap.Any("vaaTimestamp", params.Timestamp),
zap.Int("retries", retries),
zap.Error(err),
)
} else {
// add metrics for vaa with txHash fixed
params.Metrics.IncVaaWithTxHashFixed(uint16(params.ChainId))
params.TxHash = v.TxHash
logger.Warn("fix txHash for vaa",
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaId),
zap.Any("vaaTimestamp", params.Timestamp),
zap.Int("retries", retries),
zap.String("txHash", v.TxHash),
)
}
}
if params.TxHash != "" {
// Get transaction details from the emitter blockchain
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork)
if err == nil {
break
}
}
// Keep retrying?
if params.Timestamp == nil && retries > minRetries {
return nil, fmt.Errorf("failed to process transaction: %w", err)
} else if time.Since(*params.Timestamp) > retryDeadline && retries >= minRetries {
return nil, fmt.Errorf("failed to process transaction: %w", err)
} else {
logger.Warn("failed to process transaction",
if params.TxHash == "" {
// add metrics for vaa without txHash
params.Metrics.IncVaaWithoutTxHash(uint16(params.ChainId))
v, err := repository.GetVaaIdTxHash(ctx, params.VaaId)
if err != nil {
logger.Error("failed to find vaaIdTxHash",
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaId),
zap.Any("vaaTimestamp", params.Timestamp),
zap.Int("retries", retries),
zap.Error(err),
)
if params.Timestamp != nil && time.Since(*params.Timestamp) < retryDeadline {
time.Sleep(retryDelay)
}
} else {
// add metrics for vaa with txHash fixed
params.Metrics.IncVaaWithTxHashFixed(uint16(params.ChainId))
params.TxHash = v.TxHash
logger.Warn("fix txHash for vaa",
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaId),
zap.Any("vaaTimestamp", params.Timestamp),
zap.String("txHash", v.TxHash),
)
}
}
if params.TxHash == "" {
logger.Warn("txHash is empty",
zap.String("trackId", params.TrackID),
zap.String("vaaId", params.VaaId),
)
return nil, errors.New("txHash is empty")
}
// Get transaction details from the emitter blockchain
txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash, params.Timestamp, p2pNetwork)
if err != nil {
return nil, err
}
// Store source transaction details in the database
p := UpsertDocumentParams{
VaaId: params.VaaId,