fixed dynamic/static test interference

This commit is contained in:
Matt Johnstone 2024-06-14 00:47:47 +02:00
parent b742be47ef
commit 14cbbde7e4
No known key found for this signature in database
GPG Key ID: 7D96C656728409F9
5 changed files with 99 additions and 78 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -148,12 +149,9 @@ func getSlotMetricValues() slotMetricValues {
} }
func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) { func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// this test passes, however, it seems to cause the static tests to fail (after this test runs, // reset metrics before running tests:
// the static tests fail to set their correct values to the prometheus metrics). So, putting this leaderSlotsTotal.Reset()
// here while I debug leaderSlotsByEpoch.Reset()
if testing.Short() {
t.Skip()
}
// create clients: // create clients:
client := newDynamicRPCClient() client := newDynamicRPCClient()
@ -165,9 +163,10 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// start client/collector and wait a bit: // start client/collector and wait a bit:
go client.Run() go client.Run()
time.Sleep(1 * time.Second) time.Sleep(time.Second)
go collector.WatchSlots() ctx, cancel := context.WithCancel(context.Background())
time.Sleep(1 * time.Second) go collector.WatchSlots(ctx)
time.Sleep(time.Second)
initial := getSlotMetricValues() initial := getSlotMetricValues()
@ -175,7 +174,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
var epochChanged bool var epochChanged bool
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
// wait a bit then get new metrics // wait a bit then get new metrics
time.Sleep(1 * time.Second) time.Sleep(time.Second)
final := getSlotMetricValues() final := getSlotMetricValues()
// make sure that things have increased // make sure that things have increased
@ -203,10 +202,16 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
initial.EpochNumber, initial.EpochNumber,
final.EpochNumber, final.EpochNumber,
) )
if final.EpochNumber > initial.EpochNumber {
epochChanged = true
}
// make current final the new initial (for next iteration) // make current final the new initial (for next iteration)
initial = final initial = final
} }
assert.True(t, epochChanged) assert.Truef(t, epochChanged, "Epoch has not changed!")
// cancel and wait for cancellation:
cancel()
time.Sleep(time.Second)
} }

View File

@ -144,7 +144,7 @@ func main() {
collector := NewSolanaCollector(*rpcAddr) collector := NewSolanaCollector(*rpcAddr)
go collector.WatchSlots() go collector.WatchSlots(context.Background())
prometheus.MustRegister(collector) prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler()) http.Handle("/metrics", promhttp.Handler())

View File

@ -65,10 +65,10 @@ func init() {
prometheus.MustRegister(leaderSlotsByEpoch) prometheus.MustRegister(leaderSlotsByEpoch)
} }
func (c *solanaCollector) WatchSlots() { func (c *solanaCollector) WatchSlots(ctx context.Context) {
// Get current slot height and epoch info // Get current slot height and epoch info
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout) ctx_, cancel := context.WithTimeout(context.Background(), httpTimeout)
info, err := c.rpcClient.GetEpochInfo(ctx, rpc.CommitmentMax) info, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentMax)
if err != nil { if err != nil {
klog.Fatalf("failed to fetch epoch info, bailing out: %v", err) klog.Fatalf("failed to fetch epoch info, bailing out: %v", err)
} }
@ -92,41 +92,75 @@ func (c *solanaCollector) WatchSlots() {
ticker := time.NewTicker(c.slotPace) ticker := time.NewTicker(c.slotPace)
for { for {
<-ticker.C select {
case <-ctx.Done():
klog.Infof("Stopping WatchSlots() at slot %v", watermark)
return
// Get current slot height and epoch info default:
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout) <-ticker.C
info, err := c.rpcClient.GetEpochInfo(ctx, rpc.CommitmentMax)
if err != nil {
klog.Infof("failed to fetch epoch info, retrying: %v", err)
cancel()
continue
}
cancel()
if watermark == info.AbsoluteSlot { // Get current slot height and epoch info
klog.Infof("slot has not advanced at %d, skipping", info.AbsoluteSlot) ctx_, cancel := context.WithTimeout(context.Background(), httpTimeout)
continue info, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentMax)
}
if currentEpoch != info.Epoch {
klog.Infof(
"changing epoch from %d to %d. Watermark: %d, lastSlot: %d",
currentEpoch,
info.Epoch,
watermark,
lastSlot,
)
last, err := updateCounters(c.rpcClient, currentEpoch, watermark, &lastSlot)
if err != nil { if err != nil {
klog.Error(err) klog.Infof("failed to fetch epoch info, retrying: %v", err)
cancel()
continue
}
cancel()
if watermark == info.AbsoluteSlot {
klog.Infof("slot has not advanced at %d, skipping", info.AbsoluteSlot)
continue
}
if currentEpoch != info.Epoch {
klog.Infof(
"changing epoch from %d to %d. Watermark: %d, lastSlot: %d",
currentEpoch,
info.Epoch,
watermark,
lastSlot,
)
last, err := updateCounters(c.rpcClient, currentEpoch, watermark, &lastSlot)
if err != nil {
klog.Error(err)
continue
}
klog.Infof(
"counters updated to slot %d (+%d), epoch %d (slots %d-%d, %d remaining)",
last,
last-watermark,
currentEpoch,
firstSlot,
lastSlot,
lastSlot-last,
)
watermark = last
currentEpoch, firstSlot, lastSlot = getEpochBounds(info)
currentEpochNumber.Set(float64(currentEpoch))
epochFirstSlot.Set(float64(firstSlot))
epochLastSlot.Set(float64(lastSlot))
}
totalTransactionsTotal.Set(float64(info.TransactionCount))
confirmedSlotHeight.Set(float64(info.AbsoluteSlot))
last, err := updateCounters(c.rpcClient, currentEpoch, watermark, nil)
if err != nil {
klog.Info(err)
continue continue
} }
klog.Infof( klog.Infof(
"counters updated to slot %d (+%d), epoch %d (slots %d-%d, %d remaining)", "counters updated to slot %d (offset %d, +%d), epoch %d (slots %d-%d, %d remaining)",
last, last,
info.SlotIndex,
last-watermark, last-watermark,
currentEpoch, currentEpoch,
firstSlot, firstSlot,
@ -135,34 +169,7 @@ func (c *solanaCollector) WatchSlots() {
) )
watermark = last watermark = last
currentEpoch, firstSlot, lastSlot = getEpochBounds(info)
currentEpochNumber.Set(float64(currentEpoch))
epochFirstSlot.Set(float64(firstSlot))
epochLastSlot.Set(float64(lastSlot))
} }
totalTransactionsTotal.Set(float64(info.TransactionCount))
confirmedSlotHeight.Set(float64(info.AbsoluteSlot))
last, err := updateCounters(c.rpcClient, currentEpoch, watermark, nil)
if err != nil {
klog.Info(err)
continue
}
klog.Infof(
"counters updated to slot %d (offset %d, +%d), epoch %d (slots %d-%d, %d remaining)",
last,
info.SlotIndex,
last-watermark,
currentEpoch,
firstSlot,
lastSlot,
lastSlot-last,
)
watermark = last
} }
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/client_golang/prometheus/testutil"
@ -80,12 +81,17 @@ solana_node_version{version="1.16.7"} 1
} }
func TestSolanaCollector_WatchSlots_Static(t *testing.T) { func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
// reset metrics before running tests:
leaderSlotsTotal.Reset()
leaderSlotsByEpoch.Reset()
collector := createSolanaCollector( collector := createSolanaCollector(
&staticRPCClient{}, &staticRPCClient{},
100*time.Millisecond, 100*time.Millisecond,
) )
prometheus.NewPedanticRegistry().MustRegister(collector) prometheus.NewPedanticRegistry().MustRegister(collector)
go collector.WatchSlots() ctx, cancel := context.WithCancel(context.Background())
go collector.WatchSlots(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
tests := []struct { tests := []struct {
@ -142,6 +148,9 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
} }
}) })
} }
// cancel and wait for cancellation:
cancel()
time.Sleep(time.Second)
} }
func testBlockProductionMetric( func testBlockProductionMetric(

View File

@ -59,20 +59,20 @@ var (
TransactionCount: 22661093, TransactionCount: 22661093,
} }
staticBlockProduction = rpc.BlockProduction{ staticBlockProduction = rpc.BlockProduction{
FirstSlot: 1000, FirstSlot: 100000000,
LastSlot: 2000, LastSlot: 200000000,
Hosts: map[string]rpc.BlockProductionPerHost{ Hosts: map[string]rpc.BlockProductionPerHost{
"bbb": { "bbb": {
LeaderSlots: 400, LeaderSlots: 40000000,
BlocksProduced: 360, BlocksProduced: 36000000,
}, },
"ccc": { "ccc": {
LeaderSlots: 300, LeaderSlots: 30000000,
BlocksProduced: 296, BlocksProduced: 29600000,
}, },
"aaa": { "aaa": {
LeaderSlots: 300, LeaderSlots: 30000000,
BlocksProduced: 0, BlocksProduced: 10000000,
}, },
}, },
} }