Add vaa processing speed metrics in fly, analytics, parser and tx-tracker (#1531)

Co-authored-by: walker-16 <agpazos85@gmail.com>
ftocal 2024-07-03 11:23:54 -03:00 committed by GitHub
parent 4e4cfab138
commit 0bfd9e554b
28 changed files with 350 additions and 219 deletions
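The commit applies one pattern across the four services (fly, analytics, parser, tx-tracker): each SQS consumer now carries the message's SentTimestamp attribute through to the point where a VAA finishes processing, and the elapsed time is observed on a per-chain vaa_processing_duration_seconds histogram. The stand-alone sketch below mirrors that pattern; it is illustrative only (the main function, port and chain label are not from this commit).

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Same metric name, help text and buckets as the histograms added in this commit.
var vaaProcessingDuration = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "vaa_processing_duration_seconds",
		Help:    "Duration of all vaa processing by chain.",
		Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
	},
	[]string{"chain"},
)

// observe mirrors the nil-guarded VaaProcessingDuration methods: when the
// SentTimestamp attribute is missing the pointer is nil and nothing is recorded.
func observe(chain string, sent *time.Time) {
	if sent == nil {
		return
	}
	// Equivalent to the float64(time.Since(*start).Nanoseconds()) / 1e9 form used in the diff.
	vaaProcessingDuration.WithLabelValues(chain).Observe(time.Since(*sent).Seconds())
}

func main() {
	sent := time.Now().Add(-3 * time.Second)
	observe("ethereum", &sent) // recorded as roughly 3 seconds

	http.Handle("/metrics", promhttp.Handler())
	fmt.Println(http.ListenAndServe(":2112", nil))
}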

View File

@ -60,6 +60,7 @@ func (c *Consumer) Start(ctx context.Context) {
msg.Done()
c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
c.metrics.IncProcessedMessage(chainID, event.Source, msg.Retry())
c.metrics.VaaProcessingDuration(chainID, msg.SentTimestamp())
}
}()
}

View File

@ -1,5 +1,7 @@
package metrics
import "time"
const serviceName = "wormscan-analytics"
type Metrics interface {
@ -13,4 +15,5 @@ type Metrics interface {
IncInvalidMessage(chain, source string, retry uint8)
IncUnprocessedMessage(chain, source string, retry uint8)
IncProcessedMessage(chain, source string, retry uint8)
VaaProcessingDuration(chain string, start *time.Time)
}

View File

@ -1,5 +1,7 @@
package metrics
import "time"
// NoopMetrics is a no-op implementation of the Metrics interface.
type NoopMetrics struct {
}
@ -38,3 +40,6 @@ func (p *NoopMetrics) IncUnprocessedMessage(chain, source string, retry uint8) {
func (p *NoopMetrics) IncProcessedMessage(chain, source string, retry uint8) {
}
func (p *NoopMetrics) VaaProcessingDuration(chain string, start *time.Time) {
}

View File

@ -2,16 +2,18 @@ package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type PrometheusMetrics struct {
measurementCount *prometheus.CounterVec
notionalCount *prometheus.CounterVec
tokenRequestsCount *prometheus.CounterVec
processedMessage *prometheus.CounterVec
measurementCount *prometheus.CounterVec
notionalCount *prometheus.CounterVec
tokenRequestsCount *prometheus.CounterVec
processedMessage *prometheus.CounterVec
vaaProcessingDuration *prometheus.HistogramVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -55,11 +57,21 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
},
[]string{"chain", "source", "status", "retry"},
)
vaaProcessingDuration := promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "vaa_processing_duration_seconds",
Help: "Duration of all vaa processing by chain.",
ConstLabels: constLabels,
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
},
[]string{"chain"},
)
return &PrometheusMetrics{
measurementCount: measurementCount,
notionalCount: notionalRequestsCount,
tokenRequestsCount: tokenRequestsCount,
processedMessage: processedMessage,
measurementCount: measurementCount,
notionalCount: notionalRequestsCount,
tokenRequestsCount: tokenRequestsCount,
processedMessage: processedMessage,
vaaProcessingDuration: vaaProcessingDuration,
}
}
@ -102,3 +114,11 @@ func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string, retry ui
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string, retry uint8) {
p.processedMessage.WithLabelValues(chain, source, "processed", fmt.Sprintf("%d", retry)).Inc()
}
func (p *PrometheusMetrics) VaaProcessingDuration(chain string, start *time.Time) {
if start == nil {
return
}
elapsed := float64(time.Since(*start).Nanoseconds()) / 1e9
p.vaaProcessingDuration.WithLabelValues(chain).Observe(elapsed)
}
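With the histogram in place, latency percentiles come from the bucket series. A hypothetical helper, not part of this commit (the Prometheus address is a placeholder), that queries the 95th percentile of VAA processing time per chain:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	client, err := api.NewClient(api.Config{Address: "http://localhost:9090"})
	if err != nil {
		panic(err)
	}
	promAPI := v1.NewAPI(client)

	// p95 over the last 5 minutes, per chain, from the new histogram.
	query := `histogram_quantile(0.95,
	  sum by (chain, le) (rate(vaa_processing_duration_seconds_bucket[5m])))`

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	result, warnings, err := promAPI.Query(ctx, query, time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result)
}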

View File

@ -94,14 +94,15 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
retry: uint8(retry),
expiredAt: expiredAt,
ctx: ctx,
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
retry: uint8(retry),
expiredAt: expiredAt,
sentTimestamp: sqs_client.GetSentTimestamp(msg),
ctx: ctx,
}
}
q.wg.Wait()
@ -117,14 +118,15 @@ func (q *SQS) Close() {
}
type sqsConsumerMessage struct {
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
retry uint8
expiredAt time.Time
ctx context.Context
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
retry uint8
expiredAt time.Time
sentTimestamp *time.Time
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *Event {
@ -149,3 +151,7 @@ func (m *sqsConsumerMessage) IsExpired() bool {
func (m *sqsConsumerMessage) Retry() uint8 {
return m.retry
}
func (m *sqsConsumerMessage) SentTimestamp() *time.Time {
return m.sentTimestamp
}

View File

@ -30,6 +30,7 @@ type ConsumerMessage interface {
Done()
Failed()
IsExpired() bool
SentTimestamp() *time.Time
}
// ConsumeFunc is a function to consume VAAEvent.

View File

@ -2,6 +2,7 @@ package sqs
import (
"context"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@ -113,3 +114,17 @@ func (c *Consumer) GetQueueAttributes(ctx context.Context) (*aws_sqs.GetQueueAtt
func (c *Consumer) GetQueueUrl() string {
return c.url
}
func GetSentTimestamp(msg aws_sqs_types.Message) *time.Time {
sentTimestampStr := msg.Attributes[string(aws_sqs_types.MessageSystemAttributeNameSentTimestamp)]
if sentTimestampStr == "" {
return nil
}
sentTimestampUInt, err := strconv.ParseUint(sentTimestampStr, 10, 64)
if err != nil {
return nil
}
sentTimestamp := time.Unix(0, int64(sentTimestampUInt)*int64(time.Millisecond))
return &sentTimestamp
}
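GetSentTimestamp converts the millisecond-epoch SentTimestamp attribute into a *time.Time and returns nil when the attribute is absent or does not parse; SQS only populates it when the attribute is requested on ReceiveMessage, which is assumed here. A small usage sketch in the same package (the attribute value is an arbitrary example, and the helper itself is hypothetical):

func demoGetSentTimestamp() {
	msg := aws_sqs_types.Message{
		Attributes: map[string]string{
			string(aws_sqs_types.MessageSystemAttributeNameSentTimestamp): "1719960000000", // ms since epoch
		},
	}
	if sent := GetSentTimestamp(msg); sent != nil {
		queueLatency := time.Since(*sent) // time spent in the queue plus processing so far
		_ = queueLatency
	}
	// A nil result means SentTimestamp was not requested on ReceiveMessage or failed to parse.
}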

View File

@ -1,6 +1,10 @@
package metrics
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
import (
"time"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// DummyMetrics is a dummy implementation of Metric interface.
type DummyMetrics struct {
@ -81,3 +85,5 @@ func (m *DummyMetrics) IncNotFoundTxHash(t string) {}
func (m *DummyMetrics) IncConsistencyLevelByChainID(chainID sdk.ChainID, consistenceLevel uint8) {}
func (m *DummyMetrics) IncDuplicateVaaByChainID(chain sdk.ChainID) {}
func (m *DummyMetrics) VaaProcessingDuration(chain sdk.ChainID, start *time.Time) {}

View File

@ -1,6 +1,10 @@
package metrics
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
import (
"time"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
const serviceName = "wormscan-fly"
@ -47,4 +51,7 @@ type Metrics interface {
// duplicate vaa metrics
IncDuplicateVaaByChainID(chain sdk.ChainID)
// vaas processing duration
VaaProcessingDuration(chain sdk.ChainID, start *time.Time)
}

View File

@ -2,6 +2,7 @@ package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -22,125 +23,103 @@ type PrometheusMetrics struct {
txHashSearchCount *prometheus.CounterVec
consistenceLevelChainCount *prometheus.CounterVec
duplicateVaaByChainCount *prometheus.CounterVec
vaaProcessingDuration *prometheus.HistogramVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
func NewPrometheusMetrics(environment string) *PrometheusMetrics {
constLabels := map[string]string{
"environment": environment,
"service": serviceName,
}
vaaReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "vaa_count_by_chain",
Help: "Total number of vaa by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "vaa_count_by_chain",
Help: "Total number of vaa by chain",
ConstLabels: constLabels,
}, []string{"chain", "type"})
vaaTotal := promauto.NewCounter(
prometheus.CounterOpts{
Name: "vaa_total",
Help: "Total number of vaa from Gossip network",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "vaa_total",
Help: "Total number of vaa from Gossip network",
ConstLabels: constLabels,
})
observationReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "observation_count_by_chain",
Help: "Total number of observation by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "observation_count_by_chain",
Help: "Total number of observation by chain",
ConstLabels: constLabels,
}, []string{"chain", "type"})
observationTotal := promauto.NewCounter(
prometheus.CounterOpts{
Name: "observation_total",
Help: "Total number of observation from Gossip network",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "observation_total",
Help: "Total number of observation from Gossip network",
ConstLabels: constLabels,
})
observationReceivedByGuardian := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "observation_count_by_guardian",
Help: "Total number of observation by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "observation_count_by_guardian",
Help: "Total number of observation by guardian",
ConstLabels: constLabels,
}, []string{"guardian_address", "type"})
heartbeatReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "heartbeat_count_by_guardian",
Help: "Total number of heartbeat by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "heartbeat_count_by_guardian",
Help: "Total number of heartbeat by guardian",
ConstLabels: constLabels,
}, []string{"guardian_node", "type"})
governorConfigReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "governor_config_count_by_guardian",
Help: "Total number of governor config by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "governor_config_count_by_guardian",
Help: "Total number of governor config by guardian",
ConstLabels: constLabels,
}, []string{"guardian_node", "type"})
governorStatusReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "governor_status_count_by_guardian",
Help: "Total number of governor status by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "governor_status_count_by_guardian",
Help: "Total number of governor status by guardian",
ConstLabels: constLabels,
}, []string{"guardian_node", "type"})
maxSequenceCacheCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "max_sequence_cache_count_by_chain",
Help: "Total number of errors when updating max sequence cache",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "max_sequence_cache_count_by_chain",
Help: "Total number of errors when updating max sequence cache",
ConstLabels: constLabels,
}, []string{"chain"})
txHashSearchCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tx_hash_search_count_by_store",
Help: "Total number of errors when updating max sequence cache",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "tx_hash_search_count_by_store",
Help: "Total number of errors when updating max sequence cache",
ConstLabels: constLabels,
}, []string{"store", "action"})
consistenceLevelChainCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "consistence_level_count_by_chain",
Help: "Total number of consistence level by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "consistence_level_count_by_chain",
Help: "Total number of consistence level by chain",
ConstLabels: constLabels,
}, []string{"chain", "consistence_level"})
duplicateVaaByChainCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "duplicate_vaa_count_by_chain",
Help: "Total number of duplicate vaa by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "duplicate_vaa_count_by_chain",
Help: "Total number of duplicate vaa by chain",
ConstLabels: constLabels,
}, []string{"chain"})
vaaProcessingDuration := promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "vaa_processing_duration_seconds",
Help: "Duration of all vaa processing by chain.",
ConstLabels: constLabels,
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
},
[]string{"chain"},
)
return &PrometheusMetrics{
vaaReceivedCount: vaaReceivedCount,
vaaTotal: vaaTotal,
@ -154,6 +133,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
observationReceivedByGuardian: observationReceivedByGuardian,
consistenceLevelChainCount: consistenceLevelChainCount,
duplicateVaaByChainCount: duplicateVaaByChainCount,
vaaProcessingDuration: vaaProcessingDuration,
}
}
@ -279,3 +259,12 @@ func (m *PrometheusMetrics) IncConsistencyLevelByChainID(chainID sdk.ChainID, co
func (m *PrometheusMetrics) IncDuplicateVaaByChainID(chain sdk.ChainID) {
m.duplicateVaaByChainCount.WithLabelValues(chain.String()).Inc()
}
// VaaProcessingDuration observes the elapsed time since the VAA was sent, labeled by chain.
func (m *PrometheusMetrics) VaaProcessingDuration(chain sdk.ChainID, start *time.Time) {
if start == nil {
return
}
elapsed := float64(time.Since(*start).Nanoseconds()) / 1e9
m.vaaProcessingDuration.WithLabelValues(chain.String()).Observe(elapsed)
}

View File

@ -121,7 +121,7 @@ func (c *VAAQueueConsumer) Start(ctx context.Context) {
msg.Failed()
continue
}
c.metrics.VaaProcessingDuration(v.EmitterChain, msg.SentTimestamp())
msg.Done(ctx)
c.logger.Info("Vaa saved in repository", zap.String("id", v.MessageID()))
}

View File

@ -10,13 +10,14 @@ import (
)
type sqsConsumerMessage[T any] struct {
data T
consumer *sqs.Consumer
id *string
logger *zap.Logger
expiredAt time.Time
wg *sync.WaitGroup
ctx context.Context
data T
consumer *sqs.Consumer
id *string
logger *zap.Logger
expiredAt time.Time
wg *sync.WaitGroup
ctx context.Context
sentTimestamp *time.Time
}
func (m *sqsConsumerMessage[T]) Data() T {
@ -38,6 +39,10 @@ func (m *sqsConsumerMessage[T]) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
func (m *sqsConsumerMessage[T]) SentTimestamp() *time.Time {
return m.sentTimestamp
}
type memoryConsumerMessageQueue[T any] struct {
data T
}
@ -53,3 +58,7 @@ func (m *memoryConsumerMessageQueue[T]) Failed() {}
func (m *memoryConsumerMessageQueue[T]) IsExpired() bool {
return false
}
func (m *memoryConsumerMessageQueue[T]) SentTimestamp() *time.Time {
return nil
}

View File

@ -2,6 +2,7 @@ package queue
import (
"context"
"time"
)
// Message represents a message from a queue.
@ -10,6 +11,7 @@ type Message[T any] interface {
Done(context.Context)
Failed()
IsExpired() bool
SentTimestamp() *time.Time
}
// Observation represents a signed observation.

View File

@ -2,6 +2,7 @@ package queue
import (
"context"
"time"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@ -60,3 +61,7 @@ func (m *memoryConsumerMessage) Failed() {}
func (m *memoryConsumerMessage) IsExpired() bool {
return false
}
func (m *memoryConsumerMessage) SentTimestamp() *time.Time {
return nil
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
common_sqs "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
@ -74,13 +75,14 @@ func (q *VAASqs) Consume(ctx context.Context) <-chan Message[[]byte] {
//TODO check if callback is better than channel
q.wg.Add(1)
q.ch <- &sqsConsumerMessage[[]byte]{
id: msg.ReceiptHandle,
data: body,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
ctx: ctx,
id: msg.ReceiptHandle,
data: body,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
ctx: ctx,
sentTimestamp: common_sqs.GetSentTimestamp(msg),
}
}
q.wg.Wait()

View File

@ -58,6 +58,7 @@ func (c *Consumer) Start(ctx context.Context) {
zap.String("trackId", event.TrackID),
zap.String("id", event.ID))
}
c.metrics.VaaProcessingDuration(emitterChainID, msg.SentTimestamp())
msg.Done()
}
}()

View File

@ -1,5 +1,7 @@
package metrics
import "time"
// DummyMetrics is a dummy implementation of Metric interface.
type DummyMetrics struct {
}
@ -44,3 +46,6 @@ func (p *DummyMetrics) IncUnprocessedMessage(chain, source string) {}
// IncProcessedMessage increments the number of processed message.
func (p *DummyMetrics) IncProcessedMessage(chain, source string) {}
// VaaProcessingDuration records the elapsed time since the VAA was sent.
func (p *DummyMetrics) VaaProcessingDuration(chain string, start *time.Time) {}

View File

@ -1,5 +1,9 @@
package metrics
import (
"time"
)
const serviceName = "wormscan-parser"
type Metrics interface {
@ -16,4 +20,6 @@ type Metrics interface {
IncExpiredMessage(chain, source string)
IncUnprocessedMessage(chain, source string)
IncProcessedMessage(chain, source string)
VaaProcessingDuration(chain string, start *time.Time)
}

View File

@ -1,6 +1,8 @@
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -12,53 +14,56 @@ type PrometheusMetrics struct {
vaaPayloadParserRequest *prometheus.CounterVec
vaaPayloadParserResponseCount *prometheus.CounterVec
processedMessage *prometheus.CounterVec
vaaProcessingDuration *prometheus.HistogramVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
func NewPrometheusMetrics(environment string) *PrometheusMetrics {
constLabels := map[string]string{
"environment": environment,
"service": serviceName,
}
vaaParseCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "parse_vaa_count_by_chain",
Help: "Total number of vaa parser by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "parse_vaa_count_by_chain",
Help: "Total number of vaa parser by chain",
ConstLabels: constLabels,
}, []string{"chain", "type"})
vaaPayloadParserRequestCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "parse_vaa_payload_request_count_by_chain",
Help: "Total number of request to payload parser component by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "parse_vaa_payload_request_count_by_chain",
Help: "Total number of request to payload parser component by chain",
ConstLabels: constLabels,
}, []string{"chain"})
vaaPayloadParserResponseCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "parse_vaa_payload_response_count_by_chain",
Help: "Total number of response from payload parser component by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "parse_vaa_payload_response_count_by_chain",
Help: "Total number of response from payload parser component by chain",
ConstLabels: constLabels,
}, []string{"chain", "status"})
processedMessage := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_message",
Help: "Total number of processed message",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "processed_message",
Help: "Total number of processed message",
ConstLabels: constLabels,
},
[]string{"chain", "source", "status"},
)
vaaProcessingDuration := promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "vaa_processing_duration_seconds",
Help: "Duration of all vaa processing by chain.",
ConstLabels: constLabels,
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
},
[]string{"chain"},
)
return &PrometheusMetrics{
vaaParseCount: vaaParseCount,
vaaPayloadParserRequest: vaaPayloadParserRequestCount,
vaaPayloadParserResponseCount: vaaPayloadParserResponseCount,
processedMessage: processedMessage,
vaaProcessingDuration: vaaProcessingDuration,
}
}
@ -130,3 +135,12 @@ func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
}
// VaaProcessingDuration observes the elapsed time since the VAA was sent, labeled by chain.
func (p *PrometheusMetrics) VaaProcessingDuration(chain string, start *time.Time) {
if start == nil {
return
}
elapsed := float64(time.Since(*start).Nanoseconds()) / 1e9
p.vaaProcessingDuration.WithLabelValues(chain).Observe(elapsed)
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
common_sqs "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.uber.org/zap"
@ -111,13 +112,14 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
ctx: ctx,
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
sentTimestamp: common_sqs.GetSentTimestamp(msg),
ctx: ctx,
}
}
q.wg.Wait()
@ -133,13 +135,14 @@ func (q *SQS) Close() {
}
type sqsConsumerMessage struct {
data *Event
consumer *sqs.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
expiredAt time.Time
ctx context.Context
data *Event
consumer *sqs.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
expiredAt time.Time
sentTimestamp *time.Time
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *Event {
@ -160,3 +163,7 @@ func (m *sqsConsumerMessage) Failed() {
func (m *sqsConsumerMessage) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
func (m *sqsConsumerMessage) SentTimestamp() *time.Time {
return m.sentTimestamp
}

View File

@ -29,6 +29,7 @@ type ConsumerMessage interface {
Done()
Failed()
IsExpired() bool
SentTimestamp() *time.Time
}
// ConsumeFunc is a function to consume Event.

View File

@ -104,18 +104,19 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
// Process the VAA
p := ProcessSourceTxParams{
TrackID: event.TrackID,
Timestamp: event.Timestamp,
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
Vaa: event.Vaa,
IsVaaSigned: event.IsVaaSigned,
Metrics: c.metrics,
Overwrite: event.Overwrite, // avoid processing the same transaction twice
Source: event.Source,
TrackID: event.TrackID,
Timestamp: event.Timestamp,
VaaId: event.ID,
ChainId: event.ChainID,
Emitter: event.EmitterAddress,
Sequence: event.Sequence,
TxHash: event.TxHash,
Vaa: event.Vaa,
IsVaaSigned: event.IsVaaSigned,
Metrics: c.metrics,
Overwrite: event.Overwrite, // avoid processing the same transaction twice
Source: event.Source,
SentTimestamp: msg.SentTimestamp(),
}
_, err := ProcessSourceTx(ctx, c.logger, c.rpcpool, c.wormchainRpcPool, c.repository, &p, c.p2pNetwork)

View File

@ -36,6 +36,7 @@ type ProcessSourceTxParams struct {
// to avoid processing the same VAA twice, which would result in performance degradation.
Overwrite bool
Metrics metrics.Metrics
SentTimestamp *time.Time
DisableDBUpsert bool
}
@ -142,6 +143,9 @@ func ProcessSourceTx(
if err != nil {
return nil, err
}
params.Metrics.VaaProcessingDuration(params.ChainId.String(), params.SentTimestamp)
return txDetail, nil
}

View File

@ -1,5 +1,7 @@
package metrics
import "time"
// DummyMetrics is a dummy implementation of Metric interface.
type DummyMetrics struct{}
@ -46,3 +48,6 @@ func (d *DummyMetrics) IncVaaFailed(chainID uint16, retry uint8) {}
// IncWormchainUnknown is a dummy implementation of IncWormchainUnknown.
func (d *DummyMetrics) IncWormchainUnknown(srcChannel string, dstChannel string) {}
// VaaProcessingDuration records the elapsed time since the VAA was sent.
func (d *DummyMetrics) VaaProcessingDuration(chain string, start *time.Time) {}

View File

@ -1,5 +1,7 @@
package metrics
import "time"
const serviceName = "wormscan-tx-tracker"
type Metrics interface {
@ -16,4 +18,5 @@ type Metrics interface {
IncVaaProcessed(chainID uint16, retry uint8)
IncVaaFailed(chainID uint16, retry uint8)
IncWormchainUnknown(srcChannel string, dstChannel string)
VaaProcessingDuration(chain string, start *time.Time)
}

View File

@ -2,6 +2,7 @@ package metrics
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -16,65 +17,60 @@ type PrometheusMetrics struct {
storeUnprocessedOriginTx *prometheus.CounterVec
vaaProcessed *prometheus.CounterVec
wormchainUnknown *prometheus.CounterVec
vaaProcessingDuration *prometheus.HistogramVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
func NewPrometheusMetrics(environment string) *PrometheusMetrics {
constLabels := map[string]string{
"environment": environment,
"service": serviceName,
}
vaaTxTrackerCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "vaa_tx_tracker_count_by_chain",
Help: "Total number of vaa processed by tx tracker by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "vaa_tx_tracker_count_by_chain",
Help: "Total number of vaa processed by tx tracker by chain",
ConstLabels: constLabels,
}, []string{"chain", "source", "type"})
vaaProcesedDuration := promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "vaa_processed_duration",
Help: "Duration of vaa processing",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
Name: "vaa_processed_duration",
Help: "Duration of vaa processing",
ConstLabels: constLabels,
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
}, []string{"chain"})
rpcCallCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rpc_call_count_by_chain",
Help: "Total number of rpc calls by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "rpc_call_count_by_chain",
Help: "Total number of rpc calls by chain",
ConstLabels: constLabels,
}, []string{"chain", "rpc", "status"})
storeUnprocessedOriginTx := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "store_unprocessed_origin_tx",
Help: "Total number of unprocessed origin tx",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "store_unprocessed_origin_tx",
Help: "Total number of unprocessed origin tx",
ConstLabels: constLabels,
}, []string{"chain"})
vaaProcessed := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "vaa_processed",
Help: "Total number of processed vaa with retry context",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "vaa_processed",
Help: "Total number of processed vaa with retry context",
ConstLabels: constLabels,
}, []string{"chain", "retry", "status"})
wormchainUnknown := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormchain_unknown",
Help: "Total number of unknown wormchain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
Name: "wormchain_unknown",
Help: "Total number of unknown wormchain",
ConstLabels: constLabels,
}, []string{"srcChannel", "dstChannel"})
vaaProcessingDuration := promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "vaa_processing_duration_seconds",
Help: "Duration of all vaa processing by chain.",
ConstLabels: constLabels,
Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 120, 300, 600, 1200},
},
[]string{"chain"},
)
return &PrometheusMetrics{
vaaTxTrackerCount: vaaTxTrackerCount,
vaaProcesedDuration: vaaProcesedDuration,
@ -82,6 +78,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
storeUnprocessedOriginTx: storeUnprocessedOriginTx,
vaaProcessed: vaaProcessed,
wormchainUnknown: wormchainUnknown,
vaaProcessingDuration: vaaProcessingDuration,
}
}
@ -157,3 +154,12 @@ func (m *PrometheusMetrics) IncVaaFailed(chainID uint16, retry uint8) {
func (m *PrometheusMetrics) IncWormchainUnknown(srcChannel string, dstChannel string) {
m.wormchainUnknown.WithLabelValues(srcChannel, dstChannel).Inc()
}
// VaaProcessingDuration observes the elapsed time since the VAA was sent, labeled by chain.
func (m *PrometheusMetrics) VaaProcessingDuration(chain string, start *time.Time) {
if start == nil {
return
}
elapsed := float64(time.Since(*start).Nanoseconds()) / 1e9
m.vaaProcessingDuration.WithLabelValues(chain).Observe(elapsed)
}

View File

@ -100,15 +100,16 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
retry: uint8(retry),
metrics: q.metrics,
ctx: ctx,
id: msg.ReceiptHandle,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
sentTimestamp: sqs_client.GetSentTimestamp(msg),
retry: uint8(retry),
metrics: q.metrics,
ctx: ctx,
}
}
q.wg.Wait()
@ -124,15 +125,16 @@ func (q *SQS) Close() {
}
type sqsConsumerMessage struct {
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
expiredAt time.Time
retry uint8
metrics metrics.Metrics
ctx context.Context
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
expiredAt time.Time
sentTimestamp *time.Time
retry uint8
metrics metrics.Metrics
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *Event {
@ -164,3 +166,7 @@ func (m *sqsConsumerMessage) IsExpired() bool {
func (m *sqsConsumerMessage) Retry() uint8 {
return m.retry
}
func (m *sqsConsumerMessage) SentTimestamp() *time.Time {
return m.sentTimestamp
}

View File

@ -69,6 +69,7 @@ type ConsumerMessage interface {
Done()
Failed()
IsExpired() bool
SentTimestamp() *time.Time
}
// ConsumeFunc is a function to consume Event.