Add the solana_leader_slots_by_epoch metric

This commit is contained in:
mcamou 2024-05-30 18:51:11 +02:00
parent 7fbc2a450e
commit 70eafb273e
2 changed files with 122 additions and 32 deletions

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"os"
"time"
@ -43,9 +44,16 @@ var (
leaderSlotsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "solana_leader_slots_total",
Help: "Number of leader slots per leader, grouped by skip status (max confirmation)",
Help: "(DEPRECATED) Number of leader slots per leader, grouped by skip status",
},
[]string{"status", "nodekey"})
leaderSlotsByEpoch = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "solana_leader_slots_by_epoch",
Help: "Number of leader slots per leader, grouped by skip status and epoch",
},
[]string{"status", "nodekey", "epoch"})
)
func init() {
@ -55,14 +63,10 @@ func init() {
prometheus.MustRegister(epochFirstSlot)
prometheus.MustRegister(epochLastSlot)
prometheus.MustRegister(leaderSlotsTotal)
prometheus.MustRegister(leaderSlotsByEpoch)
}
func (c *solanaCollector) WatchSlots() {
var (
// Last slot number we generated ticks for.
watermark int64
)
// Get current slot height and epoch info
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
info, err := c.rpcClient.GetEpochInfo(ctx, rpc.CommitmentMax)
@ -72,8 +76,11 @@ func (c *solanaCollector) WatchSlots() {
}
cancel()
// Set watermark to current offset on startup (we do not backfill slots we missed at startup)
watermark = info.AbsoluteSlot
// watermark is the last slot number we generated ticks for. Set it to the current offset on startup (we do not backfill slots we missed at startup)
watermark := info.AbsoluteSlot
currentEpoch := info.Epoch
firstSlot := info.AbsoluteSlot - info.SlotIndex
lastSlot := firstSlot + info.SlotsInEpoch
ticker := time.NewTicker(slotPacerSchedule)
@ -90,8 +97,18 @@ func (c *solanaCollector) WatchSlots() {
}
cancel()
if currentEpoch != info.Epoch {
last, err := updateCounters(c, currentEpoch, watermark, &lastSlot)
if err != nil {
klog.Info(err)
continue
}
watermark = last
}
currentEpoch = info.Epoch
// Calculate first and last slot in epoch.
firstSlot := info.AbsoluteSlot - info.SlotIndex
firstSlot = info.AbsoluteSlot - info.SlotIndex
lastSlot := firstSlot + info.SlotsInEpoch
totalTransactionsTotal.Set(float64(info.TransactionCount))
@ -108,31 +125,74 @@ func (c *solanaCollector) WatchSlots() {
klog.Infof("confirmed slot %d (offset %d, +%d), epoch %d (from slot %d to %d, %d remaining)",
info.AbsoluteSlot, info.SlotIndex, info.AbsoluteSlot-watermark, info.Epoch, firstSlot, lastSlot, lastSlot-info.AbsoluteSlot)
first := watermark + 1
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &first, nil)
last, err := updateCounters(c, currentEpoch, watermark, nil)
if err != nil {
klog.Infof("failed to fetch block production, retrying: %v", err)
cancel()
klog.Info(err)
continue
}
for host, prod := range blockProduction.Hosts {
leaderSlotsTotal.
With(prometheus.Labels{"status": "valid", "nodekey": host}).
Add(float64(prod.BlocksProduced))
leaderSlotsTotal.
With(prometheus.Labels{"status": "skipped", "nodekey": host}).
Add(float64(prod.LeaderSlots - prod.BlocksProduced))
klog.V(1).Infof(
"Slot %d, node %s: Added %d valid and %d skipped slots",
blockProduction.LastSlot,
host,
prod.BlocksProduced,
prod.LeaderSlots-prod.BlocksProduced,
)
}
watermark = blockProduction.LastSlot
watermark = last
}
}
func updateCounters(c *solanaCollector, epoch, firstSlot int64, lastSlotOpt *int64) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
var lastSlot int64
var err error
if lastSlotOpt == nil {
klog.V(2).Info("LastSlot is nil, getting last published slot")
lastSlot, err = c.rpcClient.GetSlot(ctx)
if err != nil {
cancel()
return 0, fmt.Errorf("Error while getting the last slot: %v", err)
}
cancel()
} else {
lastSlot = *lastSlotOpt
}
klog.V(2).Infof("LastSlot is %d", lastSlot)
if firstSlot > lastSlot {
return 0, fmt.Errorf(
"In epoch %d, firstSlot (%d) > lastSlot (%d). This should not happen. Not updating.",
epoch,
firstSlot,
lastSlot,
)
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &firstSlot, &lastSlot)
if err != nil {
cancel()
return 0, fmt.Errorf("failed to fetch block production, retrying: %v", err)
}
cancel()
for host, prod := range blockProduction.Hosts {
valid := float64(prod.BlocksProduced)
skipped := float64(prod.LeaderSlots - prod.BlocksProduced)
epochStr := fmt.Sprintf("%d", epoch)
leaderSlotsTotal.WithLabelValues("valid", host).Add(valid)
leaderSlotsTotal.WithLabelValues("skipped", host).Add(skipped)
leaderSlotsByEpoch.WithLabelValues("valid", host, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", host, epochStr).Add(skipped)
klog.V(1).Infof(
"Epoch %s, slots %d-%d, node %s: Added %d valid and %d skipped slots",
epochStr,
firstSlot,
lastSlot,
host,
prod.BlocksProduced,
prod.LeaderSlots-prod.BlocksProduced,
)
}
return lastSlot, nil
}

30
pkg/rpc/slot.go Normal file
View File

@ -0,0 +1,30 @@
package rpc
import (
"context"
"encoding/json"
"fmt"
"k8s.io/klog/v2"
)
type getSlotResponse struct {
Result int64 `json:"result"`
}
// https://solana.com/docs/rpc/http/getslot
func (c *RPCClient) GetSlot(ctx context.Context) (int64, error) {
body, err := c.rpcRequest(ctx, formatRPCRequest("getSlot", []interface{}{}))
if err != nil {
return 0, fmt.Errorf("RPC call failed: %w", err)
}
klog.V(2).Infof("getSlot response: %v", string(body))
var resp getSlotResponse
if err = json.Unmarshal(body, &resp); err != nil {
return 0, fmt.Errorf("failed to decode response body: %w", err)
}
return resp.Result, nil
}