Add processed metrics to analytics component (#1237)
Co-authored-by: ftocal <fert1335@gmail.com>
This commit is contained in:
parent
2980347e00
commit
e843100678
|
@ -112,13 +112,13 @@ func Run() {
|
|||
// create and start a vaa consumer.
|
||||
logger.Info("initializing vaa consumer...")
|
||||
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, config, logger)
|
||||
vaaConsumer := consumer.New(vaaConsumeFunc, metric.Push, logger, config.P2pNetwork)
|
||||
vaaConsumer := consumer.New(vaaConsumeFunc, metric.Push, logger, metrics, config.P2pNetwork)
|
||||
vaaConsumer.Start(rootCtx)
|
||||
|
||||
// create and start a notification consumer.
|
||||
logger.Info("initializing notification consumer...")
|
||||
notificationConsumeFunc := newNotificationConsumeFunc(rootCtx, config, logger)
|
||||
notificationConsumer := consumer.New(notificationConsumeFunc, metric.Push, logger, config.P2pNetwork)
|
||||
notificationConsumer := consumer.New(notificationConsumeFunc, metric.Push, logger, metrics, config.P2pNetwork)
|
||||
notificationConsumer.Start(rootCtx)
|
||||
|
||||
// create and start server.
|
||||
|
|
|
@ -3,6 +3,7 @@ package consumer
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/wormhole-foundation/wormhole-explorer/analytics/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/analytics/metric"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/analytics/queue"
|
||||
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
|
@ -14,12 +15,13 @@ type Consumer struct {
|
|||
consume queue.ConsumeFunc
|
||||
pushMetric metric.MetricPushFunc
|
||||
logger *zap.Logger
|
||||
metrics metrics.Metrics
|
||||
p2pNetwork string
|
||||
}
|
||||
|
||||
// New creates a new vaa consumer.
|
||||
func New(consume queue.ConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, p2pNetwork string) *Consumer {
|
||||
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, p2pNetwork: p2pNetwork}
|
||||
func New(consume queue.ConsumeFunc, pushMetric metric.MetricPushFunc, logger *zap.Logger, metrics metrics.Metrics, p2pNetwork string) *Consumer {
|
||||
return &Consumer{consume: consume, pushMetric: pushMetric, logger: logger, metrics: metrics, p2pNetwork: p2pNetwork}
|
||||
}
|
||||
|
||||
// Start consumes messages from VAA queue, parse and store those messages in a repository.
|
||||
|
@ -30,16 +32,18 @@ func (c *Consumer) Start(ctx context.Context) {
|
|||
|
||||
// check id message is expired.
|
||||
if msg.IsExpired() {
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
|
||||
msg.Failed()
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
|
||||
c.metrics.IncExpiredMessage(c.p2pNetwork, event.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
// unmarshal vaa.
|
||||
vaa, err := sdk.Unmarshal(event.Vaa)
|
||||
if err != nil {
|
||||
msg.Done()
|
||||
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
|
||||
msg.Failed()
|
||||
c.metrics.IncInvalidMessage(c.p2pNetwork, event.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -47,11 +51,13 @@ func (c *Consumer) Start(ctx context.Context) {
|
|||
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
|
||||
if err != nil {
|
||||
msg.Failed()
|
||||
c.metrics.IncUnprocessedMessage(c.p2pNetwork, event.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
msg.Done()
|
||||
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
|
||||
c.metrics.IncProcessedMessage(c.p2pNetwork, event.Source)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -9,4 +9,8 @@ type Metrics interface {
|
|||
IncFoundNotional(symbol string)
|
||||
IncMissingToken(chain, token string)
|
||||
IncFoundToken(chain, token string)
|
||||
IncExpiredMessage(chain, source string)
|
||||
IncInvalidMessage(chain, source string)
|
||||
IncUnprocessedMessage(chain, source string)
|
||||
IncProcessedMessage(chain, source string)
|
||||
}
|
||||
|
|
|
@ -26,3 +26,15 @@ func (p *NoopMetrics) IncMissingToken(chain, token string) {
|
|||
|
||||
func (p *NoopMetrics) IncFoundToken(chain, token string) {
|
||||
}
|
||||
|
||||
func (p *NoopMetrics) IncExpiredMessage(chain, source string) {
|
||||
}
|
||||
|
||||
func (p *NoopMetrics) IncInvalidMessage(chain, source string) {
|
||||
}
|
||||
|
||||
func (p *NoopMetrics) IncUnprocessedMessage(chain, source string) {
|
||||
}
|
||||
|
||||
func (p *NoopMetrics) IncProcessedMessage(chain, source string) {
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ type PrometheusMetrics struct {
|
|||
measurementCount *prometheus.CounterVec
|
||||
notionalCount *prometheus.CounterVec
|
||||
tokenRequestsCount *prometheus.CounterVec
|
||||
processedMessage *prometheus.CounterVec
|
||||
}
|
||||
|
||||
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
|
||||
|
@ -44,10 +45,19 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
|
|||
[]string{"chain", "token", "status"},
|
||||
)
|
||||
|
||||
processedMessage := promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "processed_message",
|
||||
Help: "Total number of processed message",
|
||||
ConstLabels: constLabels,
|
||||
},
|
||||
[]string{"chain", "source", "status"},
|
||||
)
|
||||
return &PrometheusMetrics{
|
||||
measurementCount: measurementCount,
|
||||
notionalCount: notionalRequestsCount,
|
||||
tokenRequestsCount: tokenRequestsCount,
|
||||
processedMessage: processedMessage,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,3 +84,19 @@ func (p *PrometheusMetrics) IncMissingToken(chain, token string) {
|
|||
func (p *PrometheusMetrics) IncFoundToken(chain, token string) {
|
||||
p.tokenRequestsCount.WithLabelValues(chain, token, "found").Inc()
|
||||
}
|
||||
|
||||
func (p *PrometheusMetrics) IncExpiredMessage(chain, source string) {
|
||||
p.processedMessage.WithLabelValues(chain, source, "expired").Inc()
|
||||
}
|
||||
|
||||
func (p *PrometheusMetrics) IncInvalidMessage(chain, source string) {
|
||||
p.processedMessage.WithLabelValues(chain, source, "invalid").Inc()
|
||||
}
|
||||
|
||||
func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
|
||||
p.processedMessage.WithLabelValues(chain, source, "unprocessed").Inc()
|
||||
}
|
||||
|
||||
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
|
||||
p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
|
|||
return nil, err
|
||||
}
|
||||
return &Event{
|
||||
Source: "pipeline",
|
||||
TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
|
||||
ID: vaaEvent.ID,
|
||||
ChainID: vaaEvent.ChainID,
|
||||
|
@ -73,6 +74,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
|
|||
}
|
||||
|
||||
return &Event{
|
||||
Source: "chain-event",
|
||||
TrackID: notification.TrackID,
|
||||
ID: signedVaa.ID,
|
||||
ChainID: signedVaa.EmitterChain,
|
||||
|
@ -100,6 +102,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
|
|||
}
|
||||
|
||||
return &Event{
|
||||
Source: "chain-event",
|
||||
TrackID: notification.TrackID,
|
||||
ID: vaa.MessageID(),
|
||||
ChainID: plm.ChainID,
|
||||
|
|
|
@ -12,6 +12,7 @@ type sqsEvent struct {
|
|||
|
||||
// Event represents a event data to be handle.
|
||||
type Event struct {
|
||||
Source string
|
||||
TrackID string
|
||||
ID string
|
||||
ChainID uint16
|
||||
|
|
Loading…
Reference in New Issue