Merge remote-tracking branch 'origin' into feature-1042/mapped-wormchain-source-events

This commit is contained in:
julian merlo 2024-03-22 13:19:10 -03:00
commit 61283100be
14 changed files with 79 additions and 8 deletions

3
LEGAL.txt Normal file
View File

@ -0,0 +1,3 @@
The information and materials available through this repo (collectively, “Materials”) are available on an “AI IS” basis without warranties of any kind, either express or implied, including, but not limited to, warranties of merchantability, title, fitness for a particular purpose and non-infringement. You assume all risks associated with using the Materials, and digital assets and decentralized systems generally, including but not limited to, that: (a) digital assets are highly volatile; (b) using digital assets is inherently risky due to both features of such assets and the potential unauthorized acts of third parties; (c) you may not have ready access to digital assets; and (d) there is a risk of losing your digital assets or those owned by a third party. You agree that you will have no recourse against anyone else for any losses due to the use of the Materials. For example, these losses may arise from or relate to: (i) incorrect information; (ii) software or network failures; (iii) corrupted files; (iv) unauthorized access; (v) errors, mistakes, or inaccuracies; or (vi) third-party activities.
You are solely responsible for using the Materials in compliance with applicable laws and regulations, including but not limited to, export control or sanctions laws of any applicable jurisdiction. You should be aware that U.S. export control and sanctions laws prohibit U.S. persons (and other persons that are subject to such laws) from transacting with persons in certain countries and territories or that are on the Specially Designated Nationals list. Accordingly, you must avoid providing the Materials to sanctioned persons because such activity could be a violation of U.S. export controls and sanctions law.

View File

@ -112,13 +112,13 @@ func Run() {
// create and start a vaa consumer.
logger.Info("initializing vaa consumer...")
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, config, logger)
vaaConsumer := consumer.New(vaaConsumeFunc, metric.Push, logger, config.P2pNetwork)
vaaConsumer := consumer.New(vaaConsumeFunc, metric.Push, logger, metrics, config.P2pNetwork)
vaaConsumer.Start(rootCtx)
// create and start a notification consumer.
logger.Info("initializing notification consumer...")
notificationConsumeFunc := newNotificationConsumeFunc(rootCtx, config, logger)
notificationConsumer := consumer.New(notificationConsumeFunc, metric.Push, logger, config.P2pNetwork)
notificationConsumer := consumer.New(notificationConsumeFunc, metric.Push, logger, metrics, config.P2pNetwork)
notificationConsumer.Start(rootCtx)
// create and start server.

View File

@ -3,6 +3,7 @@ package consumer
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/analytics/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/analytics/metric"
"github.com/wormhole-foundation/wormhole-explorer/analytics/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -14,12 +15,13 @@ type Consumer struct {
consume queue.ConsumeFunc
pushMetric metric.MetricPushFunc
logger *zap.Logger
metrics metrics.Metrics
p2pNetwork string
}
// New creates a new vaa consumer.
func New(consume queue.ConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, p2pNetwork string) *Consumer {
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, p2pNetwork: p2pNetwork}
func New(consume queue.ConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, metrics metrics.Metrics, p2pNetwork string) *Consumer {
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, metrics: metrics, p2pNetwork: p2pNetwork}
}
// Start consumes messages from VAA queue, parse and store those messages in a repository.
@ -30,16 +32,18 @@ func (c *Consumer) Start(ctx context.Context) {
// check id message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
msg.Failed()
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
c.metrics.IncExpiredMessage(c.p2pNetwork, event.Source)
continue
}
// unmarshal vaa.
vaa, err := sdk.Unmarshal(event.Vaa)
if err != nil {
msg.Done()
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
msg.Failed()
c.metrics.IncInvalidMessage(c.p2pNetwork, event.Source)
continue
}
@ -47,11 +51,13 @@ func (c *Consumer) Start(ctx context.Context) {
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
if err != nil {
msg.Failed()
c.metrics.IncUnprocessedMessage(c.p2pNetwork, event.Source)
continue
}
msg.Done()
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
c.metrics.IncProcessedMessage(c.p2pNetwork, event.Source)
}
}()
}

View File

@ -9,4 +9,8 @@ type Metrics interface {
IncFoundNotional(symbol string)
IncMissingToken(chain, token string)
IncFoundToken(chain, token string)
IncExpiredMessage(chain, source string)
IncInvalidMessage(chain, source string)
IncUnprocessedMessage(chain, source string)
IncProcessedMessage(chain, source string)
}

View File

@ -26,3 +26,15 @@ func (p *NoopMetrics) IncMissingToken(chain, token string) {
func (p *NoopMetrics) IncFoundToken(chain, token string) {
}
func (p *NoopMetrics) IncExpiredMessage(chain, source string) {
}
func (p *NoopMetrics) IncInvalidMessage(chain, source string) {
}
func (p *NoopMetrics) IncUnprocessedMessage(chain, source string) {
}
func (p *NoopMetrics) IncProcessedMessage(chain, source string) {
}

View File

@ -9,6 +9,7 @@ type PrometheusMetrics struct {
measurementCount *prometheus.CounterVec
notionalCount *prometheus.CounterVec
tokenRequestsCount *prometheus.CounterVec
processedMessage *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -44,10 +45,19 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
[]string{"chain", "token", "status"},
)
processedMessage := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_message",
Help: "Total number of processed message",
ConstLabels: constLabels,
},
[]string{"chain", "source", "status"},
)
return &PrometheusMetrics{
measurementCount: measurementCount,
notionalCount: notionalRequestsCount,
tokenRequestsCount: tokenRequestsCount,
processedMessage: processedMessage,
}
}
@ -74,3 +84,19 @@ func (p *PrometheusMetrics) IncMissingToken(chain, token string) {
func (p *PrometheusMetrics) IncFoundToken(chain, token string) {
p.tokenRequestsCount.WithLabelValues(chain, token, "found").Inc()
}
func (p *PrometheusMetrics) IncExpiredMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "expired").Inc()
}
func (p *PrometheusMetrics) IncInvalidMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "invalid").Inc()
}
func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "unprocessed").Inc()
}
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
}

View File

@ -37,6 +37,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
return nil, err
}
return &Event{
Source: "pipeline",
TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
ID: vaaEvent.ID,
ChainID: vaaEvent.ChainID,
@ -73,6 +74,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
ID: signedVaa.ID,
ChainID: signedVaa.EmitterChain,
@ -100,6 +102,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
ID: vaa.MessageID(),
ChainID: plm.ChainID,

View File

@ -12,6 +12,7 @@ type sqsEvent struct {
// Event represents a event data to be handle.
type Event struct {
Source string
TrackID string
ID string
ChainID uint16

View File

@ -49,7 +49,6 @@ func Run() {
logger.Info("Starting wormhole-explorer-tx-tracker ...")
// create rpc pool
// TODO: review: RpcProviderSettings
rpcPool, err := newRpcPool(cfg)
if err != nil {
logger.Fatal("Failed to initialize rpc pool: ", zap.Error(err))

View File

@ -176,6 +176,7 @@ func (c *Consumer) processTargetTx(ctx context.Context, msg queue.ConsumerMessag
From: attr.From,
To: attr.To,
Status: attr.Status,
Metrics: c.metrics,
}
err := ProcessTargetTx(ctx, c.logger, c.repository, &p)

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -29,6 +30,7 @@ type ProcessTargetTxParams struct {
From string
To string
Status string
Metrics metrics.Metrics
}
func ProcessTargetTx(
@ -61,7 +63,11 @@ func ProcessTargetTx(
logger.Warn("Transaction should not be updated", zap.String("vaaId", params.VaaId), zap.Error(err))
return nil
}
return repository.UpsertTargetTx(ctx, update)
err = repository.UpsertTargetTx(ctx, update)
if err == nil {
params.Metrics.IncDestinationTxInserted(uint16(params.ChainId))
}
return err
}
func checkTxShouldBeUpdated(ctx context.Context, tx *TargetTxUpdate, repository *Repository) (bool, error) {

View File

@ -17,6 +17,9 @@ func (d *DummyMetrics) IncVaaUnfiltered(chainID uint16) {}
// IncOriginTxInserted is a dummy implementation of IncOriginTxInserted.
func (d *DummyMetrics) IncOriginTxInserted(chainID uint16) {}
// IncDestinationTxInserted is a dummy implementation of IncDestinationTxInserted.
func (d *DummyMetrics) IncDestinationTxInserted(chainID uint16) {}
// IncVaaWithoutTxHash is a dummy implementation of IncVaaWithoutTxHash.
func (d *DummyMetrics) IncVaaWithoutTxHash(chainID uint16) {}

View File

@ -8,6 +8,7 @@ type Metrics interface {
IncOriginTxInserted(chainID uint16)
IncVaaWithoutTxHash(chainID uint16)
IncVaaWithTxHashFixed(chainID uint16)
IncDestinationTxInserted(chainID uint16)
AddVaaProcessedDuration(chainID uint16, duration float64)
IncCallRpcSuccess(chainID uint16, rpc string)
IncCallRpcError(chainID uint16, rpc string)

View File

@ -91,6 +91,12 @@ func (m *PrometheusMetrics) IncOriginTxInserted(chainID uint16) {
m.vaaTxTrackerCount.WithLabelValues(chain, "origin_tx_inserted").Inc()
}
// IncDestinationTxInserted increments the number of inserted destination tx.
func (m *PrometheusMetrics) IncDestinationTxInserted(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "destination_tx_inserted").Inc()
}
// AddVaaProcessedDuration adds the duration of vaa processing.
func (m *PrometheusMetrics) AddVaaProcessedDuration(chainID uint16, duration float64) {
chain := vaa.ChainID(chainID).String()