Merge remote-tracking branch 'origin' into feature-1042/mapped-wormchain-source-events

julian merlo 2024-03-25 18:54:51 -03:00
commit e12e6cb576
20 changed files with 1338 additions and 1198 deletions

View File

@@ -30,11 +30,13 @@ func (c *Consumer) Start(ctx context.Context) {
 	for msg := range c.consume(ctx) {
 		event := msg.Data()
+		chainID := sdk.ChainID(event.ChainID).String()
+
 		// check if message is expired.
 		if msg.IsExpired() {
 			msg.Failed()
 			c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
-			c.metrics.IncExpiredMessage(c.p2pNetwork, event.Source)
+			c.metrics.IncExpiredMessage(chainID, event.Source)
 			continue
 		}
@@ -43,7 +45,7 @@ func (c *Consumer) Start(ctx context.Context) {
 		if err != nil {
 			msg.Done()
 			c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
-			c.metrics.IncInvalidMessage(c.p2pNetwork, event.Source)
+			c.metrics.IncInvalidMessage(chainID, event.Source)
 			continue
 		}
@@ -51,13 +53,13 @@ func (c *Consumer) Start(ctx context.Context) {
 		err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
 		if err != nil {
 			msg.Failed()
-			c.metrics.IncUnprocessedMessage(c.p2pNetwork, event.Source)
+			c.metrics.IncUnprocessedMessage(chainID, event.Source)
 			continue
 		}
 		msg.Done()
 		c.logger.Debug("Pushed vaa metric", zap.String("id", event.ID))
-		c.metrics.IncProcessedMessage(c.p2pNetwork, event.Source)
+		c.metrics.IncProcessedMessage(chainID, event.Source)
 	}
 	}()
 }
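
The substantive change in this consumer is that metric labels switch from the static p2p network name to the emitter chain derived from the event. For reference, a minimal standalone sketch (not part of the commit) of how sdk.ChainID turns the raw uint16 carried by the event into the readable label used above; the sample value is illustrative.

package main

import (
	"fmt"

	sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func main() {
	// The queue event carries the emitter chain as a raw uint16
	// (2 is Ethereum in Wormhole's numbering; illustrative value).
	var eventChainID uint16 = 2

	// Same conversion the consumer performs before incrementing metrics.
	chainID := sdk.ChainID(eventChainID).String()

	fmt.Println(chainID) // expected: "ethereum"
}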

View File

@@ -281,7 +281,10 @@
 			]
 		},
 		{
-			"addresses": ["0xf3f04555f8fda510bfc77820fd6eb8446f59e72d"],
+			"addresses": [
+				"0xf3f04555f8fda510bfc77820fd6eb8446f59e72d",
+				"0xe1c66210fb97c76cdaee38950f5e9c181e9da628"
+			],
 			"type": "Mayan",
 			"methods": [
 				{
@@ -291,6 +294,10 @@
 				{
 					"methodId": "0xc0e6d169",
 					"method": "MethodRedeemAndUnwrap"
+				},
+				{
+					"methodId": "0x6df0e4a8",
+					"method": "MethodRedeemWithPayload"
 				}
 			]
 		},

File diff suppressed because it is too large.

View File

@@ -632,7 +632,8 @@ data:
           "0x9daF8c91AEFAE50b9c0E69629D3F6Ca40cA3B3FE",
           "0x227babe533fa9a1085f5261210e0b7137e44437b",
           "0xf3f04555f8fda510bfc77820fd6eb8446f59e72d",
-          "0x27428dd2d3dd32a4d7f7c497eaaa23130d894911"
+          "0x27428dd2d3dd32a4d7f7c497eaaa23130d894911",
+          "0xe1c66210fb97c76cdaee38950f5e9c181e9da628"
         ],
         "chain": "polygon",
         "chainId": 5,
@@ -657,7 +658,8 @@ data:
           "0x9daF8c91AEFAE50b9c0E69629D3F6Ca40cA3B3FE",
           "0x227babe533fa9a1085f5261210e0b7137e44437b",
           "0xf3f04555f8fda510bfc77820fd6eb8446f59e72d",
-          "0x27428dd2d3dd32a4d7f7c497eaaa23130d894911"
+          "0x27428dd2d3dd32a4d7f7c497eaaa23130d894911",
+          "0xe1c66210fb97c76cdaee38950f5e9c181e9da628"
         ],
         "topics": [
           "0x1b2a7ff080b8cb6ff436ce0372e399692bbfb6d4ae5766fd8d58a7b8cc6142e6",

View File

@@ -3,14 +3,15 @@ package main
 import (
 	"context"
 	"encoding/json"
-	"github.com/wormhole-foundation/wormhole-explorer/common/dbconsts"
-	"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols"
-	"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository"
 	"log"
 	"os"
 	"strings"
 	"time"
 
+	"github.com/wormhole-foundation/wormhole-explorer/common/dbconsts"
+	"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols"
+	"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository"
+
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	"github.com/wormhole-foundation/wormhole-explorer/common/configuration"
@@ -104,7 +105,7 @@ func main() {
 // initNotionalJob initializes notional job.
 func initNotionalJob(ctx context.Context, cfg *config.NotionalConfiguration, logger *zap.Logger) *notional.NotionalJob {
 	// init coingecko api client.
-	api := coingecko.NewCoingeckoAPI(cfg.CoingeckoURL)
+	api := coingecko.NewCoingeckoAPI(cfg.CoingeckoURL, logger)
 	// init redis client.
 	redisClient := redis.NewClient(&redis.Options{Addr: cfg.CacheURL})
 	// init token provider.

View File

@@ -8,6 +8,7 @@ import (
 	"strings"
 
 	"github.com/shopspring/decimal"
+	"go.uber.org/zap"
 )
 
 // CoingeckoAPI is a client for the coingecko API
@@ -15,14 +16,16 @@ type CoingeckoAPI struct {
 	url       string
 	chunkSize int
 	client    *http.Client
+	logger    *zap.Logger
 }
 
 // NewCoingeckoAPI creates a new coingecko client
-func NewCoingeckoAPI(url string) *CoingeckoAPI {
+func NewCoingeckoAPI(url string, logger *zap.Logger) *CoingeckoAPI {
 	return &CoingeckoAPI{
 		url:       url,
 		chunkSize: 200,
 		client:    http.DefaultClient,
+		logger:    logger,
 	}
 }
@@ -37,8 +40,11 @@ func (c *CoingeckoAPI) GetNotionalUSD(ids []string) (map[string]NotionalUSD, err
 	response := map[string]NotionalUSD{}
 	chunksIds := chunkChainIds(ids, c.chunkSize)
 
+	c.logger.Info("fetching notional value of assets", zap.Int("total_chunks", len(chunksIds)))
+
 	// iterate over chunks of ids.
-	for _, chunk := range chunksIds {
+	for i, chunk := range chunksIds {
 		notionalUrl := fmt.Sprintf("%s/simple/price?ids=%s&vs_currencies=usd", c.url, strings.Join(chunk, ","))
 
 		req, err := http.NewRequest(http.MethodGet, notionalUrl, nil)
@@ -51,6 +57,11 @@ func (c *CoingeckoAPI) GetNotionalUSD(ids []string) (map[string]NotionalUSD, err
 		}
 		defer res.Body.Close()
 
+		if res.StatusCode != 200 {
+			c.logger.Error("failed to get notional value of assets", zap.Int("statusCode", res.StatusCode), zap.Int("chunk", i))
+			return response, fmt.Errorf("failed to get notional value of assets, status code: %d", res.StatusCode)
+		}
+
 		body, err := ioutil.ReadAll(res.Body)
 		if err != nil {
 			return response, err
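
The new guard returns early on any non-200 response instead of trying to decode the body as a price payload. A minimal standalone sketch of the same pattern, using only the standard library and zap; the function name and URL below are illustrative, not part of the commit.

package main

import (
	"fmt"
	"io"
	"net/http"

	"go.uber.org/zap"
)

// fetchJSON mirrors the guard added above: any non-200 status is logged and
// returned as an error instead of being parsed.
func fetchJSON(logger *zap.Logger, url string) ([]byte, error) {
	res, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		logger.Error("unexpected status", zap.Int("statusCode", res.StatusCode))
		return nil, fmt.Errorf("unexpected status code: %d", res.StatusCode)
	}
	return io.ReadAll(res.Body)
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	body, err := fetchJSON(logger, "https://api.coingecko.com/api/v3/ping")
	if err != nil {
		logger.Fatal("request failed", zap.Error(err))
	}
	fmt.Println(string(body))
}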

View File

@@ -6,6 +6,7 @@ import (
 	"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
 	"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
 	"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
+	sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 )
@@ -28,13 +29,15 @@ func (c *Consumer) Start(ctx context.Context) {
 	for msg := range c.consume(ctx) {
 		event := msg.Data()
+		emitterChainID := sdk.ChainID(event.ChainID).String()
+
 		// check if message is expired.
 		if msg.IsExpired() {
+			c.metrics.IncExpiredMessage(emitterChainID, event.Source)
 			c.logger.Warn("Event expired", zap.String("id", event.ID))
 			msg.Failed()
 			continue
 		}
-		c.metrics.IncVaaUnexpired(event.ChainID)
 
 		params := &processor.Params{
 			TrackID: event.TrackID,
@@ -42,6 +45,7 @@ func (c *Consumer) Start(ctx context.Context) {
 		}
 		_, err := c.process(ctx, params)
 		if err != nil {
+			c.metrics.IncUnprocessedMessage(emitterChainID, event.Source)
 			c.logger.Error("Error processing event",
 				zap.String("trackId", event.TrackID),
 				zap.String("id", event.ID),
@@ -49,6 +53,7 @@ func (c *Consumer) Start(ctx context.Context) {
 			msg.Failed()
 			continue
 		} else {
+			c.metrics.IncProcessedMessage(emitterChainID, event.Source)
 			c.logger.Debug("Event processed",
 				zap.String("trackId", event.TrackID),
 				zap.String("id", event.ID))

View File

@@ -35,3 +35,12 @@ func (d *DummyMetrics) IncVaaPayloadParserSuccessCount(chainID uint16) {}
 
 // IncVaaPayloadParserNotFoundCount increments the number of vaa payload parser not found.
 func (d *DummyMetrics) IncVaaPayloadParserNotFoundCount(chainID uint16) {}
+
+// IncExpiredMessage increments the number of expired message.
+func (p *DummyMetrics) IncExpiredMessage(chain, source string) {}
+
+// IncUnprocessedMessage increments the number of unprocessed message.
+func (p *DummyMetrics) IncUnprocessedMessage(chain, source string) {}
+
+// IncProcessedMessage increments the number of processed message.
+func (p *DummyMetrics) IncProcessedMessage(chain, source string) {}

View File

@ -5,7 +5,6 @@ const serviceName = "wormscan-parser"
type Metrics interface { type Metrics interface {
IncVaaConsumedQueue(chainID uint16) IncVaaConsumedQueue(chainID uint16)
IncVaaUnfiltered(chainID uint16) IncVaaUnfiltered(chainID uint16)
IncVaaUnexpired(chainID uint16)
IncVaaParsed(chainID uint16) IncVaaParsed(chainID uint16)
IncVaaParsedInserted(chainID uint16) IncVaaParsedInserted(chainID uint16)
@ -13,4 +12,8 @@ type Metrics interface {
IncVaaPayloadParserErrorCount(chainID uint16) IncVaaPayloadParserErrorCount(chainID uint16)
IncVaaPayloadParserNotFoundCount(chainID uint16) IncVaaPayloadParserNotFoundCount(chainID uint16)
IncVaaPayloadParserSuccessCount(chainID uint16) IncVaaPayloadParserSuccessCount(chainID uint16)
IncExpiredMessage(chain, source string)
IncUnprocessedMessage(chain, source string)
IncProcessedMessage(chain, source string)
} }

View File

@@ -11,6 +11,7 @@ type PrometheusMetrics struct {
 	vaaParseCount                 *prometheus.CounterVec
 	vaaPayloadParserRequest       *prometheus.CounterVec
 	vaaPayloadParserResponseCount *prometheus.CounterVec
+	processedMessage              *prometheus.CounterVec
 }
 
 // NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@@ -42,10 +43,22 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
 			"service":     serviceName,
 		},
 	}, []string{"chain", "status"})
+	processedMessage := promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "processed_message",
+			Help: "Total number of processed message",
+			ConstLabels: map[string]string{
+				"environment": environment,
+				"service":     serviceName,
+			},
+		},
+		[]string{"chain", "source", "status"},
+	)
 	return &PrometheusMetrics{
 		vaaParseCount:                 vaaParseCount,
 		vaaPayloadParserRequest:       vaaPayloadParserRequestCount,
 		vaaPayloadParserResponseCount: vaaPayloadParserResponseCount,
+		processedMessage:              processedMessage,
 	}
 }
@@ -102,3 +115,18 @@ func (m *PrometheusMetrics) IncVaaPayloadParserNotFoundCount(chainID uint16) {
 	chain := vaa.ChainID(chainID).String()
 	m.vaaPayloadParserResponseCount.WithLabelValues(chain, "not_found").Inc()
 }
+
+// IncExpiredMessage increments the number of expired message.
+func (p *PrometheusMetrics) IncExpiredMessage(chain, source string) {
+	p.processedMessage.WithLabelValues(chain, source, "expired").Inc()
+}
+
+// IncUnprocessedMessage increments the number of unprocessed message.
+func (p *PrometheusMetrics) IncUnprocessedMessage(chain, source string) {
+	p.processedMessage.WithLabelValues(chain, source, "unprocessed").Inc()
+}
+
+// IncProcessedMessage increments the number of processed message.
+func (p *PrometheusMetrics) IncProcessedMessage(chain, source string) {
+	p.processedMessage.WithLabelValues(chain, source, "processed").Inc()
+}
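
All three outcomes end up in a single counter family, distinguished only by the status label. A standalone sketch of that shape, where the metric name and label values mirror the hunk above while the registry-free setup and test helper are illustrative, not part of the commit.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// One counter family; the outcome is encoded in the "status" label,
	// as the parser metrics above do.
	processedMessage := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "processed_message",
		Help: "Total number of processed message",
	}, []string{"chain", "source", "status"})

	processedMessage.WithLabelValues("ethereum", "pipeline", "expired").Inc()
	processedMessage.WithLabelValues("ethereum", "pipeline", "processed").Inc()
	processedMessage.WithLabelValues("ethereum", "pipeline", "processed").Inc()

	// processed_message{chain="ethereum",source="pipeline",status="processed"} == 2
	fmt.Println(testutil.ToFloat64(processedMessage.WithLabelValues("ethereum", "pipeline", "processed")))
}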

View File

@@ -37,6 +37,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
 			return nil, err
 		}
 		return &Event{
+			Source:  "pipeline",
 			TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
 			ID:      vaaEvent.ID,
 			ChainID: vaaEvent.ChainID,
@@ -73,6 +74,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				ID:      signedVaaEvent.ID,
 				ChainID: signedVaaEvent.EmitterChain,
@@ -101,6 +103,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				ID:      vaa.MessageID(),
 				ChainID: plm.ChainID,

View File

@@ -12,6 +12,7 @@ type sqsEvent struct {
 
 // Event represents an event to be handled.
 type Event struct {
+	Source  string
 	TrackID string
 	ID      string
 	ChainID uint16
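
For reference, the only two values the converters above assign to the new field are "pipeline" and "chain-event". A tiny standalone sketch of an event tagged this way; the struct is trimmed to the fields visible in the hunk and the sample values are illustrative.

package main

import "fmt"

// Event mirrors the fields visible in the hunk above; the real struct has more.
type Event struct {
	Source  string
	TrackID string
	ID      string
	ChainID uint16
}

func main() {
	e := Event{
		Source:  "pipeline", // the converters above set either "pipeline" or "chain-event"
		TrackID: "pipeline-example-id", // illustrative values
		ID:      "example-id",
		ChainID: 2,
	}
	fmt.Printf("%+v\n", e)
}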

View File

@@ -97,7 +97,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
 	start := time.Now()
 
-	c.metrics.IncVaaUnfiltered(uint16(event.ChainID))
+	c.metrics.IncVaaUnfiltered(event.ChainID.String(), event.Source)
 
 	// Process the VAA
 	p := ProcessSourceTxParams{
@@ -147,7 +147,7 @@ func (c *Consumer) processSourceTx(ctx context.Context, msg queue.ConsumerMessag
 			zap.String("id", event.ID),
 			elapsedLog,
 		)
-		c.metrics.IncOriginTxInserted(uint16(event.ChainID))
+		c.metrics.IncOriginTxInserted(event.ChainID.String(), event.Source)
 	}
 }
@@ -165,9 +165,10 @@ func (c *Consumer) processTargetTx(ctx context.Context, msg queue.ConsumerMessag
 	// Process the VAA
 	p := ProcessTargetTxParams{
+		Source:         event.Source,
 		TrackID:        event.TrackID,
 		VaaId:          event.ID,
-		ChainId:        event.ChainID,
+		ChainID:        event.ChainID,
 		Emitter:        event.EmitterAddress,
 		TxHash:         event.TxHash,
 		BlockTimestamp: event.Timestamp,

View File

@@ -19,9 +19,10 @@ var (
 
 // ProcessTargetTxParams is a struct that contains the parameters for the ProcessTargetTx method.
 type ProcessTargetTxParams struct {
+	Source         string
 	TrackID        string
 	VaaId          string
-	ChainId        sdk.ChainID
+	ChainID        sdk.ChainID
 	Emitter        string
 	TxHash         string
 	BlockTimestamp *time.Time
@@ -45,7 +46,7 @@ func ProcessTargetTx(
 		ID:      params.VaaId,
 		TrackID: params.TrackID,
 		Destination: &DestinationTx{
-			ChainID:     params.ChainId,
+			ChainID:     params.ChainID,
 			Status:      params.Status,
 			TxHash:      params.TxHash,
 			BlockNumber: params.BlockHeight,
@@ -65,7 +66,7 @@ func ProcessTargetTx(
 	}
 
 	err = repository.UpsertTargetTx(ctx, update)
 	if err == nil {
-		params.Metrics.IncDestinationTxInserted(uint16(params.ChainId))
+		params.Metrics.IncDestinationTxInserted(params.ChainID.String(), params.Source)
 	}
 	return err
 }

View File

@@ -9,16 +9,16 @@ func NewDummyMetrics() *DummyMetrics {
 }
 
 // IncVaaConsumedQueue is a dummy implementation of IncVaaConsumedQueue.
-func (d *DummyMetrics) IncVaaConsumedQueue(chainID uint16) {}
+func (d *DummyMetrics) IncVaaConsumedQueue(chainID string, source string) {}
 
 // IncVaaUnfiltered is a dummy implementation of IncVaaUnfiltered.
-func (d *DummyMetrics) IncVaaUnfiltered(chainID uint16) {}
+func (d *DummyMetrics) IncVaaUnfiltered(chainID string, source string) {}
 
 // IncOriginTxInserted is a dummy implementation of IncOriginTxInserted.
-func (d *DummyMetrics) IncOriginTxInserted(chainID uint16) {}
+func (d *DummyMetrics) IncOriginTxInserted(chainID string, source string) {}
 
 // IncDestinationTxInserted is a dummy implementation of IncDestinationTxInserted.
-func (d *DummyMetrics) IncDestinationTxInserted(chainID uint16) {}
+func (d *DummyMetrics) IncDestinationTxInserted(chainID string, source string) {}
 
 // IncVaaWithoutTxHash is a dummy implementation of IncVaaWithoutTxHash.
 func (d *DummyMetrics) IncVaaWithoutTxHash(chainID uint16) {}

View File

@@ -3,12 +3,12 @@ package metrics
 
 const serviceName = "wormscan-tx-tracker"
 
 type Metrics interface {
-	IncVaaConsumedQueue(chainID uint16)
-	IncVaaUnfiltered(chainID uint16)
-	IncOriginTxInserted(chainID uint16)
+	IncVaaConsumedQueue(chainID string, source string)
+	IncVaaUnfiltered(chainID string, source string)
+	IncOriginTxInserted(chainID string, source string)
 	IncVaaWithoutTxHash(chainID uint16)
 	IncVaaWithTxHashFixed(chainID uint16)
-	IncDestinationTxInserted(chainID uint16)
+	IncDestinationTxInserted(chainID string, source string)
 	AddVaaProcessedDuration(chainID uint16, duration float64)
 	IncCallRpcSuccess(chainID uint16, rpc string)
 	IncCallRpcError(chainID uint16, rpc string)

View File

@@ -27,7 +27,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
 			"environment": environment,
 			"service":     serviceName,
 		},
-	}, []string{"chain", "type"})
+	}, []string{"chain", "source", "type"})
 
 	vaaProcesedDuration := promauto.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "vaa_processed_duration",
 		Help: "Duration of vaa processing",
@@ -74,27 +74,23 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
 }
 
 // IncVaaConsumedQueue increments the number of consumed VAA.
-func (m *PrometheusMetrics) IncVaaConsumedQueue(chainID uint16) {
-	chain := vaa.ChainID(chainID).String()
-	m.vaaTxTrackerCount.WithLabelValues(chain, "consumed_queue").Inc()
+func (m *PrometheusMetrics) IncVaaConsumedQueue(chainID string, source string) {
+	m.vaaTxTrackerCount.WithLabelValues(chainID, source, "consumed_queue").Inc()
 }
 
 // IncVaaUnfiltered increments the number of unfiltered VAA.
-func (m *PrometheusMetrics) IncVaaUnfiltered(chainID uint16) {
-	chain := vaa.ChainID(chainID).String()
-	m.vaaTxTrackerCount.WithLabelValues(chain, "unfiltered").Inc()
+func (m *PrometheusMetrics) IncVaaUnfiltered(chainID string, source string) {
+	m.vaaTxTrackerCount.WithLabelValues(chainID, source, "unfiltered").Inc()
 }
 
 // IncOriginTxInserted increments the number of inserted origin tx.
-func (m *PrometheusMetrics) IncOriginTxInserted(chainID uint16) {
-	chain := vaa.ChainID(chainID).String()
-	m.vaaTxTrackerCount.WithLabelValues(chain, "origin_tx_inserted").Inc()
+func (m *PrometheusMetrics) IncOriginTxInserted(chainID string, source string) {
+	m.vaaTxTrackerCount.WithLabelValues(chainID, source, "origin_tx_inserted").Inc()
 }
 
 // IncDestinationTxInserted increments the number of inserted destination tx.
-func (m *PrometheusMetrics) IncDestinationTxInserted(chainID uint16) {
-	chain := vaa.ChainID(chainID).String()
-	m.vaaTxTrackerCount.WithLabelValues(chain, "destination_tx_inserted").Inc()
+func (m *PrometheusMetrics) IncDestinationTxInserted(chainID string, source string) {
+	m.vaaTxTrackerCount.WithLabelValues(chainID, source, "destination_tx_inserted").Inc()
 }
 
 // AddVaaProcessedDuration adds the duration of vaa processing.
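
Because the tx-tracker counter now declares three labels instead of two, every WithLabelValues call has to pass exactly three values; the rewritten methods above do, and an old-style two-value call would panic. A standalone sketch of that constraint; the metric name and help text are assumptions, since the hunk does not show them, and only the label set mirrors the diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Assumed name and help; the label set matches the change above.
	vaaTxTrackerCount := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "vaa_tx_tracker_count",
		Help: "Total number of vaa processed by tx-tracker",
	}, []string{"chain", "source", "type"})

	// Three label values for three declared labels: fine.
	vaaTxTrackerCount.WithLabelValues("polygon", "pipeline", "origin_tx_inserted").Inc()

	// Two values against three labels fails with an inconsistent-cardinality
	// error (WithLabelValues would panic), so the signature change forces
	// every caller to supply a source.
	if _, err := vaaTxTrackerCount.GetMetricWithLabelValues("polygon", "origin_tx_inserted"); err != nil {
		fmt.Println(err)
	}
}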

View File

@@ -38,6 +38,7 @@ func NewVaaConverter(log *zap.Logger) ConverterFunc {
 			return nil, err
 		}
 		return &Event{
+			Source:  "pipeline",
 			TrackID: fmt.Sprintf("pipeline-%s", vaaEvent.ID),
 			Type:    SourceChainEvent,
 			ID:      vaaEvent.ID,
@@ -80,6 +81,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				Type:    SourceChainEvent,
 				ID:      signedVaa.ID,
@@ -104,6 +106,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				Type:    SourceChainEvent,
 				ID:      vaa.MessageID(),
@@ -136,6 +139,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				Type:    TargetChainEvent,
 				ID:      vaa.MessageID(),
@@ -171,6 +175,7 @@ func NewNotificationEvent(log *zap.Logger) ConverterFunc {
 			}
 
 			return &Event{
+				Source:  "chain-event",
 				TrackID: notification.TrackID,
 				Type:    TargetChainEvent,
 				ID:      vaa.MessageID(),

View File

@ -95,7 +95,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
} }
continue continue
} }
q.metrics.IncVaaConsumedQueue(uint16(event.ChainID)) q.metrics.IncVaaConsumedQueue(event.ChainID.String(), event.Source)
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"]) retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1) q.wg.Add(1)

View File

@@ -39,6 +39,7 @@ type EventAttributes interface {
 
 // Event represents an event to be handled.
 type Event struct {
+	Source  string
 	TrackID string
 	Type    EventType
 	ID      string