From 70eafb273e60d0bd90263501d031180ffafb7c05 Mon Sep 17 00:00:00 2001 From: mcamou Date: Thu, 30 May 2024 18:51:11 +0200 Subject: [PATCH] Add the solana_leader_slots_by_epoch metric --- cmd/solana_exporter/slots.go | 124 ++++++++++++++++++++++++++--------- pkg/rpc/slot.go | 30 +++++++++ 2 files changed, 122 insertions(+), 32 deletions(-) create mode 100644 pkg/rpc/slot.go diff --git a/cmd/solana_exporter/slots.go b/cmd/solana_exporter/slots.go index 02b2b39..897aed8 100644 --- a/cmd/solana_exporter/slots.go +++ b/cmd/solana_exporter/slots.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "os" "time" @@ -43,9 +44,16 @@ var ( leaderSlotsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "solana_leader_slots_total", - Help: "Number of leader slots per leader, grouped by skip status (max confirmation)", + Help: "(DEPRECATED) Number of leader slots per leader, grouped by skip status", }, []string{"status", "nodekey"}) + + leaderSlotsByEpoch = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "solana_leader_slots_by_epoch", + Help: "Number of leader slots per leader, grouped by skip status and epoch", + }, + []string{"status", "nodekey", "epoch"}) ) func init() { @@ -55,14 +63,10 @@ func init() { prometheus.MustRegister(epochFirstSlot) prometheus.MustRegister(epochLastSlot) prometheus.MustRegister(leaderSlotsTotal) + prometheus.MustRegister(leaderSlotsByEpoch) } func (c *solanaCollector) WatchSlots() { - var ( - // Last slot number we generated ticks for. - watermark int64 - ) - // Get current slot height and epoch info ctx, cancel := context.WithTimeout(context.Background(), httpTimeout) info, err := c.rpcClient.GetEpochInfo(ctx, rpc.CommitmentMax) @@ -72,8 +76,11 @@ func (c *solanaCollector) WatchSlots() { } cancel() - // Set watermark to current offset on startup (we do not backfill slots we missed at startup) - watermark = info.AbsoluteSlot + // watermark is the last slot number we generated ticks for. Set it to the current offset on startup (we do not backfill slots we missed at startup) + watermark := info.AbsoluteSlot + currentEpoch := info.Epoch + firstSlot := info.AbsoluteSlot - info.SlotIndex + lastSlot := firstSlot + info.SlotsInEpoch ticker := time.NewTicker(slotPacerSchedule) @@ -90,8 +97,18 @@ func (c *solanaCollector) WatchSlots() { } cancel() + if currentEpoch != info.Epoch { + last, err := updateCounters(c, currentEpoch, watermark, &lastSlot) + if err != nil { + klog.Info(err) + continue + } + watermark = last + } + + currentEpoch = info.Epoch // Calculate first and last slot in epoch. - firstSlot := info.AbsoluteSlot - info.SlotIndex + firstSlot = info.AbsoluteSlot - info.SlotIndex lastSlot := firstSlot + info.SlotsInEpoch totalTransactionsTotal.Set(float64(info.TransactionCount)) @@ -108,31 +125,74 @@ func (c *solanaCollector) WatchSlots() { klog.Infof("confirmed slot %d (offset %d, +%d), epoch %d (from slot %d to %d, %d remaining)", info.AbsoluteSlot, info.SlotIndex, info.AbsoluteSlot-watermark, info.Epoch, firstSlot, lastSlot, lastSlot-info.AbsoluteSlot) - first := watermark + 1 - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &first, nil) + last, err := updateCounters(c, currentEpoch, watermark, nil) if err != nil { - klog.Infof("failed to fetch block production, retrying: %v", err) - cancel() + klog.Info(err) continue } - - for host, prod := range blockProduction.Hosts { - leaderSlotsTotal. - With(prometheus.Labels{"status": "valid", "nodekey": host}). - Add(float64(prod.BlocksProduced)) - leaderSlotsTotal. - With(prometheus.Labels{"status": "skipped", "nodekey": host}). - Add(float64(prod.LeaderSlots - prod.BlocksProduced)) - klog.V(1).Infof( - "Slot %d, node %s: Added %d valid and %d skipped slots", - blockProduction.LastSlot, - host, - prod.BlocksProduced, - prod.LeaderSlots-prod.BlocksProduced, - ) - } - - watermark = blockProduction.LastSlot + watermark = last } } + +func updateCounters(c *solanaCollector, epoch, firstSlot int64, lastSlotOpt *int64) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + + var lastSlot int64 + var err error + + if lastSlotOpt == nil { + klog.V(2).Info("LastSlot is nil, getting last published slot") + lastSlot, err = c.rpcClient.GetSlot(ctx) + + if err != nil { + cancel() + return 0, fmt.Errorf("Error while getting the last slot: %v", err) + } + cancel() + } else { + lastSlot = *lastSlotOpt + } + klog.V(2).Infof("LastSlot is %d", lastSlot) + + if firstSlot > lastSlot { + return 0, fmt.Errorf( + "In epoch %d, firstSlot (%d) > lastSlot (%d). This should not happen. Not updating.", + epoch, + firstSlot, + lastSlot, + ) + } + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) + blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &firstSlot, &lastSlot) + if err != nil { + cancel() + return 0, fmt.Errorf("failed to fetch block production, retrying: %v", err) + } + cancel() + + for host, prod := range blockProduction.Hosts { + valid := float64(prod.BlocksProduced) + skipped := float64(prod.LeaderSlots - prod.BlocksProduced) + + epochStr := fmt.Sprintf("%d", epoch) + + leaderSlotsTotal.WithLabelValues("valid", host).Add(valid) + leaderSlotsTotal.WithLabelValues("skipped", host).Add(skipped) + + leaderSlotsByEpoch.WithLabelValues("valid", host, epochStr).Add(valid) + leaderSlotsByEpoch.WithLabelValues("skipped", host, epochStr).Add(skipped) + + klog.V(1).Infof( + "Epoch %s, slots %d-%d, node %s: Added %d valid and %d skipped slots", + epochStr, + firstSlot, + lastSlot, + host, + prod.BlocksProduced, + prod.LeaderSlots-prod.BlocksProduced, + ) + } + + return lastSlot, nil +} diff --git a/pkg/rpc/slot.go b/pkg/rpc/slot.go new file mode 100644 index 0000000..304ede5 --- /dev/null +++ b/pkg/rpc/slot.go @@ -0,0 +1,30 @@ +package rpc + +import ( + "context" + "encoding/json" + "fmt" + + "k8s.io/klog/v2" +) + +type getSlotResponse struct { + Result int64 `json:"result"` +} + +// https://solana.com/docs/rpc/http/getslot +func (c *RPCClient) GetSlot(ctx context.Context) (int64, error) { + body, err := c.rpcRequest(ctx, formatRPCRequest("getSlot", []interface{}{})) + if err != nil { + return 0, fmt.Errorf("RPC call failed: %w", err) + } + + klog.V(2).Infof("getSlot response: %v", string(body)) + + var resp getSlotResponse + if err = json.Unmarshal(body, &resp); err != nil { + return 0, fmt.Errorf("failed to decode response body: %w", err) + } + + return resp.Result, nil +}