From 9025f6f6400d901e3e76f2ffda17e5b997e73b84 Mon Sep 17 00:00:00 2001 From: Agustin Pazos Date: Thu, 6 Jul 2023 18:04:16 -0300 Subject: [PATCH] Add metrics client --- parser/queue/vaa_sqs.go | 2 +- tx-tracker/cmd/service/main.go | 18 ++++++-- tx-tracker/config/structs.go | 2 + tx-tracker/consumer/consumer.go | 4 +- tx-tracker/consumer/workerpool.go | 6 +++ tx-tracker/internal/metrics/dummy.go | 22 ++++++++++ tx-tracker/internal/metrics/metrics.go | 10 +++++ tx-tracker/internal/metrics/prometheus.go | 52 +++++++++++++++++++++++ tx-tracker/queue/vaa_sqs.go | 6 ++- 9 files changed, 116 insertions(+), 6 deletions(-) create mode 100644 tx-tracker/internal/metrics/dummy.go create mode 100644 tx-tracker/internal/metrics/metrics.go create mode 100644 tx-tracker/internal/metrics/prometheus.go diff --git a/parser/queue/vaa_sqs.go b/parser/queue/vaa_sqs.go index 804025e4..91fb42e6 100644 --- a/parser/queue/vaa_sqs.go +++ b/parser/queue/vaa_sqs.go @@ -77,7 +77,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err)) continue } - q.metrics.IncVaaConsumedQueue(vaaEvent.ChainID) + q.metrics.IncVaaConsumedQueue(uint16(vaaEvent.ChainID)) // filter vaaEvent by p2p net. if q.filterConsume(&vaaEvent) { diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index e2a1e4e5..d168abcc 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -21,6 +21,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer" "github.com/wormhole-foundation/wormhole-explorer/txtracker/http/infrastructure" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" "go.uber.org/zap" ) @@ -54,6 +55,9 @@ func main() { }() db := cli.Database(cfg.MongodbDatabase) + // initialize metrics + metrics := newMetrics(cfg) + // start serving /health and /ready endpoints healthChecks, err := makeHealthChecks(rootCtx, cfg, db) if err != nil { @@ -63,9 +67,9 @@ func main() { server.Start() // create and start a consumer. - vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, logger) + vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger) repository := consumer.NewRepository(logger, db) - consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository) + consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics) consumer.Start(rootCtx) logger.Info("Started wormhole-explorer-tx-tracker") @@ -91,6 +95,7 @@ func main() { func newVAAConsumeFunc( ctx context.Context, cfg *config.ServiceSettings, + metrics metrics.Metrics, logger *zap.Logger, ) queue.VAAConsumeFunc { @@ -99,7 +104,7 @@ func newVAAConsumeFunc( logger.Fatal("failed to create sqs consumer", zap.Error(err)) } - vaaQueue := queue.NewVaaSqs(sqsConsumer, logger) + vaaQueue := queue.NewVaaSqs(sqsConsumer, metrics, logger) return vaaQueue.Consume } @@ -175,3 +180,10 @@ func makeHealthChecks( return plugins, nil } + +func newMetrics(cfg *config.ServiceSettings) metrics.Metrics { + if !cfg.MetricsEnabled { + return metrics.NewDummyMetrics() + } + return metrics.NewPrometheusMetrics(cfg.Environment) +} diff --git a/tx-tracker/config/structs.go b/tx-tracker/config/structs.go index e7f3cc8c..d7587b5b 100644 --- a/tx-tracker/config/structs.go +++ b/tx-tracker/config/structs.go @@ -37,8 +37,10 @@ type BackfillerSettings struct { type ServiceSettings struct { // MonitoringPort defines the TCP port for the /health and /ready endpoints. MonitoringPort string `split_words:"true" default:"8000"` + Environment string `split_words:"true" required:"true"` LogLevel string `split_words:"true" default:"INFO"` PprofEnabled bool `split_words:"true" default:"false"` + MetricsEnabled bool `split_words:"true" default:"false"` AwsSettings MongodbSettings diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 30815b66..74f7b2a7 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -5,6 +5,7 @@ import ( "time" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" "go.uber.org/zap" ) @@ -30,9 +31,10 @@ func New( ctx context.Context, logger *zap.Logger, repository *Repository, + metrics metrics.Metrics, ) *Consumer { - workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository) + workerPool := NewWorkerPool(ctx, logger, rpcServiceProviderSettings, repository, metrics) c := Consumer{ consumeFunc: consumeFunc, diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go index 6e09e695..a2ee5a2d 100644 --- a/tx-tracker/consumer/workerpool.go +++ b/tx-tracker/consumer/workerpool.go @@ -7,6 +7,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" @@ -22,6 +23,7 @@ type WorkerPool struct { logger *zap.Logger rpcProviderSettings *config.RpcProviderSettings repository *Repository + metrics metrics.Metrics } // NewWorkerPool creates a new worker pool. @@ -30,6 +32,7 @@ func NewWorkerPool( logger *zap.Logger, rpcProviderSettings *config.RpcProviderSettings, repository *Repository, + metrics metrics.Metrics, ) *WorkerPool { w := WorkerPool{ @@ -111,12 +114,14 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { msg.Failed() return } + w.metrics.IncVaaUnexpired(uint16(event.ChainID)) // Do not process messages from PythNet if event.ChainID == sdk.ChainIDPythNet { msg.Done() return } + w.metrics.IncVaaUnfiltered(uint16(event.ChainID)) // Process the VAA p := ProcessSourceTxParams{ @@ -139,6 +144,7 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { zap.Error(err), ) } else { + w.metrics.IncOriginTxInserted(uint16(event.ChainID)) w.logger.Info("Updated originTx in the database", zap.String("id", event.ID), ) diff --git a/tx-tracker/internal/metrics/dummy.go b/tx-tracker/internal/metrics/dummy.go new file mode 100644 index 00000000..fe14467f --- /dev/null +++ b/tx-tracker/internal/metrics/dummy.go @@ -0,0 +1,22 @@ +package metrics + +// DummyMetrics is a dummy implementation of Metric interface. +type DummyMetrics struct { +} + +// NewDummyMetrics returns a new instance of DummyMetrics. +func NewDummyMetrics() *DummyMetrics { + return &DummyMetrics{} +} + +// IncVaaConsumedQueue is a dummy implementation of IncVaaConsumedQueue. +func (d *DummyMetrics) IncVaaConsumedQueue(chainID uint16) {} + +// IncVaaUnexpired is a dummy implementation of IncVaaUnexpired. +func (d *DummyMetrics) IncVaaUnexpired(chainID uint16) {} + +// IncVaaUnfiltered is a dummy implementation of IncVaaUnfiltered. +func (d *DummyMetrics) IncVaaUnfiltered(chainID uint16) {} + +// IncOriginTxInserted is a dummy implementation of IncOriginTxInserted. +func (d *DummyMetrics) IncOriginTxInserted(chainID uint16) {} diff --git a/tx-tracker/internal/metrics/metrics.go b/tx-tracker/internal/metrics/metrics.go new file mode 100644 index 00000000..db136fa7 --- /dev/null +++ b/tx-tracker/internal/metrics/metrics.go @@ -0,0 +1,10 @@ +package metrics + +const serviceName = "wormscan-tx-tracker" + +type Metrics interface { + IncVaaConsumedQueue(chainID uint16) + IncVaaUnexpired(chainID uint16) + IncVaaUnfiltered(chainID uint16) + IncOriginTxInserted(chainID uint16) +} diff --git a/tx-tracker/internal/metrics/prometheus.go b/tx-tracker/internal/metrics/prometheus.go new file mode 100644 index 00000000..5a8f533c --- /dev/null +++ b/tx-tracker/internal/metrics/prometheus.go @@ -0,0 +1,52 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +// PrometheusMetrics is a Prometheus implementation of Metric interface. +type PrometheusMetrics struct { + vaaTxTrackerCount *prometheus.CounterVec +} + +// NewPrometheusMetrics returns a new instance of PrometheusMetrics. +func NewPrometheusMetrics(environment string) *PrometheusMetrics { + vaaTxTrackerCount := promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "vaa_tx_tracker_count_by_chain", + Help: "Total number of vaa processed by tx tracker by chain", + ConstLabels: map[string]string{ + "environment": environment, + "service": serviceName, + }, + }, []string{"chain", "type"}) + return &PrometheusMetrics{ + vaaTxTrackerCount: vaaTxTrackerCount, + } +} + +// IncVaaConsumedQueue increments the number of consumed VAA. +func (m *PrometheusMetrics) IncVaaConsumedQueue(chainID uint16) { + chain := vaa.ChainID(chainID).String() + m.vaaTxTrackerCount.WithLabelValues(chain, "consumed_queue").Inc() +} + +// IncVaaUnexpired increments the number of unexpired VAA. +func (m *PrometheusMetrics) IncVaaUnexpired(chainID uint16) { + chain := vaa.ChainID(chainID).String() + m.vaaTxTrackerCount.WithLabelValues(chain, "unexpired").Inc() +} + +// IncVaaUnfiltered increments the number of unfiltered VAA. +func (m *PrometheusMetrics) IncVaaUnfiltered(chainID uint16) { + chain := vaa.ChainID(chainID).String() + m.vaaTxTrackerCount.WithLabelValues(chain, "unfiltered").Inc() +} + +// IncOriginTxInserted increments the number of inserted origin tx. +func (m *PrometheusMetrics) IncOriginTxInserted(chainID uint16) { + chain := vaa.ChainID(chainID).String() + m.vaaTxTrackerCount.WithLabelValues(chain, "origin_tx_inserted").Inc() +} diff --git a/tx-tracker/queue/vaa_sqs.go b/tx-tracker/queue/vaa_sqs.go index 58dfa015..0cbde3b1 100644 --- a/tx-tracker/queue/vaa_sqs.go +++ b/tx-tracker/queue/vaa_sqs.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs" + "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" ) // SQSOption represents a VAA queue in SQS option function. @@ -20,6 +21,7 @@ type SQS struct { ch chan ConsumerMessage chSize int wg sync.WaitGroup + metrics metrics.Metrics logger *zap.Logger } @@ -27,9 +29,10 @@ type SQS struct { type FilterConsumeFunc func(vaaEvent *VaaEvent) bool // NewVaaSqs creates a VAA queue in SQS instances. -func NewVaaSqs(consumer *sqs_client.Consumer, logger *zap.Logger, opts ...SQSOption) *SQS { +func NewVaaSqs(consumer *sqs_client.Consumer, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS { s := &SQS{ consumer: consumer, + metrics: metrics, chSize: 10, logger: logger} for _, opt := range opts { @@ -72,6 +75,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err)) continue } + q.metrics.IncVaaConsumedQueue(uint16(vaaEvent.ChainID)) q.wg.Add(1) q.ch <- &sqsConsumerMessage{