refactored GetBlockProduction

This commit is contained in:
Matt Johnstone 2024-10-02 14:47:04 +02:00
parent b331072a4d
commit ab386801b1
No known key found for this signature in database
GPG Key ID: BE985FBB9BE7D3BB
6 changed files with 119 additions and 96 deletions

View File

@ -114,9 +114,9 @@ func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) {
}
func (c *solanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) {
params := map[string]string{"commitment": string(rpc.CommitmentRecent)}
params := map[string]string{"commitment": string(rpc.CommitmentProcessed)}
if *votePubkey != "" {
params = map[string]string{"commitment": string(rpc.CommitmentRecent), "votePubkey": *votePubkey}
params = map[string]string{"commitment": string(rpc.CommitmentProcessed), "votePubkey": *votePubkey}
}
voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, []interface{}{params})

View File

@ -54,13 +54,12 @@ var (
TransactionCount: 22661093,
}
staticBlockProduction = rpc.BlockProduction{
FirstSlot: 100000000,
LastSlot: 200000000,
Hosts: map[string]rpc.BlockProductionPerHost{
"bbb": {LeaderSlots: 40000000, BlocksProduced: 36000000},
"ccc": {LeaderSlots: 30000000, BlocksProduced: 29600000},
"aaa": {LeaderSlots: 30000000, BlocksProduced: 10000000},
ByIdentity: map[string]rpc.HostProduction{
"aaa": {300, 100},
"bbb": {400, 360},
"ccc": {300, 296},
},
Range: rpc.BlockProductionRange{FirstSlot: 1000, LastSlot: 2000},
}
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
@ -136,7 +135,7 @@ func (c *staticRPCClient) GetVoteAccounts(ctx context.Context, params []interfac
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlockProduction(
ctx context.Context, firstSlot *int64, lastSlot *int64,
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) {
return &staticBlockProduction, nil
}
@ -292,23 +291,25 @@ func (c *dynamicRPCClient) GetVoteAccounts(ctx context.Context, params []interfa
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlockProduction(
ctx context.Context, firstSlot *int64, lastSlot *int64,
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) {
hostProduction := make(map[string]rpc.BlockProductionPerHost)
byIdentity := make(map[string]rpc.HostProduction)
for _, identity := range identities {
hostProduction[identity] = rpc.BlockProductionPerHost{LeaderSlots: 0, BlocksProduced: 0}
byIdentity[identity] = rpc.HostProduction{LeaderSlots: 0, BlocksProduced: 0}
}
for i := *firstSlot; i <= *lastSlot; i++ {
info := c.SlotInfos[int(i)]
hp := hostProduction[info.leader]
hp.LeaderSlots++
production := byIdentity[info.leader]
production.LeaderSlots++
if info.blockProduced {
hp.BlocksProduced++
production.BlocksProduced++
}
hostProduction[info.leader] = hp
byIdentity[info.leader] = production
}
production := rpc.BlockProduction{FirstSlot: *firstSlot, LastSlot: *lastSlot, Hosts: hostProduction}
return &production, nil
blockProduction := rpc.BlockProduction{
ByIdentity: byIdentity, Range: rpc.BlockProductionRange{FirstSlot: *firstSlot, LastSlot: *lastSlot},
}
return &blockProduction, nil
}
//goland:noinspection GoUnusedParameter
@ -407,19 +408,19 @@ solana_validator_delinquent{nodekey="ccc",pubkey="CCC"} 0
{
Name: "solana_node_version",
ExpectedResponse: `
# HELP solana_node_version Node version of solana
# TYPE solana_node_version gauge
solana_node_version{version="1.16.7"} 1
# HELP solana_node_version Node version of solana
# TYPE solana_node_version gauge
solana_node_version{version="1.16.7"} 1
`,
},
{
Name: "solana_account_balance",
ExpectedResponse: `
# HELP solana_account_balance Solana account balances
# TYPE solana_account_balance gauge
solana_account_balance{address="aaa"} 1
solana_account_balance{address="bbb"} 2
solana_account_balance{address="ccc"} 3
# HELP solana_account_balance Solana account balances
# TYPE solana_account_balance gauge
solana_account_balance{address="aaa"} 1
solana_account_balance{address="bbb"} 2
solana_account_balance{address="ccc"} 3
`,
},
}

View File

@ -70,22 +70,22 @@ func init() {
func (c *solanaCollector) WatchSlots(ctx context.Context) {
// Get current slot height and epoch info
ctx_, cancel := context.WithTimeout(context.Background(), httpTimeout)
info, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentMax)
ctx_, cancel := context.WithTimeout(ctx, httpTimeout)
epochInfo, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentFinalized)
if err != nil {
klog.Fatalf("failed to fetch epoch info, bailing out: %v", err)
}
cancel()
totalTransactionsTotal.Set(float64(info.TransactionCount))
confirmedSlotHeight.Set(float64(info.AbsoluteSlot))
// watermark is the last slot number we generated ticks for. Set it to the current offset on startup (we do not backfill slots we missed at startup)
watermark := info.AbsoluteSlot
currentEpoch, firstSlot, lastSlot := getEpochBounds(info)
// watermark is the last slot number we generated ticks for. Set it to the current offset on startup (
// we do not backfill slots we missed at startup)
watermark := epochInfo.AbsoluteSlot
currentEpoch, firstSlot, lastSlot := getEpochBounds(epochInfo)
currentEpochNumber.Set(float64(currentEpoch))
epochFirstSlot.Set(float64(firstSlot))
epochLastSlot.Set(float64(lastSlot))
totalTransactionsTotal.Set(float64(epochInfo.TransactionCount))
confirmedSlotHeight.Set(float64(epochInfo.AbsoluteSlot))
klog.Infof("Starting at slot %d in epoch %d (%d-%d)", firstSlot, currentEpoch, firstSlot, lastSlot)
_, err = c.updateCounters(currentEpoch, watermark, &lastSlot)
@ -105,7 +105,7 @@ func (c *solanaCollector) WatchSlots(ctx context.Context) {
// Get current slot height and epoch info
ctx_, cancel := context.WithTimeout(context.Background(), httpTimeout)
info, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentMax)
info, err := c.rpcClient.GetEpochInfo(ctx_, rpc.CommitmentFinalized)
if err != nil {
klog.Warningf("failed to fetch epoch info, retrying: %v", err)
cancel()
@ -215,23 +215,23 @@ func (c *solanaCollector) updateCounters(epoch, firstSlot int64, lastSlotOpt *in
ctx, cancel = context.WithTimeout(context.Background(), httpTimeout)
defer cancel()
blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &firstSlot, &lastSlot)
blockProductionValue, err := c.rpcClient.GetBlockProduction(ctx, nil, &firstSlot, &lastSlot)
if err != nil {
return 0, fmt.Errorf("failed to fetch block production, retrying: %v", err)
}
for host, prod := range blockProduction.Hosts {
valid := float64(prod.BlocksProduced)
skipped := float64(prod.LeaderSlots - prod.BlocksProduced)
for identity, production := range blockProductionValue.ByIdentity {
valid := float64(production.BlocksProduced)
skipped := float64(production.LeaderSlots - production.BlocksProduced)
epochStr := fmt.Sprintf("%d", epoch)
leaderSlotsTotal.WithLabelValues("valid", host).Add(valid)
leaderSlotsTotal.WithLabelValues("skipped", host).Add(skipped)
leaderSlotsTotal.WithLabelValues("valid", identity).Add(valid)
leaderSlotsTotal.WithLabelValues("skipped", identity).Add(skipped)
if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, host) {
leaderSlotsByEpoch.WithLabelValues("valid", host, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", host, epochStr).Add(skipped)
if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, identity) {
leaderSlotsByEpoch.WithLabelValues("valid", identity, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", identity, epochStr).Add(skipped)
}
klog.V(1).Infof(
@ -239,9 +239,9 @@ func (c *solanaCollector) updateCounters(epoch, firstSlot int64, lastSlotOpt *in
epochStr,
firstSlot,
lastSlot,
host,
prod.BlocksProduced,
prod.LeaderSlots-prod.BlocksProduced,
identity,
production.BlocksProduced,
production.LeaderSlots-production.BlocksProduced,
)
}

View File

@ -34,7 +34,7 @@ func testBlockProductionMetric(
host string,
status string,
) {
hostInfo := staticBlockProduction.Hosts[host]
hostInfo := staticBlockProduction.ByIdentity[host]
// get expected value depending on status:
var expectedValue float64
switch status {

View File

@ -38,7 +38,9 @@ type Provider interface {
// GetBlockProduction retrieves the block production information for the specified slot range.
// The method takes a context for cancellation, and pointers to the first and last slots of the range.
// It returns a BlockProduction struct containing the block production details, or an error if the operation fails.
GetBlockProduction(ctx context.Context, firstSlot *int64, lastSlot *int64) (*BlockProduction, error)
GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64,
) (*BlockProduction, error)
// GetEpochInfo retrieves the information regarding the current epoch.
// The method takes a context for cancellation and a commitment level to specify the desired state.
@ -70,16 +72,15 @@ func (c Commitment) MarshalJSON() ([]byte, error) {
}
const (
// CommitmentMax represents the most recent block confirmed by the cluster super-majority
//as having reached maximum lockout.
CommitmentMax Commitment = "max"
// CommitmentRoot Most recent block having reached maximum lockout on this node.
CommitmentRoot Commitment = "root"
// CommitmentSingleGossip represents the most recent block that has been voted on
//by the cluster super-majority (optimistic confirmation).
CommitmentSingleGossip Commitment = "singleGossip"
// CommitmentRecent represents the nodes most recent block
CommitmentRecent Commitment = "recent"
// CommitmentFinalized level offers the highest level of certainty for a transaction on the Solana blockchain.
// A transaction is considered “Finalized” when it is included in a block that has been confirmed by a
// supermajority of the stake, and at least 31 additional confirmed blocks have been built on top of it.
CommitmentFinalized Commitment = "finalized"
// CommitmentConfirmed level is reached when a transaction is included in a block that has been voted on
// by a supermajority (66%+) of the networks stake.
CommitmentConfirmed Commitment = "confirmed"
// CommitmentProcessed level represents a transaction that has been received by the network and included in a block.
CommitmentProcessed Commitment = "processed"
)
func NewRPCClient(rpcAddr string) *Client {
@ -163,32 +164,42 @@ func (c *Client) GetSlot(ctx context.Context) (int64, error) {
return resp.Result, nil
}
func (c *Client) GetBlockProduction(ctx context.Context, firstSlot *int64, lastSlot *int64) (*BlockProduction, error) {
func (c *Client) GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64,
) (*BlockProduction, error) {
// can't provide a last slot without a first:
if firstSlot == nil && lastSlot != nil {
panic("can't provide a last slot without a first!")
}
// format params:
params := make([]interface{}, 1)
config := make(map[string]interface{})
if identity != nil {
config["identity"] = *identity
}
if firstSlot != nil {
params[0] = map[string]interface{}{"range": blockProductionRange{FirstSlot: *firstSlot, LastSlot: lastSlot}}
blockRange := map[string]int64{"firstSlot": *firstSlot}
if lastSlot != nil {
blockRange["lastSlot"] = *lastSlot
}
config["range"] = blockRange
}
var params []interface{}
if len(config) > 0 {
params = append(params, config)
}
// make request:
var resp response[blockProductionResult]
var resp response[contextualResult[BlockProduction]]
if err := c.getResponse(ctx, "getBlockProduction", params, &resp); err != nil {
return nil, err
}
// convert to BlockProduction format:
hosts := make(map[string]BlockProductionPerHost)
for id, arr := range resp.Result.Value.ByIdentity {
hosts[id] = BlockProductionPerHost{LeaderSlots: arr[0], BlocksProduced: arr[1]}
}
production := BlockProduction{
FirstSlot: resp.Result.Value.Range.FirstSlot, LastSlot: *resp.Result.Value.Range.LastSlot, Hosts: hosts,
}
return &production, nil
return &resp.Result.Value, nil
}
func (c *Client) GetBalance(ctx context.Context, address string) (float64, error) {
var resp response[BalanceResult]
var resp response[contextualResult[int64]]
if err := c.getResponse(ctx, "getBalance", []interface{}{address}, &resp); err != nil {
return 0, err
}

View File

@ -1,11 +1,23 @@
package rpc
import (
"encoding/json"
"fmt"
)
type (
response[T any] struct {
Result T `json:"result"`
Error rpcError `json:"error"`
}
contextualResult[T any] struct {
Value T `json:"value"`
Context struct {
Slot int64 `json:"slot"`
}
}
EpochInfo struct {
// Current absolute slot in epoch
AbsoluteSlot int64 `json:"absoluteSlot"`
@ -37,37 +49,36 @@ type (
Delinquent []VoteAccount `json:"delinquent"`
}
blockProductionRange struct {
FirstSlot int64 `json:"firstSlot"`
LastSlot *int64 `json:"lastSlot,omitempty"`
}
blockProductionResult struct {
Value struct {
ByIdentity map[string][]int64 `json:"byIdentity"`
Range blockProductionRange `json:"range"`
} `json:"value"`
}
BlockProductionPerHost struct {
HostProduction struct {
LeaderSlots int64
BlocksProduced int64
}
BlockProduction struct {
FirstSlot int64
LastSlot int64
Hosts map[string]BlockProductionPerHost
BlockProductionRange struct {
FirstSlot int64 `json:"firstSlot"`
LastSlot int64 `json:"lastSlot"`
}
BalanceResult struct {
Value int64 `json:"value"`
Context struct {
Slot int64 `json:"slot"`
} `json:"context"`
BlockProduction struct {
ByIdentity map[string]HostProduction `json:"byIdentity"`
Range BlockProductionRange `json:"range"`
}
)
func (hp *HostProduction) UnmarshalJSON(data []byte) error {
var arr []int64
if err := json.Unmarshal(data, &arr); err != nil {
return err
}
if len(arr) != 2 {
return fmt.Errorf("expected array of 2 integers, got %d", len(arr))
}
hp.LeaderSlots = arr[0]
hp.BlocksProduced = arr[1]
return nil
}
func (r response[T]) getError() rpcError {
return r.Error
}