Add processed message metrics in pipeline components (#1239)

Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
ftocal 2024-03-25 15:21:10 -03:00 committed by GitHub
parent b19bc16dfb
commit adb7231074
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 89 additions and 34 deletions

View File

@ -30,11 +30,13 @@ func (c *Consumer) Start(ctx context.Context) {
for msg := range c.consume(ctx) {
event := msg.Data()
chainID := sdk.ChainID(event.ChainID).String()
// check id message is expired.
if msg.IsExpired() {
msg.Failed()
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
c.metrics.IncExpiredMessage(c.p2pNetwork, event.Source)
c.metrics.IncExpiredMessage(chainID, event.Source)
continue
}
@ -43,7 +45,7 @@ func (c *Consumer) Start(ctx context.Context) {
if err != nil {
msg.Done()
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
c.metrics.IncInvalidMessage(c.p2pNetwork, event.Source)
c.metrics.IncInvalidMessage(chainID, event.Source)
continue
}
@ -51,13 +53,13 @@ func (c *Consumer) Start(ctx context.Context) {
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
if err != nil {
msg.Failed()
c.metrics.IncUnprocessedMessage(c.p2pNetwork, event.Source)
c.metrics.IncUnprocessedMessage(chainID, event.Source)
continue
}
msg.Done()
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
c.metrics.IncProcessedMessage(c.p2pNetwork, event.Source)
c.metrics.IncProcessedMessage(chainID, event.Source)
}
}()
}

View File

@ -6,6 +6,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -28,13 +29,15 @@ func (c *Consumer) Start(ctx context.Context) {
for msg := range c.consume(ctx) {
event := msg.Data()
emitterChainID := sdk.ChainID(event.ChainID).String()
// check id message is expired.
if msg.IsExpired() {
c.metrics.IncExpiredMessage(emitterChainID, event.Source)
c.logger.Warn("Event expired", zap.String("id", event.ID))
msg.Failed()
continue
}
c.metrics.IncVaaUnexpired(event.ChainID)
params := &processor.Params{
TrackID: event.TrackID,
@ -42,6 +45,7 @@ func (c *Consumer) Start(ctx context.Context) {
}
_, err := c.process(ctx, params)
if err != nil {
c.metrics.IncUnprocessedMessage(emitterChainID, event.Source)
c.logger.Error("Error processing event",
zap.String("trackId", event.TrackID),
zap.String("id", event.ID),
@ -49,6 +53,7 @@ func (c *Consumer) Start(ctx context.Context) {
msg.Failed()
continue
} else {
c.metrics.IncProcessedMessage(emitterChainID, event.Source)
c.logger.Debug("Event processed",
zap.String("trackId", event.TrackID),
zap.String("id", event.ID))

View File

@ -35,3 +35,12 @@ func (d *DummyMetrics) IncVaaPayloadParserSuccessCount(chainID uint16) {}
// IncVaaPayloadParserSuccessCount increments the number of vaa payload parser success.
func (d *DummyMetrics) IncVaaPayloadParserNotFoundCount(chainID uint16) {}
// IncExpiredMessage increments the number of expired message.
func (p *DummyMetrics) IncExpiredMessage(chain, source string) {}
// IncUnprocessedMessage increments the number of unprocessed message.
func (p *DummyMetrics) IncUnprocessedMessage(chain, source string) {}
// IncProcessedMessage increments the number of processed message.
func (p *DummyMetrics) IncProcessedMessage(chain, source string) {}

View File

@ -5,7 +5,6 @@ const serviceName = "wormscan-parser"
type Metrics interface {
IncVaaConsumedQueue(chainID uint16)
IncVaaUnfiltered(chainID uint16)
IncVaaUnexpired(chainID uint16)
IncVaaParsed(chainID uint16)
IncVaaParsedInserted(chainID uint16)
@ -13,4 +12,8 @@ type Metrics interface {
IncVaaPayloadParserErrorCount(chainID uint16)
IncVaaPayloadParserNotFoundCount(chainID uint16)
IncVaaPayloadParserSuccessCount(chainID uint16)
IncExpiredMessage(chain, source string)
IncUnprocessedMessage(chain, source string)
IncProcessedMessage(chain, source string)
}

View File

@ -11,6 +11,7 @@ type PrometheusMetrics struct {
vaaParseCount *prometheus.CounterVec
vaaPayloadParserRequest *prometheus.CounterVec
vaaPayloadParserResponseCount *prometheus.CounterVec
processedMessage *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -42,10 +43,22 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
"service": serviceName,
},
}, []string{"chain", "status"})
processedMessage := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_message",
Help: "Total number of processed message",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
},
[]string{"chain", "source", "status"},
)
return &PrometheusMetrics{
vaaParseCount: vaaParseCount,
vaaPayloadParserRequest: vaaPayloadParserRequestCount,
vaaPayloadParserResponseCount: vaaPayloadParserResponseCount,
processedMessage: processedMessage,
}
}
@ -102,3 +115,18 @@ func (m *PrometheusMetrics) IncVaaPayloadParserNotFoundCount(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaPayloadParserResponseCount.WithLabelValues(chain, "not_found").Inc()
}
// IncExpiredMessage increments the number of expired message.
func (p *PrometheusMetrics) IncExpiredMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "expired").Inc()
}
// IncUnprocessedMessage increments the number of unprocessed message.
func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "unprocessed").Inc()
}
// IncProcessedMessage increments the number of processed message.
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
}

View File

@ -37,6 +37,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
return nil, err
}
return &Event{
Source: "pipeline",
TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
ID: vaaEvent.ID,
ChainID: vaaEvent.ChainID,
@ -73,6 +74,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
ID: signedVaaEvent.ID,
ChainID: signedVaaEvent.EmitterChain,
@ -101,6 +103,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
ID: vaa.MessageID(),
ChainID: plm.ChainID,

View File

@ -12,6 +12,7 @@ type sqsEvent struct {
// Event represents a event data to be handle.
type Event struct {
Source string
TrackID string
ID string
ChainID uint16

View File

@ -97,7 +97,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
start := time.Now()
c.metrics.IncVaaUnfiltered(uint16(event.ChainID))
c.metrics.IncVaaUnfiltered(event.ChainID.String(), event.Source)
// Process the VAA
p := ProcessSourceTxParams{
@ -147,7 +147,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
zap.String("id", event.ID),
elapsedLog,
)
c.metrics.IncOriginTxInserted(uint16(event.ChainID))
c.metrics.IncOriginTxInserted(event.ChainID.String(), event.Source)
}
}
@ -165,9 +165,10 @@ func (c *Consumer) processTargetTx(ctx context.Context, msg queue.ConsumerMessag
// Process the VAA
p := ProcessTargetTxParams{
Source: event.Source,
TrackID: event.TrackID,
VaaId: event.ID,
ChainId: event.ChainID,
ChainID: event.ChainID,
Emitter: event.EmitterAddress,
TxHash: event.TxHash,
BlockTimestamp: event.Timestamp,

View File

@ -19,9 +19,10 @@ var (
// ProcessSourceTxParams is a struct that contains the parameters for the ProcessSourceTx method.
type ProcessTargetTxParams struct {
Source string
TrackID string
VaaId string
ChainId sdk.ChainID
ChainID sdk.ChainID
Emitter string
TxHash string
BlockTimestamp *time.Time
@ -45,7 +46,7 @@ func ProcessTargetTx(
ID: params.VaaId,
TrackID: params.TrackID,
Destination: &DestinationTx{
ChainID: params.ChainId,
ChainID: params.ChainID,
Status: params.Status,
TxHash: params.TxHash,
BlockNumber: params.BlockHeight,
@ -65,7 +66,7 @@ func ProcessTargetTx(
}
err = repository.UpsertTargetTx(ctx, update)
if err == nil {
params.Metrics.IncDestinationTxInserted(uint16(params.ChainId))
params.Metrics.IncDestinationTxInserted(params.ChainID.String(), params.Source)
}
return err
}

View File

@ -9,16 +9,16 @@ func NewDummyMetrics() *DummyMetrics {
}
// IncVaaConsumedQueue is a dummy implementation of IncVaaConsumedQueue.
func (d *DummyMetrics) IncVaaConsumedQueue(chainID uint16) {}
func (d *DummyMetrics) IncVaaConsumedQueue(chainID string, source string) {}
// IncVaaUnfiltered is a dummy implementation of IncVaaUnfiltered.
func (d *DummyMetrics) IncVaaUnfiltered(chainID uint16) {}
func (d *DummyMetrics) IncVaaUnfiltered(chainID string, source string) {}
// IncOriginTxInserted is a dummy implementation of IncOriginTxInserted.
func (d *DummyMetrics) IncOriginTxInserted(chainID uint16) {}
func (d *DummyMetrics) IncOriginTxInserted(chainID string, source string) {}
// IncDestinationTxInserted is a dummy implementation of IncDestinationTxInserted.
func (d *DummyMetrics) IncDestinationTxInserted(chainID uint16) {}
func (d *DummyMetrics) IncDestinationTxInserted(chainID string, source string) {}
// IncVaaWithoutTxHash is a dummy implementation of IncVaaWithoutTxHash.
func (d *DummyMetrics) IncVaaWithoutTxHash(chainID uint16) {}

View File

@ -3,12 +3,12 @@ package metrics
const serviceName = "wormscan-tx-tracker"
type Metrics interface {
IncVaaConsumedQueue(chainID uint16)
IncVaaUnfiltered(chainID uint16)
IncOriginTxInserted(chainID uint16)
IncVaaConsumedQueue(chainID string, source string)
IncVaaUnfiltered(chainID string, source string)
IncOriginTxInserted(chainID string, source string)
IncVaaWithoutTxHash(chainID uint16)
IncVaaWithTxHashFixed(chainID uint16)
IncDestinationTxInserted(chainID uint16)
IncDestinationTxInserted(chainID string, source string)
AddVaaProcessedDuration(chainID uint16, duration float64)
IncCallRpcSuccess(chainID uint16, rpc string)
IncCallRpcError(chainID uint16, rpc string)

View File

@ -27,7 +27,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "type"})
}, []string{"chain", "source", "type"})
vaaProcesedDuration := promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "vaa_processed_duration",
Help: "Duration of vaa processing",
@ -74,27 +74,23 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
}
// IncVaaConsumedQueue increments the number of consumed VAA.
func (m *PrometheusMetrics) IncVaaConsumedQueue(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "consumed_queue").Inc()
func (m *PrometheusMetrics) IncVaaConsumedQueue(chainID string, source string) {
m.vaaTxTrackerCount.WithLabelValues(chainID, source, "consumed_queue").Inc()
}
// IncVaaUnfiltered increments the number of unfiltered VAA.
func (m *PrometheusMetrics) IncVaaUnfiltered(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "unfiltered").Inc()
func (m *PrometheusMetrics) IncVaaUnfiltered(chainID string, source string) {
m.vaaTxTrackerCount.WithLabelValues(chainID, source, "unfiltered").Inc()
}
// IncOriginTxInserted increments the number of inserted origin tx.
func (m *PrometheusMetrics) IncOriginTxInserted(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "origin_tx_inserted").Inc()
func (m *PrometheusMetrics) IncOriginTxInserted(chainID string, source string) {
m.vaaTxTrackerCount.WithLabelValues(chainID, source, "origin_tx_inserted").Inc()
}
// IncDestinationTxInserted increments the number of inserted destination tx.
func (m *PrometheusMetrics) IncDestinationTxInserted(chainID uint16) {
chain := vaa.ChainID(chainID).String()
m.vaaTxTrackerCount.WithLabelValues(chain, "destination_tx_inserted").Inc()
func (m *PrometheusMetrics) IncDestinationTxInserted(chainID string, source string) {
m.vaaTxTrackerCount.WithLabelValues(chainID, source, "destination_tx_inserted").Inc()
}
// AddVaaProcessedDuration adds the duration of vaa processing.

View File

@ -38,6 +38,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
return nil, err
}
return &Event{
Source: "pipeline",
TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
Type: SourceChainEvent,
ID: vaaEvent.ID,
@ -80,6 +81,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
Type: SourceChainEvent,
ID: signedVaa.ID,
@ -104,6 +106,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
Type: SourceChainEvent,
ID: vaa.MessageID(),
@ -136,6 +139,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
Type: TargetChainEvent,
ID: vaa.MessageID(),
@ -171,6 +175,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
}
return &Event{
Source: "chain-event",
TrackID: notification.TrackID,
Type: TargetChainEvent,
ID: vaa.MessageID(),

View File

@ -95,7 +95,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
}
continue
}
q.metrics.IncVaaConsumedQueue(uint16(event.ChainID))
q.metrics.IncVaaConsumedQueue(event.ChainID.String(), event.Source)
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)

View File

@ -39,6 +39,7 @@ type EventAttributes interface {
// Event represents a event data to be handle.
type Event struct {
Source string
TrackID string
Type EventType
ID string