replaced klog -> zap in exporter
commit 14eab970db (parent 1bbaa29acb)
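The commit replaces direct klog calls with a shared *zap.SugaredLogger obtained through a small pkg/slog package. That package's source is not part of this diff; the sketch below is only a guess at its shape, assuming slog.Get() lazily builds a process-wide singleton (the zap.NewProduction choice and the panic-on-error behavior are assumptions, not taken from the commit):

// Hypothetical sketch of pkg/slog -- not part of this commit.
package slog

import (
	"sync"

	"go.uber.org/zap"
)

var (
	once   sync.Once
	logger *zap.SugaredLogger
)

// Get returns the shared SugaredLogger, constructing it on first use.
func Get() *zap.SugaredLogger {
	once.Do(func() {
		l, err := zap.NewProduction() // assumption: production config; NewDevelopment() is equally plausible
		if err != nil {
			panic(err) // no logger available yet to report the failure
		}
		logger = l.Sugar()
	})
	return logger
}

Every Fatalf/Errorf/Infof/Warnf call in the diff below exists on zap's SugaredLogger, so the call sites translate one-for-one.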
@@ -1,8 +1,8 @@
 package main
 
 import (
+	"github.com/asymmetric-research/solana_exporter/pkg/slog"
 	"github.com/prometheus/client_golang/prometheus"
-	"k8s.io/klog/v2"
 )
 
 type GaugeDesc struct {
@@ -22,8 +22,9 @@ func NewGaugeDesc(name string, description string, variableLabels ...string) *GaugeDesc {
 }
 
 func (c *GaugeDesc) MustNewConstMetric(value float64, labels ...string) prometheus.Metric {
+	logger := slog.Get()
 	if len(labels) != len(c.VariableLabels) {
-		klog.Fatalf("Provided labels (%v) do not match %s labels (%v)", labels, c.Name, c.VariableLabels)
+		logger.Fatalf("Provided labels (%v) do not match %s labels (%v)", labels, c.Name, c.VariableLabels)
 	}
 	return prometheus.MustNewConstMetric(c.Desc, prometheus.GaugeValue, value, labels...)
 }
@@ -5,12 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"github.com/asymmetric-research/solana_exporter/pkg/rpc"
+	"github.com/asymmetric-research/solana_exporter/pkg/slog"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"go.uber.org/zap"
 	"net/http"
 	"time"
-
-	"k8s.io/klog/v2"
 )
 
 const (
@@ -30,12 +30,9 @@
 	StateDelinquent = "delinquent"
 )
 
-func init() {
-	klog.InitFlags(nil)
-}
-
 type SolanaCollector struct {
 	rpcClient rpc.Provider
+	logger    *zap.SugaredLogger
 
 	// config:
 	slotPace time.Duration
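Note that dropping the init() call to klog.InitFlags(nil) also removes klog's command-line flags (e.g. -v and -logtostderr) from the binary; any log-level or output configuration now has to happen inside the pkg/slog wrapper when the zap logger is built.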
@@ -61,6 +58,7 @@ func NewSolanaCollector(
 ) *SolanaCollector {
 	collector := &SolanaCollector{
 		rpcClient:        provider,
+		logger:           slog.Get(),
 		slotPace:         slotPace,
 		balanceAddresses: CombineUnique(balanceAddresses, nodekeys, votekeys),
 		identity:         identity,
@@ -149,7 +147,7 @@ func (c *SolanaCollector) Describe(ch chan<- *prometheus.Desc) {
 func (c *SolanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) {
 	voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, nil)
 	if err != nil {
-		klog.Errorf("failed to get vote accounts: %v", err)
+		c.logger.Errorf("failed to get vote accounts: %v", err)
 		ch <- c.ValidatorActive.NewInvalidMetric(err)
 		ch <- c.ValidatorActiveStake.NewInvalidMetric(err)
 		ch <- c.ValidatorLastVote.NewInvalidMetric(err)
@@ -180,7 +178,7 @@ func (c *SolanaCollector) collectVersion(ctx context.Context, ch chan<- prometheus.Metric) {
 	version, err := c.rpcClient.GetVersion(ctx)
 
 	if err != nil {
-		klog.Errorf("failed to get version: %v", err)
+		c.logger.Errorf("failed to get version: %v", err)
 		ch <- c.NodeVersion.NewInvalidMetric(err)
 		return
 	}
@@ -191,7 +189,7 @@ func (c *SolanaCollector) collectMinimumLedgerSlot(ctx context.Context, ch chan<- prometheus.Metric) {
 	slot, err := c.rpcClient.GetMinimumLedgerSlot(ctx)
 
 	if err != nil {
-		klog.Errorf("failed to get minimum lidger slot: %v", err)
+		c.logger.Errorf("failed to get minimum ledger slot: %v", err)
 		ch <- c.NodeMinimumLedgerSlot.NewInvalidMetric(err)
 		return
 	}
@@ -202,7 +200,7 @@ func (c *SolanaCollector) collectFirstAvailableBlock(ctx context.Context, ch chan<- prometheus.Metric) {
 	block, err := c.rpcClient.GetFirstAvailableBlock(ctx)
 
 	if err != nil {
-		klog.Errorf("failed to get first available block: %v", err)
+		c.logger.Errorf("failed to get first available block: %v", err)
 		ch <- c.NodeFirstAvailableBlock.NewInvalidMetric(err)
 		return
 	}
@@ -213,7 +211,7 @@ func (c *SolanaCollector) collectFirstAvailableBlock(ctx context.Context, ch chan<- prometheus.Metric) {
 func (c *SolanaCollector) collectBalances(ctx context.Context, ch chan<- prometheus.Metric) {
 	balances, err := FetchBalances(ctx, c.rpcClient, c.balanceAddresses)
 	if err != nil {
-		klog.Errorf("failed to get balances: %v", err)
+		c.logger.Errorf("failed to get balances: %v", err)
 		ch <- c.AccountBalances.NewInvalidMetric(err)
 		return
 	}
@@ -236,20 +234,20 @@ func (c *SolanaCollector) collectHealth(ctx context.Context, ch chan<- prometheus.Metric) {
 		var errorData rpc.NodeUnhealthyErrorData
 		if rpcError.Data == nil {
 			// if there is no data, then this is some unexpected error and should just be logged
-			klog.Errorf("failed to get health: %v", err)
+			c.logger.Errorf("failed to get health: %v", err)
 			ch <- c.NodeIsHealthy.NewInvalidMetric(err)
 			ch <- c.NodeNumSlotsBehind.NewInvalidMetric(err)
 			return
 		}
 		if err = rpc.UnpackRpcErrorData(rpcError, errorData); err != nil {
 			// if we error here, it means we have the incorrect format
-			klog.Fatalf("failed to unpack %s rpc error: %v", rpcError.Method, err.Error())
+			c.logger.Fatalf("failed to unpack %s rpc error: %v", rpcError.Method, err.Error())
 		}
 		isHealthy = 0
 		numSlotsBehind = errorData.NumSlotsBehind
 	} else {
 		// if it's not an RPC error, log it
-		klog.Errorf("failed to get health: %v", err)
+		c.logger.Errorf("failed to get health: %v", err)
 		ch <- c.NodeIsHealthy.NewInvalidMetric(err)
 		ch <- c.NodeNumSlotsBehind.NewInvalidMetric(err)
 		return
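One pre-existing oddity is untouched by this commit: rpc.UnpackRpcErrorData(rpcError, errorData) receives errorData by value. If that helper unmarshals into its second argument, it would presumably need a pointer (&errorData); as written, the populated NumSlotsBehind read below may never see the unpacked data.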
@@ -275,11 +273,12 @@ func (c *SolanaCollector) Collect(ch chan<- prometheus.Metric) {
 }
 
 func main() {
+	logger := slog.Get()
 	ctx := context.Background()
 
 	config := NewExporterConfigFromCLI()
 	if config.ComprehensiveSlotTracking {
-		klog.Warning(
+		logger.Warn(
 			"Comprehensive slot tracking will lead to potentially thousands of new " +
 				"Prometheus metrics being created every epoch.",
 		)
@@ -288,11 +287,11 @@ func main() {
 	client := rpc.NewRPCClient(config.RpcUrl, config.HttpTimeout)
 	votekeys, err := GetAssociatedVoteAccounts(ctx, client, rpc.CommitmentFinalized, config.NodeKeys)
 	if err != nil {
-		klog.Fatalf("Failed to get associated vote accounts for %v: %v", config.NodeKeys, err)
+		logger.Fatalf("Failed to get associated vote accounts for %v: %v", config.NodeKeys, err)
 	}
 	identity, err := client.GetIdentity(ctx)
 	if err != nil {
-		klog.Fatalf("Failed to get identity: %v", err)
+		logger.Fatalf("Failed to get identity: %v", err)
 	}
 	collector := NewSolanaCollector(
 		client, slotPacerSchedule, config.BalanceAddresses, config.NodeKeys, votekeys, identity,
@@ -307,6 +306,6 @@ func main() {
 	prometheus.MustRegister(collector)
 	http.Handle("/metrics", promhttp.Handler())
 
-	klog.Infof("listening on %s", config.ListenAddress)
-	klog.Fatal(http.ListenAndServe(config.ListenAddress, nil))
+	logger.Infof("listening on %s", config.ListenAddress)
+	logger.Fatal(http.ListenAndServe(config.ListenAddress, nil))
 }
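The fatal paths keep their exit semantics across the migration: zap's SugaredLogger.Fatal and Fatalf log the message and then call os.Exit(1), so, like klog.Fatal, every one of these call sites still terminates the process after logging.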
@@ -4,13 +4,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/asymmetric-research/solana_exporter/pkg/slog"
+	"go.uber.org/zap"
 	"slices"
 	"strings"
 	"time"
 
 	"github.com/asymmetric-research/solana_exporter/pkg/rpc"
 	"github.com/prometheus/client_golang/prometheus"
-	"k8s.io/klog/v2"
 )
 
 const (
@@ -19,6 +20,7 @@ const (
 
 type SlotWatcher struct {
 	client rpc.Provider
+	logger *zap.SugaredLogger
 
 	// config:
 	nodekeys []string
@@ -60,8 +62,10 @@ func NewSlotWatcher(
 	comprehensiveSlotTracking bool,
 	monitorBlockSizes bool,
 ) *SlotWatcher {
+	logger := slog.Get()
 	watcher := SlotWatcher{
 		client:   client,
+		logger:   logger,
 		nodekeys: nodekeys,
 		votekeys: votekeys,
 		identity: identity,
@@ -159,7 +163,7 @@ func NewSlotWatcher(
 		if errors.As(err, &alreadyRegisteredErr) || duplicateErr {
 			continue
 		} else {
-			klog.Fatal(fmt.Errorf("failed to register collector: %w", err))
+			logger.Fatal(fmt.Errorf("failed to register collector: %w", err))
 		}
 	}
 }
@@ -170,12 +174,12 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
 	ticker := time.NewTicker(pace)
 	defer ticker.Stop()
 
-	klog.Infof("Starting slot watcher")
+	c.logger.Infof("Starting slot watcher")
 
 	for {
 		select {
 		case <-ctx.Done():
-			klog.Infof("Stopping WatchSlots() at slot %v", c.slotWatermark)
+			c.logger.Infof("Stopping WatchSlots() at slot %v", c.slotWatermark)
 			return
 		default:
 			<-ticker.C
@@ -183,7 +187,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
 			commitment := rpc.CommitmentFinalized
 			epochInfo, err := c.client.GetEpochInfo(ctx, commitment)
 			if err != nil {
-				klog.Errorf("Failed to get epoch info, bailing out: %v", err)
+				c.logger.Errorf("Failed to get epoch info, bailing out: %v", err)
 				continue
 			}
 
@@ -199,7 +203,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
 			// if we get here, then the tracking numbers are set, so this is a "normal" run.
 			// start by checking if we have progressed since last run:
 			if epochInfo.AbsoluteSlot <= c.slotWatermark {
-				klog.Infof("%v slot number has not advanced from %v, skipping", commitment, c.slotWatermark)
+				c.logger.Infof("%v slot number has not advanced from %v, skipping", commitment, c.slotWatermark)
 				continue
 			}
 
@@ -208,7 +212,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
 			if len(c.votekeys) > 0 {
 				err = c.fetchAndEmitInflationRewards(ctx, c.currentEpoch)
 				if err != nil {
-					klog.Errorf("Failed to emit inflation rewards, bailing out: %v", err)
+					c.logger.Errorf("Failed to emit inflation rewards, bailing out: %v", err)
 				}
 			}
 			c.closeCurrentEpoch(ctx, epochInfo)
@@ -223,7 +227,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
 // trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly,
 // and updates the prometheus gauges associated with those metrics.
 func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
-	klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
+	c.logger.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
 	firstSlot, lastSlot := GetEpochBounds(epoch)
 	// if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup,
 	// and so we can simply store the tracking numbers
@@ -260,16 +264,16 @@ func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
 	}
 
 	// emit epoch bounds:
-	klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot)
+	c.logger.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot)
 	c.EpochNumberMetric.Set(float64(c.currentEpoch))
 	c.EpochFirstSlotMetric.Set(float64(c.firstSlot))
 	c.EpochLastSlotMetric.Set(float64(c.lastSlot))
 
 	// update leader schedule:
-	klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch)
+	c.logger.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch)
 	leaderSchedule, err := GetTrimmedLeaderSchedule(ctx, c.client, c.nodekeys, epoch.AbsoluteSlot, c.firstSlot)
 	if err != nil {
-		klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err)
+		c.logger.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err)
 	}
 	c.leaderSchedule = leaderSchedule
 }
@@ -308,17 +312,17 @@ func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) {
 func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) {
 	// add 1 because GetBlockProduction's range is inclusive, and the watermark is already tracked
 	startSlot := c.slotWatermark + 1
-	klog.Infof("Fetching block production in [%v -> %v]", startSlot, endSlot)
+	c.logger.Infof("Fetching block production in [%v -> %v]", startSlot, endSlot)
 
 	// make sure the bounds are contained within the epoch we are currently watching:
 	if err := c.checkValidSlotRange(startSlot, endSlot); err != nil {
-		klog.Fatalf("invalid slot range: %v", err)
+		c.logger.Fatalf("invalid slot range: %v", err)
 	}
 
 	// fetch block production:
 	blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot)
 	if err != nil {
-		klog.Errorf("Failed to get block production, bailing out: %v", err)
+		c.logger.Errorf("Failed to get block production, bailing out: %v", err)
 		return
 	}
 
@@ -337,17 +341,17 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) {
 		}
 	}
 
-	klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
+	c.logger.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
 }
 
 // fetchAndEmitBlockInfos fetches and emits all the fee rewards (+ block sizes) for the tracked addresses between the
 // slotWatermark and endSlot
 func (c *SlotWatcher) fetchAndEmitBlockInfos(ctx context.Context, endSlot int64) {
 	startSlot := c.slotWatermark + 1
-	klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)
+	c.logger.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)
 
 	if err := c.checkValidSlotRange(startSlot, endSlot); err != nil {
-		klog.Fatalf("invalid slot range: %v", err)
+		c.logger.Fatalf("invalid slot range: %v", err)
 	}
 	scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot)
 	for identity, leaderSlots := range scheduleToFetch {
@@ -355,16 +359,16 @@ func (c *SlotWatcher) fetchAndEmitBlockInfos(ctx context.Context, endSlot int64) {
 			continue
 		}
 
-		klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
+		c.logger.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
 		for _, slot := range leaderSlots {
 			err := c.fetchAndEmitSingleBlockInfo(ctx, identity, c.currentEpoch, slot)
 			if err != nil {
-				klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
+				c.logger.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
 			}
 		}
 	}
 
-	klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
+	c.logger.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
 }
 
 // fetchAndEmitSingleBlockInfo fetches and emits the fee reward + block size for a single block.
@@ -383,7 +387,7 @@ func (c *SlotWatcher) fetchAndEmitSingleBlockInfo(
 	if errors.As(err, &rpcError) {
 		// this is the error code for slot was skipped:
 		if rpcError.Code == rpc.SlotSkippedCode && strings.Contains(rpcError.Message, "skipped") {
-			klog.Infof("slot %v was skipped, no fee rewards.", slot)
+			c.logger.Infof("slot %v was skipped, no fee rewards.", slot)
 			return nil
 		}
 	}
@@ -415,7 +419,7 @@ func (c *SlotWatcher) fetchAndEmitSingleBlockInfo(
 // fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
 // at the provided epoch
 func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
-	klog.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))
+	c.logger.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))
 	rewardInfos, err := c.client.GetInflationReward(ctx, rpc.CommitmentConfirmed, c.votekeys, &epoch, nil)
 	if err != nil {
 		return fmt.Errorf("error fetching inflation rewards: %w", err)
@@ -426,6 +430,6 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
 		reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
 		c.InflationRewardsMetric.WithLabelValues(address, toString(epoch)).Set(reward)
 	}
-	klog.Infof("Fetched inflation reward for epoch %v.", epoch)
+	c.logger.Infof("Fetched inflation reward for epoch %v.", epoch)
 	return nil
 }
@@ -4,13 +4,14 @@ import (
 	"context"
 	"fmt"
 	"github.com/asymmetric-research/solana_exporter/pkg/rpc"
-	"k8s.io/klog/v2"
+	"github.com/asymmetric-research/solana_exporter/pkg/slog"
 	"slices"
 )
 
 func assertf(condition bool, format string, args ...any) {
+	logger := slog.Get()
 	if !condition {
-		klog.Fatalf(format, args...)
+		logger.Fatalf(format, args...)
 	}
 }
 
@@ -40,6 +41,7 @@ func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 {
 func GetTrimmedLeaderSchedule(
 	ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64,
 ) (map[string][]int64, error) {
+	logger := slog.Get()
 	leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get leader schedule: %w", err)
@@ -55,7 +57,7 @@ func GetTrimmedLeaderSchedule(
 		}
 		trimmedLeaderSchedule[id] = absoluteSlots
 	} else {
-		klog.Warningf("failed to find leader slots for %v", id)
+		logger.Warnf("failed to find leader slots for %v", id)
 	}
 }
 