Merge pull request #47 from asymmetric-research/get-health

Get health!!
This commit is contained in:
Matt Johnstone 2024-10-22 20:35:56 +02:00 committed by GitHub
commit 3f6e578768
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 126 additions and 9 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"errors"
"github.com/asymmetric-research/solana_exporter/pkg/rpc" "github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -46,6 +47,8 @@ type SolanaCollector struct {
validatorDelinquent *prometheus.Desc validatorDelinquent *prometheus.Desc
solanaVersion *prometheus.Desc solanaVersion *prometheus.Desc
balances *prometheus.Desc balances *prometheus.Desc
isHealthy *prometheus.Desc
numSlotsBehind *prometheus.Desc
} }
func NewSolanaCollector( func NewSolanaCollector(
@ -97,6 +100,18 @@ func NewSolanaCollector(
[]string{AddressLabel}, []string{AddressLabel},
nil, nil,
), ),
isHealthy: prometheus.NewDesc(
"solana_is_healthy",
"Whether the node is healthy or not.",
nil,
nil,
),
numSlotsBehind: prometheus.NewDesc(
"solana_num_slots_behind",
"The number of slots that the node is behind the latest cluster confirmed slot.",
nil,
nil,
),
} }
return collector return collector
} }
@ -109,11 +124,14 @@ func (c *SolanaCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.validatorRootSlot ch <- c.validatorRootSlot
ch <- c.validatorDelinquent ch <- c.validatorDelinquent
ch <- c.balances ch <- c.balances
ch <- c.isHealthy
ch <- c.numSlotsBehind
} }
func (c *SolanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) { func (c *SolanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) {
voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, nil) voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, nil)
if err != nil { if err != nil {
klog.Errorf("failed to get vote accounts: %v", err)
ch <- prometheus.NewInvalidMetric(c.totalValidatorsDesc, err) ch <- prometheus.NewInvalidMetric(c.totalValidatorsDesc, err)
ch <- prometheus.NewInvalidMetric(c.validatorActivatedStake, err) ch <- prometheus.NewInvalidMetric(c.validatorActivatedStake, err)
ch <- prometheus.NewInvalidMetric(c.validatorLastVote, err) ch <- prometheus.NewInvalidMetric(c.validatorLastVote, err)
@ -169,6 +187,7 @@ func (c *SolanaCollector) collectVersion(ctx context.Context, ch chan<- promethe
version, err := c.rpcClient.GetVersion(ctx) version, err := c.rpcClient.GetVersion(ctx)
if err != nil { if err != nil {
klog.Errorf("failed to get version: %v", err)
ch <- prometheus.NewInvalidMetric(c.solanaVersion, err) ch <- prometheus.NewInvalidMetric(c.solanaVersion, err)
return return
} }
@ -179,6 +198,7 @@ func (c *SolanaCollector) collectVersion(ctx context.Context, ch chan<- promethe
func (c *SolanaCollector) collectBalances(ctx context.Context, ch chan<- prometheus.Metric) { func (c *SolanaCollector) collectBalances(ctx context.Context, ch chan<- prometheus.Metric) {
balances, err := FetchBalances(ctx, c.rpcClient, c.balanceAddresses) balances, err := FetchBalances(ctx, c.rpcClient, c.balanceAddresses)
if err != nil { if err != nil {
klog.Errorf("failed to get balances: %v", err)
ch <- prometheus.NewInvalidMetric(c.solanaVersion, err) ch <- prometheus.NewInvalidMetric(c.solanaVersion, err)
return return
} }
@ -188,6 +208,45 @@ func (c *SolanaCollector) collectBalances(ctx context.Context, ch chan<- prometh
} }
} }
func (c *SolanaCollector) collectHealth(ctx context.Context, ch chan<- prometheus.Metric) {
var (
isHealthy = 1
numSlotsBehind int64
)
_, err := c.rpcClient.GetHealth(ctx)
if err != nil {
var rpcError *rpc.RPCError
if errors.As(err, &rpcError) {
var errorData rpc.NodeUnhealthyErrorData
if rpcError.Data == nil {
// if there is no data, then this is some unexpected error and should just be logged
klog.Errorf("failed to get health: %v", err)
ch <- prometheus.NewInvalidMetric(c.isHealthy, err)
ch <- prometheus.NewInvalidMetric(c.numSlotsBehind, err)
return
}
if err = rpc.UnpackRpcErrorData(rpcError, errorData); err != nil {
// if we error here, it means we have the incorrect format
klog.Fatalf("failed to unpack %s rpc error: %v", rpcError.Method, err.Error())
}
isHealthy = 0
numSlotsBehind = errorData.NumSlotsBehind
} else {
// if it's not an RPC error, log it
klog.Errorf("failed to get health: %v", err)
ch <- prometheus.NewInvalidMetric(c.isHealthy, err)
ch <- prometheus.NewInvalidMetric(c.numSlotsBehind, err)
return
}
}
ch <- prometheus.MustNewConstMetric(c.isHealthy, prometheus.GaugeValue, float64(isHealthy))
ch <- prometheus.MustNewConstMetric(c.numSlotsBehind, prometheus.GaugeValue, float64(numSlotsBehind))
return
}
func (c *SolanaCollector) Collect(ch chan<- prometheus.Metric) { func (c *SolanaCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -195,6 +254,7 @@ func (c *SolanaCollector) Collect(ch chan<- prometheus.Metric) {
c.collectVoteAccounts(ctx, ch) c.collectVoteAccounts(ctx, ch)
c.collectVersion(ctx, ch) c.collectVersion(ctx, ch)
c.collectBalances(ctx, ch) c.collectBalances(ctx, ch)
c.collectHealth(ctx, ch)
} }
func main() { func main() {

View File

@ -178,6 +178,12 @@ func (c *staticRPCClient) GetBlock(
return nil, nil return nil, nil
} }
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetHealth(ctx context.Context) (*string, error) {
health := "ok"
return &health, nil
}
/* /*
===== DYNAMIC CLIENT =====: ===== DYNAMIC CLIENT =====:
*/ */
@ -373,6 +379,12 @@ func (c *dynamicRPCClient) GetBlock(
return nil, nil return nil, nil
} }
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetHealth(ctx context.Context) (*string, error) {
health := "ok"
return &health, nil
}
/* /*
===== OTHER TEST UTILITIES =====: ===== OTHER TEST UTILITIES =====:
*/ */

View File

@ -205,7 +205,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
// and updates the prometheus gauges associated with those metrics. // and updates the prometheus gauges associated with those metrics.
func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) { func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch) klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
firstSlot, lastSlot := getEpochBounds(epoch) firstSlot, lastSlot := GetEpochBounds(epoch)
// if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup, // if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup,
// and so we can simply store the tracking numbers // and so we can simply store the tracking numbers
if c.currentEpoch == 0 { if c.currentEpoch == 0 {
@ -393,12 +393,6 @@ func (c *SlotWatcher) fetchAndEmitSingleBlockInfo(
return nil return nil
} }
// getEpochBounds returns the first slot and last slot within an [inclusive] Epoch
func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}
// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses // fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch // at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error { func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {

View File

@ -106,7 +106,7 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
firstSlot, lastSlot := getEpochBounds(&staticEpochInfo) firstSlot, lastSlot := GetEpochBounds(&staticEpochInfo)
type testCase struct { type testCase struct {
expectedValue float64 expectedValue float64
metric prometheus.Gauge metric prometheus.Gauge

View File

@ -113,3 +113,9 @@ func CombineUnique[T comparable](args ...[]T) []T {
} }
return uniqueItems return uniqueItems
} }
// GetEpochBounds returns the first slot and last slot within an [inclusive] Epoch
func GetEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}

View File

@ -56,3 +56,10 @@ func TestGetAssociatedVoteAccounts(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, votekeys, voteAccounts) assert.Equal(t, votekeys, voteAccounts)
} }
func TestGetEpochBounds(t *testing.T) {
epoch := rpc.EpochInfo{AbsoluteSlot: 25, SlotIndex: 5, SlotsInEpoch: 10}
first, last := GetEpochBounds(&epoch)
assert.Equal(t, int64(20), first)
assert.Equal(t, int64(29), last)
}

View File

@ -73,6 +73,8 @@ type Provider interface {
GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error) GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error)
GetBlock(ctx context.Context, commitment Commitment, slot int64, transactionDetails string) (*Block, error) GetBlock(ctx context.Context, commitment Commitment, slot int64, transactionDetails string) (*Block, error)
GetHealth(ctx context.Context) (*string, error)
} }
func (c Commitment) MarshalJSON() ([]byte, error) { func (c Commitment) MarshalJSON() ([]byte, error) {
@ -138,6 +140,7 @@ func getResponse[T any](
// check for an actual rpc error // check for an actual rpc error
if rpcResponse.Error.Code != 0 { if rpcResponse.Error.Code != 0 {
rpcResponse.Error.Method = method
return &rpcResponse.Error return &rpcResponse.Error
} }
return nil return nil
@ -301,3 +304,14 @@ func (c *Client) GetBlock(
} }
return &resp.Result, nil return &resp.Result, nil
} }
// GetHealth returns the current health of the node. A healthy node is one that is within a blockchain-configured slots
// of the latest cluster confirmed slot.
// See API docs: https://solana.com/docs/rpc/http/gethealth
func (c *Client) GetHealth(ctx context.Context) (*string, error) {
var resp response[string]
if err := getResponse(ctx, c, "getHealth", []any{}, &resp); err != nil {
return nil, err
}
return &resp.Result, nil
}

View File

@ -1,5 +1,10 @@
package rpc package rpc
import (
"encoding/json"
"fmt"
)
// error codes: https://github.com/anza-xyz/agave/blob/489f483e1d7b30ef114e0123994818b2accfa389/rpc-client-api/src/custom_error.rs#L17 // error codes: https://github.com/anza-xyz/agave/blob/489f483e1d7b30ef114e0123994818b2accfa389/rpc-client-api/src/custom_error.rs#L17
const ( const (
BlockCleanedUpCode = -32001 BlockCleanedUpCode = -32001
@ -21,3 +26,20 @@ const (
EpochRewardsPeriodActiveCode = -32017 EpochRewardsPeriodActiveCode = -32017
SlotNotEpochBoundaryCode = -32018 SlotNotEpochBoundaryCode = -32018
) )
type (
NodeUnhealthyErrorData struct {
NumSlotsBehind int64 `json:"numSlotsBehind"`
}
)
func UnpackRpcErrorData[T any](rpcErr *RPCError, formatted T) error {
bytesData, err := json.Marshal(rpcErr.Data)
if err != nil {
return fmt.Errorf("failed to marshal %s rpc-error data: %w", rpcErr.Method, err)
}
if err = json.Unmarshal(bytesData, formatted); err != nil {
return fmt.Errorf("failed to unmarshal %s rpc-error data: %w", rpcErr.Method, err)
}
return nil
}

View File

@ -10,6 +10,8 @@ type (
Message string `json:"message"` Message string `json:"message"`
Code int64 `json:"code"` Code int64 `json:"code"`
Data map[string]any `json:"data"` Data map[string]any `json:"data"`
// Method is not returned by the RPC, rather added by the client for visibility purposes
Method string
} }
response[T any] struct { response[T any] struct {
@ -99,7 +101,7 @@ type (
) )
func (e *RPCError) Error() string { func (e *RPCError) Error() string {
return fmt.Sprintf("rpc error (code: %d): %s (data: %v)", e.Code, e.Message, e.Data) return fmt.Sprintf("%s rpc error (code: %d): %s (data: %v)", e.Method, e.Code, e.Message, e.Data)
} }
func (hp *HostProduction) UnmarshalJSON(data []byte) error { func (hp *HostProduction) UnmarshalJSON(data []byte) error {