446 create prometheus metrics (#457)

* Add vaa metrics

* Add dummy metrics and observations metrics

* Add heartbeat, governorConfig, governorStatus metrics

* fix go.sum

* Add deploy en vars METRICS_ENABLED to fly component

Co-authored-by: ftocal <fert1335@gmail.com>

---------

Co-authored-by: Fernando Torres <fert1335@gmail.com>
This commit is contained in:
walker-16 2023-06-26 12:47:22 -03:00 committed by GitHub
parent 1c67efc3d5
commit 737405054a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 411 additions and 15 deletions

View File

@ -156,7 +156,7 @@ func main() {
app := fiber.New(fiber.Config{ErrorHandler: middleware.ErrorHandler})
// Configure middleware
prometheus := fiberprometheus.New("wormscan")
prometheus := fiberprometheus.New("wormscan-api")
prometheus.RegisterAt(app, "/metrics")
app.Use(prometheus.Middleware)

View File

@ -13,4 +13,5 @@ P2P_NETWORK=mainnet
PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
ALERT_ENABLED=false
METRIC_ENABLED=false

View File

@ -13,4 +13,5 @@ P2P_NETWORK=mainnet
PPROF_ENABLED=true
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
ALERT_ENABLED=false
METRIC_ENABLED=false

View File

@ -13,4 +13,5 @@ P2P_NETWORK=testnet
PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=300
AWS_IAM_ROLE=
ALERT_ENABLED=false
ALERT_ENABLED=false
METRIC_ENABLED=false

View File

@ -73,6 +73,8 @@ spec:
value: {{ .ALERT_ENABLED }}
- name: ENVIRONMENT
value: {{ .ENVIRONMENT }}
- name: METRIC_ENABLED
value: {{ .METRIC_ENABLED }}
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -7,6 +7,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -29,7 +30,7 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
logger.Fatal("could not connect to DB", zap.Error(err))
}
repository := storage.NewRepository(alert.NewDummyClient(), db, logger)
repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db, logger)
workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/schollz/progressbar/v3"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
@ -58,7 +59,7 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi
}
func (w *Workpool) Process(ctx context.Context) error {
repo := storage.NewRepository(alert.NewDummyClient(), w.DB, w.Log)
repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB, w.Log)
var err error
for {

View File

@ -105,3 +105,13 @@ func getAlertEnabled() bool {
func getAlertApiKey() string {
return os.Getenv("ALERT_API_KEY")
}
// GetMetricEnabled get if metric is enabled.
func GetMetricEnabled() bool {
strMetricEnabled := os.Getenv("METRIC_ENABLED")
metricEnabled, err := strconv.ParseBool(strMetricEnabled)
if err != nil {
metricEnabled = false
}
return metricEnabled
}

View File

@ -3,6 +3,7 @@ module github.com/wormhole-foundation/wormhole-explorer/fly
go 1.19
require (
github.com/ansrivas/fiberprometheus/v2 v2.6.0
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.1.1
github.com/aws/aws-sdk-go-v2/credentials v1.1.1
@ -36,6 +37,7 @@ require (
github.com/cosmos/gogoproto v1.4.3 // indirect
github.com/cosmos/ibc-go/v4 v4.2.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/gofiber/adaptor/v2 v2.1.31 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/holiman/uint256 v1.2.1 // indirect
@ -209,7 +211,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect

View File

@ -599,9 +599,12 @@ github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3b
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/ansrivas/fiberprometheus/v2 v2.6.0 h1:QUaaKxil/N5IM1R19k6jsmFEJMfa4O3qtnDkiF+zxUc=
github.com/ansrivas/fiberprometheus/v2 v2.6.0/go.mod h1:hivZjKkqX04PPbMZNi9iGB0AQ90iN6RmKERiX1TdgTA=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aokoli/goutils v1.0.1/go.mod h1:SijmP0QR8LtwsmDs8Yii5Z/S4trXFGFC2oO5g9DP+DQ=
@ -1381,6 +1384,10 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/adaptor/v2 v2.1.31 h1:E7LJre4uBc+RDsQfHCE+LKVkFcciSMYu4KhzbvoWgKU=
github.com/gofiber/adaptor/v2 v2.1.31/go.mod h1:vdSG9JhOhOLYjE4j14fx6sJvLJNFVf9o6rSyB5GkU4s=
github.com/gofiber/fiber/v2 v2.41.0/go.mod h1:RdebcCuCRFp4W6hr3968/XxwJVg0K+jr9/Ae0PFzZ0Q=
github.com/gofiber/fiber/v2 v2.42.0/go.mod h1:3+SGNjqMh5VQH5Vz2Wdi43zTIV16ktlFd3x3R6O1Zlc=
github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs=
github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU=
github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
@ -2998,6 +3005,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/fasthttp v1.43.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/fasthttp v1.44.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c=
github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
@ -3269,6 +3278,7 @@ golang.org/x/crypto v0.0.0-20210915214749-c084706c2272/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@ -3452,6 +3462,7 @@ golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220726230323-06994584191e/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=

View File

@ -0,0 +1,57 @@
package metrics
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
// DummyMetrics is a dummy implementation of Metric interface.
type DummyMetrics struct {
}
// NewDummyMetrics returns a new instance of DummyMetrics.
func NewDummyMetrics() *DummyMetrics {
return &DummyMetrics{}
}
// IncVaaFromGossipNetwork increases the number of vaa received by chain from Gossip network.
func (d *DummyMetrics) IncVaaFromGossipNetwork(chain sdk.ChainID) {}
// IncVaaUnfiltered increases the number of vaa passing through the local deduplicator.
func (d *DummyMetrics) IncVaaUnfiltered(chain sdk.ChainID) {}
// IncVaaConsumedFromQueue increases the number of vaa consumed from SQS queue with deduplication policy.
func (d *DummyMetrics) IncVaaConsumedFromQueue(chain sdk.ChainID) {}
// IncVaaInserted increases the number of vaa inserted into the database.
func (d *DummyMetrics) IncVaaInserted(chain sdk.ChainID) {}
// IncVaaTotal increases the number of vaa received from Gossip network.
func (d *DummyMetrics) IncVaaTotal() {}
// IncObservationFromGossipNetwork increases the number of observation received by chain from Gossip network.
func (d *DummyMetrics) IncObservationFromGossipNetwork(chain sdk.ChainID) {}
// IncObservationUnfiltered increases the number of observation not filtered
func (d *DummyMetrics) IncObservationUnfiltered(chain sdk.ChainID) {}
// IncObservationInserted increases the number of observation inserted in database.
func (d *DummyMetrics) IncObservationInserted(chain sdk.ChainID) {}
// IncObservationTotal increases the number of observation received from Gossip network.
func (d *DummyMetrics) IncObservationTotal() {}
// IncHeartbeatFromGossipNetwork increases the number of heartbeat received by guardian from Gossip network.
func (d *DummyMetrics) IncHeartbeatFromGossipNetwork(guardianName string) {}
// IncHeartbeatInserted increases the number of heartbeat inserted in database.
func (d *DummyMetrics) IncHeartbeatInserted(guardianName string) {}
// IncGovernorConfigFromGossipNetwork increases the number of guardian config received by guardian from Gossip network.
func (d *DummyMetrics) IncGovernorConfigFromGossipNetwork(guardianName string) {}
// IncGovernorConfigInserted increases the number of guardian config inserted in database.
func (d *DummyMetrics) IncGovernorConfigInserted(guardianName string) {}
// IncGovernorStatusFromGossipNetwork increases the number of guardian status received by guardian from Gossip network.
func (d *DummyMetrics) IncGovernorStatusFromGossipNetwork(guardianName string) {}
// IncGovernorStatusInserted increases the number of guardian status inserted in database.
func (d *DummyMetrics) IncGovernorStatusInserted(guardianName string) {}

View File

@ -0,0 +1,32 @@
package metrics
import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
const serviceName = "wormscan-fly"
type Metrics interface {
// vaa metrics
IncVaaFromGossipNetwork(chain sdk.ChainID)
IncVaaUnfiltered(chain sdk.ChainID)
IncVaaConsumedFromQueue(chain sdk.ChainID)
IncVaaInserted(chain sdk.ChainID)
IncVaaTotal()
// observation metrics
IncObservationFromGossipNetwork(chain sdk.ChainID)
IncObservationUnfiltered(chain sdk.ChainID)
IncObservationInserted(chain sdk.ChainID)
IncObservationTotal()
// heartbeat metrics
IncHeartbeatFromGossipNetwork(guardianName string)
IncHeartbeatInserted(guardianName string)
// governor config metrics
IncGovernorConfigFromGossipNetwork(guardianName string)
IncGovernorConfigInserted(guardianName string)
// governor status metrics
IncGovernorStatusFromGossipNetwork(guardianName string)
IncGovernorStatusInserted(guardianName string)
}

View File

@ -0,0 +1,176 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// PrometheusMetrics is a Prometheus implementation of Metric interface.
type PrometheusMetrics struct {
vaaReceivedCount *prometheus.CounterVec
vaaTotal prometheus.Counter
observationReceivedCount *prometheus.CounterVec
observationTotal prometheus.Counter
heartbeatReceivedCount *prometheus.CounterVec
governorConfigReceivedCount *prometheus.CounterVec
governorStatusReceivedCount *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
func NewPrometheusMetrics(environment string) *PrometheusMetrics {
vaaReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "vaa_count_by_chain",
Help: "Total number of vaa by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "type"})
vaaTotal := promauto.NewCounter(
prometheus.CounterOpts{
Name: "vaa_total",
Help: "Total number of vaa from Gossip network",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
})
observationReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "observation_count_by_chain",
Help: "Total number of observation by chain",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "type"})
observationTotal := promauto.NewCounter(
prometheus.CounterOpts{
Name: "observation_total",
Help: "Total number of observation from Gossip network",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
})
heartbeatReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "heartbeat_count_by_guardian",
Help: "Total number of heartbeat by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"guardianNode", "type"})
governorConfigReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "guardian_config_count_by_guardian",
Help: "Total number of guardian config by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"guardianNode", "type"})
governorStatusReceivedCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "guardian_status_count_by_guardian",
Help: "Total number of guardian status by guardian",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"guardianNode", "type"})
return &PrometheusMetrics{
vaaReceivedCount: vaaReceivedCount,
vaaTotal: vaaTotal,
observationReceivedCount: observationReceivedCount,
observationTotal: observationTotal,
heartbeatReceivedCount: heartbeatReceivedCount,
governorConfigReceivedCount: governorConfigReceivedCount,
governorStatusReceivedCount: governorStatusReceivedCount,
}
}
// IncVaaFromGossipNetwork increases the number of vaa received by chain from Gossip network.
func (m *PrometheusMetrics) IncVaaFromGossipNetwork(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "gossip").Inc()
}
// IncVaaUnfiltered increases the number of vaa passing through the local deduplicator.
func (m *PrometheusMetrics) IncVaaUnfiltered(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "unfiltered").Inc()
}
// IncVaaConsumedFromQueue increases the number of vaa consumed from SQS queue with deduplication policy.
func (m *PrometheusMetrics) IncVaaConsumedFromQueue(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "consumed-queue").Inc()
}
// IncVaaInserted increases the number of vaa inserted in database.
func (m *PrometheusMetrics) IncVaaInserted(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "inserted").Inc()
}
// IncVaaTotal increases the number of vaa received from Gossip network.
func (m *PrometheusMetrics) IncVaaTotal() {
m.vaaTotal.Inc()
}
// IncObservationFromGossipNetwork increases the number of observation received by chain from Gossip network.
func (m *PrometheusMetrics) IncObservationFromGossipNetwork(chain sdk.ChainID) {
m.observationReceivedCount.WithLabelValues(chain.String(), "gossip").Inc()
}
// IncObservationUnfiltered increases the number of observation not filtered
func (m *PrometheusMetrics) IncObservationUnfiltered(chain sdk.ChainID) {
m.observationReceivedCount.WithLabelValues(chain.String(), "unfiltered").Inc()
}
// IncObservationInserted increases the number of observation inserted in database.
func (m *PrometheusMetrics) IncObservationInserted(chain sdk.ChainID) {
m.observationReceivedCount.WithLabelValues(chain.String(), "inserted").Inc()
}
// IncObservationTotal increases the number of observation received from Gossip network.
func (m *PrometheusMetrics) IncObservationTotal() {
m.observationTotal.Inc()
}
// IncHeartbeatFromGossipNetwork increases the number of heartbeat received by guardian from Gossip network.
func (m *PrometheusMetrics) IncHeartbeatFromGossipNetwork(guardianName string) {
m.heartbeatReceivedCount.WithLabelValues(guardianName, "gossip").Inc()
}
// IncHeartbeatInserted increases the number of heartbeat inserted in database.
func (m *PrometheusMetrics) IncHeartbeatInserted(guardianName string) {
m.heartbeatReceivedCount.WithLabelValues(guardianName, "inserted").Inc()
}
// IncGovernorConfigFromGossipNetwork increases the number of guardian config received by guardian from Gossip network.
func (m *PrometheusMetrics) IncGovernorConfigFromGossipNetwork(guardianName string) {
m.governorConfigReceivedCount.WithLabelValues(guardianName, "gossip").Inc()
}
// IncGovernorConfigInserted increases the number of guardian config inserted in database.
func (m *PrometheusMetrics) IncGovernorConfigInserted(guardianName string) {
m.governorConfigReceivedCount.WithLabelValues(guardianName, "inserted").Inc()
}
// IncGovernorStatusFromGossipNetwork increases the number of guardian status received by guardian from Gossip network.
func (m *PrometheusMetrics) IncGovernorStatusFromGossipNetwork(guardianName string) {
m.governorStatusReceivedCount.WithLabelValues(guardianName, "gossip").Inc()
}
// IncGovernorStatusInserted increases the number of guardian status inserted in database.
func (m *PrometheusMetrics) IncGovernorStatusInserted(guardianName string) {
m.governorStatusReceivedCount.WithLabelValues(guardianName, "inserted").Inc()
}

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"strconv"
"strings"
"fmt"
@ -20,6 +21,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/fly/migration"
"github.com/wormhole-foundation/wormhole-explorer/fly/notifier"
@ -27,6 +29,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/server"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"google.golang.org/protobuf/proto"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
@ -192,6 +195,14 @@ func newAlertClient() (alert.AlertClient, error) {
return alert.NewAlertService(alertConfig, flyAlert.LoadAlerts)
}
func newMetrics(p2pNetwork *config.P2pNetworkConfig) metrics.Metrics {
metricEnabled := config.GetMetricEnabled()
if !metricEnabled {
return metrics.NewDummyMetrics()
}
return metrics.NewPrometheusMetrics(p2pNetwork.Enviroment)
}
func main() {
//TODO: use a configuration structure to obtain the configuration
if err := godotenv.Load(); err != nil {
@ -232,6 +243,8 @@ func main() {
logger.Fatal("could not create alert client", zap.Error(err))
}
metrics := newMetrics(p2pNetworkConfig)
// Setup DB
uri := os.Getenv("MONGODB_URI")
if uri == "" {
@ -254,7 +267,7 @@ func main() {
logger.Fatal("error running migration", zap.Error(err))
}
repository := storage.NewRepository(alertClient, db, logger)
repository := storage.NewRepository(alertClient, metrics, db, logger)
// Outbound gossip message queue
sendC := make(chan []byte)
@ -297,18 +310,29 @@ func main() {
case <-rootCtx.Done():
return
case o := <-obsvC:
metrics.IncObservationTotal()
ok := verifyObservation(logger, o, gst.Get())
if !ok {
logger.Error("Could not verify observation", zap.String("id", o.MessageId))
continue
}
// get chainID from observationID.
chainID, err := getObservationChainID(logger, o)
if err != nil {
logger.Error("Error getting chainID", zap.Error(err))
continue
}
metrics.IncObservationFromGossipNetwork(chainID)
// apply filter observations by env.
if filterObservationByEnv(o, p2pNetworkConfig.Enviroment) {
continue
}
err := repository.UpsertObservation(o)
metrics.IncObservationUnfiltered(chainID)
err = repository.UpsertObservation(o)
if err != nil {
logger.Error("Error inserting observation", zap.Error(err))
}
@ -332,9 +356,9 @@ func main() {
// When recive a message, the message filter by deduplicator
// if VAA is from pyhnet should be saved directly to repository
// if VAA is from non pyhnet should be publish with nonPythVaaPublish
vaaGossipConsumer := processor.NewVAAGossipConsumer(&guardianSetHistory, deduplicator, nonPythVaaPublish, repository.UpsertVaa, logger)
vaaGossipConsumer := processor.NewVAAGossipConsumer(&guardianSetHistory, deduplicator, nonPythVaaPublish, repository.UpsertVaa, metrics, logger)
// Creates a instance to consume VAA messages (non pyth) from a queue and store in a storage
vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, notifierFunc, logger)
vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, notifierFunc, metrics, logger)
// Creates a wrapper that splits the incoming VAAs into 2 channels (pyth to non pyth) in order
// to be able to process them in a differentiated way
vaaGossipConsumerSplitter := processor.NewVAAGossipSplitterConsumer(vaaGossipConsumer.Push, logger)
@ -354,12 +378,14 @@ func main() {
case <-rootCtx.Done():
return
case sVaa := <-signedInC:
metrics.IncVaaTotal()
v, err := vaa.Unmarshal(sVaa.Vaa)
if err != nil {
logger.Error("Error unmarshalling vaa", zap.Error(err))
continue
}
metrics.IncVaaFromGossipNetwork(v.EmitterChain)
// apply filter observations by env.
if filterVaasByEnv(v, p2pNetworkConfig.Enviroment) {
continue
@ -380,9 +406,12 @@ func main() {
case <-rootCtx.Done():
return
case hb := <-heartbeatC:
metrics.IncHeartbeatFromGossipNetwork(hb.NodeName)
err := repository.UpsertHeartbeat(hb)
if err != nil {
logger.Error("Error inserting heartbeat", zap.Error(err))
} else {
metrics.IncHeartbeatInserted(hb.NodeName)
}
guardianCheck.Ping(rootCtx)
}
@ -396,9 +425,18 @@ func main() {
case <-rootCtx.Done():
return
case govConfig := <-govConfigC:
err := repository.UpsertGovernorConfig(govConfig)
nodeName, err := getGovernorConfigNodeName(govConfig)
if err != nil {
logger.Error("Error getting gov config node name", zap.Error(err))
continue
}
metrics.IncGovernorConfigFromGossipNetwork(nodeName)
err = repository.UpsertGovernorConfig(govConfig)
if err != nil {
logger.Error("Error inserting gov config", zap.Error(err))
} else {
metrics.IncGovernorConfigInserted(nodeName)
}
}
}
@ -411,9 +449,17 @@ func main() {
case <-rootCtx.Done():
return
case govStatus := <-govStatusC:
err := repository.UpsertGovernorStatus(govStatus)
nodeName, err := getGovernorStatusNodeName(govStatus)
if err != nil {
logger.Error("Error getting gov status node name", zap.Error(err))
continue
}
metrics.IncGovernorStatusFromGossipNetwork(nodeName)
err = repository.UpsertGovernorStatus(govStatus)
if err != nil {
logger.Error("Error inserting gov status", zap.Error(err))
} else {
metrics.IncGovernorStatusInserted(nodeName)
}
}
}
@ -448,6 +494,38 @@ func main() {
server.Stop()
}
// getGovernorConfigNodeName get node name from governor config.
func getGovernorConfigNodeName(govConfig *gossipv1.SignedChainGovernorConfig) (string, error) {
var gCfg gossipv1.ChainGovernorConfig
err := proto.Unmarshal(govConfig.Config, &gCfg)
if err != nil {
return "", err
}
return gCfg.NodeName, nil
}
// getGovernorStatusNodeName get node name from governor status.
func getGovernorStatusNodeName(govStatus *gossipv1.SignedChainGovernorStatus) (string, error) {
var gStatus gossipv1.ChainGovernorStatus
err := proto.Unmarshal(govStatus.Status, &gStatus)
if err != nil {
return "", err
}
return gStatus.NodeName, nil
}
// getObservationChainID get chainID from observationID.
func getObservationChainID(logger *zap.Logger, obs *gossipv1.SignedObservation) (vaa.ChainID, error) {
vaaID := strings.Split(obs.MessageId, "/")
chainIDStr := vaaID[0]
chainID, err := strconv.ParseUint(chainIDStr, 10, 16)
if err != nil {
logger.Error("Error parsing chainId", zap.Error(err))
return 0, err
}
return vaa.ChainID(chainID), nil
}
func verifyObservation(logger *zap.Logger, obs *gossipv1.SignedObservation, gs *common.GuardianSet) bool {
pk, err := crypto2.Ecrecover(obs.GetHash(), obs.GetSignature())
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -16,6 +17,7 @@ type vaaGossipConsumer struct {
pythProcess VAAPushFunc
logger *zap.Logger
deduplicator *deduplicator.Deduplicator
metrics metrics.Metrics
}
// NewVAAGossipConsumer creates a new processor instances.
@ -24,6 +26,7 @@ func NewVAAGossipConsumer(
deduplicator *deduplicator.Deduplicator,
nonPythPublish VAAPushFunc,
pythPublish VAAPushFunc,
metrics metrics.Metrics,
logger *zap.Logger,
) *vaaGossipConsumer {
@ -32,6 +35,7 @@ func NewVAAGossipConsumer(
deduplicator: deduplicator,
nonPythProcess: nonPythPublish,
pythProcess: pythPublish,
metrics: metrics,
logger: logger,
}
}
@ -45,6 +49,7 @@ func (p *vaaGossipConsumer) Push(ctx context.Context, v *vaa.VAA, serializedVaa
}
err := p.deduplicator.Apply(ctx, v.MessageID(), func() error {
p.metrics.IncVaaUnfiltered(v.EmitterChain)
if vaa.ChainIDPythNet == v.EmitterChain {
return p.pythProcess(ctx, v, serializedVaa)
}

View File

@ -3,6 +3,7 @@ package processor
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
@ -18,6 +19,7 @@ type VAAQueueConsumer struct {
consume VAAQueueConsumeFunc
repository *storage.Repository
notifyFunc VAANotifyFunc
metrics metrics.Metrics
logger *zap.Logger
}
@ -26,11 +28,13 @@ func NewVAAQueueConsumer(
consume VAAQueueConsumeFunc,
repository *storage.Repository,
notifyFunc VAANotifyFunc,
metrics metrics.Metrics,
logger *zap.Logger) *VAAQueueConsumer {
return &VAAQueueConsumer{
consume: consume,
repository: repository,
notifyFunc: notifyFunc,
metrics: metrics,
logger: logger,
}
}
@ -52,6 +56,8 @@ func (c *VAAQueueConsumer) Start(ctx context.Context) {
continue
}
c.metrics.IncVaaConsumedFromQueue(v.EmitterChain)
err = c.repository.UpsertVaa(ctx, v, msg.Data())
if err != nil {
c.logger.Error("Error inserting vaa in repository",

View File

@ -3,6 +3,7 @@ package server
import (
"os"
"github.com/ansrivas/fiberprometheus/v2"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/pprof"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
@ -24,6 +25,12 @@ func NewServer(guardianCheck *health.GuardianCheck, logger *zap.Logger, reposito
}
ctrl := NewController(guardianCheck, repository, consumer, isLocal, logger)
app := fiber.New(fiber.Config{DisableStartupMessage: true})
// Configure middleware
prometheus := fiberprometheus.New("wormscan-fly")
prometheus.RegisterAt(app, "/metrics")
app.Use(prometheus.Middleware)
// config use of middlware.
if pprofEnabled {
app.Use(pprof.New())

View File

@ -13,6 +13,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -24,6 +25,7 @@ import (
// TODO separate and maybe share between fly and web
type Repository struct {
alertClient alert.AlertClient
metrics metrics.Metrics
db *mongo.Database
log *zap.Logger
collections struct {
@ -39,8 +41,8 @@ type Repository struct {
}
// TODO wrap repository with a service that filters using redis
func NewRepository(alertService alert.AlertClient, db *mongo.Database, log *zap.Logger) *Repository {
return &Repository{alertService, db, log, struct {
func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *mongo.Database, log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, log, struct {
vaas *mongo.Collection
heartbeats *mongo.Collection
observations *mongo.Collection
@ -103,6 +105,7 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
}
}
if err == nil && s.isNewRecord(result) {
s.metrics.IncVaaInserted(v.EmitterChain)
s.updateVAACount(v.EmitterChain)
}
return err
@ -155,6 +158,8 @@ func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
return err
}
s.metrics.IncObservationInserted(vaa.ChainID(chainID))
txHash, err := domain.EncodeTrxHashByChainID(vaa.ChainID(chainID), o.GetTxHash())
if err != nil {
s.log.Warn("Error encoding tx hash",