Merge pull request #36 from asymmetric-research/inflation-rewards

Added inflation rewards
This commit is contained in:
Matt Johnstone 2024-10-07 20:34:57 +02:00 committed by GitHub
commit d77b929cfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 173 additions and 29 deletions

View File

@ -14,17 +14,28 @@ import (
)
var (
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")
balanceAddresses = flag.String("balance-addresses", "", "Comma-separated list of addresses to monitor balances")
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")
// addresses:
balanceAddresses = flag.String(
"balance-addresses",
"",
"Comma-separated list of addresses to monitor SOL balances.",
)
leaderSlotAddresses = flag.String(
"leader-slot-addresses",
"",
"Comma-separated list of addresses to monitor leader slots by epoch for, leave nil to track by epoch for all validators (this creates a lot of Prometheus metrics with every new epoch).",
)
inflationRewardAddresses = flag.String(
"inflation-reward-addresses",
"",
"Comma-separated list of validator vote accounts to track inflationary rewards for",
)
)
func init() {
@ -35,9 +46,10 @@ type solanaCollector struct {
rpcClient rpc.Provider
// config:
slotPace time.Duration
balanceAddresses []string
leaderSlotAddresses []string
slotPace time.Duration
balanceAddresses []string
leaderSlotAddresses []string
inflationRewardAddresses []string
/// descriptors:
totalValidatorsDesc *prometheus.Desc
@ -50,13 +62,18 @@ type solanaCollector struct {
}
func createSolanaCollector(
provider rpc.Provider, slotPace time.Duration, balanceAddresses []string, leaderSlotAddresses []string,
provider rpc.Provider,
slotPace time.Duration,
balanceAddresses []string,
leaderSlotAddresses []string,
inflationRewardAddresses []string,
) *solanaCollector {
return &solanaCollector{
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses,
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses,
inflationRewardAddresses: inflationRewardAddresses,
totalValidatorsDesc: prometheus.NewDesc(
"solana_active_validators",
"Total number of active validators by state",
@ -102,8 +119,12 @@ func createSolanaCollector(
}
}
func NewSolanaCollector(rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string) *solanaCollector {
return createSolanaCollector(rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses)
func NewSolanaCollector(
rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string, inflationRewardAddresses []string,
) *solanaCollector {
return createSolanaCollector(
rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses, inflationRewardAddresses,
)
}
func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) {
@ -233,6 +254,7 @@ func main() {
var (
balAddresses []string
lsAddresses []string
irAddresses []string
)
if *balanceAddresses != "" {
balAddresses = strings.Split(*balanceAddresses, ",")
@ -240,8 +262,11 @@ func main() {
if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",")
}
if *inflationRewardAddresses != "" {
irAddresses = strings.Split(*inflationRewardAddresses, ",")
}
collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses)
collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses)
slotWatcher := NewCollectorSlotWatcher(collector)
go slotWatcher.WatchSlots(context.Background(), collector.slotPace)

View File

@ -42,6 +42,7 @@ type (
var (
identities = []string{"aaa", "bbb", "ccc"}
votekeys = []string{"AAA", "BBB", "CCC"}
balances = map[string]float64{"aaa": 1, "bbb": 2, "ccc": 3}
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities)
@ -61,6 +62,11 @@ var (
},
Range: rpc.BlockProductionRange{FirstSlot: 1000, LastSlot: 2000},
}
staticInflationRewards = []rpc.InflationReward{
{Amount: 1000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 2000},
{Amount: 2000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 4000},
{Amount: 3000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 6000},
}
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
{
@ -147,6 +153,13 @@ func (c *staticRPCClient) GetBalance(ctx context.Context, address string) (float
return balances[address], nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}
/*
===== DYNAMIC CLIENT =====:
*/
@ -321,6 +334,13 @@ func (c *dynamicRPCClient) GetBalance(ctx context.Context, address string) (floa
return balances[address], nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}
/*
===== OTHER TEST UTILITIES =====:
*/
@ -356,7 +376,7 @@ func runCollectionTests(t *testing.T, collector prometheus.Collector, testCases
}
func TestSolanaCollector_Collect_Static(t *testing.T) {
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{})
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys)
prometheus.NewPedanticRegistry().MustRegister(collector)
testCases := []collectionTest{
@ -434,7 +454,7 @@ solana_account_balance{address="ccc"} 3
func TestSolanaCollector_Collect_Dynamic(t *testing.T) {
client := newDynamicRPCClient()
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{})
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys)
prometheus.NewPedanticRegistry().MustRegister(collector)
// start off by testing initial state:

View File

@ -18,7 +18,9 @@ const (
type SlotWatcher struct {
client rpc.Provider
leaderSlotAddresses []string
// config:
leaderSlotAddresses []string
inflationRewardAddresses []string
// currentEpoch is the current epoch we are watching
currentEpoch int64
@ -71,10 +73,22 @@ var (
},
[]string{"status", "nodekey", "epoch"},
)
inflationRewards = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "solana_inflation_rewards",
Help: "Inflation reward earned per validator vote account, per epoch",
},
[]string{"votekey", "epoch"},
)
)
func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher {
return &SlotWatcher{client: collector.rpcClient, leaderSlotAddresses: collector.leaderSlotAddresses}
return &SlotWatcher{
client: collector.rpcClient,
leaderSlotAddresses: collector.leaderSlotAddresses,
inflationRewardAddresses: collector.inflationRewardAddresses,
}
}
func init() {
@ -85,6 +99,7 @@ func init() {
prometheus.MustRegister(epochLastSlot)
prometheus.MustRegister(leaderSlotsTotal)
prometheus.MustRegister(leaderSlotsByEpoch)
prometheus.MustRegister(inflationRewards)
}
func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
@ -124,6 +139,13 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
}
if epochInfo.Epoch > c.currentEpoch {
// if we have configured inflation reward addresses, fetch em
if len(c.inflationRewardAddresses) > 0 {
err = c.fetchAndEmitInflationRewards(ctx, c.currentEpoch)
if err != nil {
klog.Errorf("Failed to emit inflation rewards, bailing out: %v", err)
}
}
c.closeCurrentEpoch(ctx, epochInfo)
}
@ -245,3 +267,28 @@ func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}
// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
epochStr := fmt.Sprintf("%d", epoch)
klog.Infof("Fetching inflation reward for epoch %v ...", epochStr)
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
rewardInfos, err := c.client.GetInflationReward(
ctx, c.inflationRewardAddresses, rpc.CommitmentFinalized, &epoch, nil,
)
if err != nil {
return fmt.Errorf("error fetching inflation rewards: %w", err)
}
for i, rewardInfo := range rewardInfos {
address := c.inflationRewardAddresses[i]
reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
inflationRewards.WithLabelValues(address, epochStr).Set(reward)
}
klog.Infof("Fetched inflation reward for epoch %v.", epochStr)
return nil
}

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
@ -91,19 +92,24 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
leaderSlotsTotal.Reset()
leaderSlotsByEpoch.Reset()
collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{})
collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys)
watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go watcher.WatchSlots(ctx, collector.slotPace)
// make sure inflation rewards are collected:
err := watcher.fetchAndEmitInflationRewards(ctx, staticEpochInfo.Epoch)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
firstSlot, lastSlot := getEpochBounds(&staticEpochInfo)
tests := []struct {
type testCase struct {
expectedValue float64
metric prometheus.Gauge
}{
}
tests := []testCase{
{expectedValue: float64(staticEpochInfo.AbsoluteSlot), metric: confirmedSlotHeight},
{expectedValue: float64(staticEpochInfo.TransactionCount), metric: totalTransactionsTotal},
{expectedValue: float64(staticEpochInfo.Epoch), metric: currentEpochNumber},
@ -111,6 +117,16 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
{expectedValue: float64(lastSlot), metric: epochLastSlot},
}
// add inflation reward tests:
for i, rewardInfo := range staticInflationRewards {
epoch := fmt.Sprintf("%v", staticEpochInfo.Epoch)
test := testCase{
expectedValue: float64(rewardInfo.Amount) / float64(rpc.LamportsInSol),
metric: inflationRewards.WithLabelValues(votekeys[i], epoch),
}
tests = append(tests, test)
}
for _, testCase := range tests {
name := extractName(testCase.metric.Desc())
t.Run(name, func(t *testing.T) {
@ -145,8 +161,8 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// create clients:
client := newDynamicRPCClient()
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{})
watcher := SlotWatcher{client: client}
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys)
watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector)
// start client/collector and wait a bit:

View File

@ -65,6 +65,12 @@ type Provider interface {
// GetBalance returns the SOL balance of the account at the provided address
GetBalance(ctx context.Context, address string) (float64, error)
// GetInflationReward returns the inflation rewards (in lamports) awarded to the given addresses (vote accounts)
// during the given epoch.
GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error)
}
func (c Commitment) MarshalJSON() ([]byte, error) {
@ -72,6 +78,8 @@ func (c Commitment) MarshalJSON() ([]byte, error) {
}
const (
// LamportsInSol is the number of lamports in 1 SOL (a billion)
LamportsInSol = 1_000_000_000
// CommitmentFinalized level offers the highest level of certainty for a transaction on the Solana blockchain.
// A transaction is considered “Finalized” when it is included in a block that has been confirmed by a
// supermajority of the stake, and at least 31 additional confirmed blocks have been built on top of it.
@ -216,5 +224,24 @@ func (c *Client) GetBalance(ctx context.Context, address string) (float64, error
if err := c.getResponse(ctx, "getBalance", []any{address}, &resp); err != nil {
return 0, err
}
return float64(resp.Result.Value / 1_000_000_000), nil
return float64(resp.Result.Value) / float64(LamportsInSol), nil
}
func (c *Client) GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error) {
// format params:
config := map[string]any{"commitment": string(commitment)}
if epoch != nil {
config["epoch"] = *epoch
}
if minContextSlot != nil {
config["minContextSlot"] = *minContextSlot
}
var resp response[[]InflationReward]
if err := c.getResponse(ctx, "getInflationReward", []any{addresses, config}, &resp); err != nil {
return nil, err
}
return resp.Result, nil
}

View File

@ -7,8 +7,10 @@ import (
type (
response[T any] struct {
Result T `json:"result"`
Error rpcError `json:"error"`
jsonrpc string
Result T `json:"result"`
Error rpcError `json:"error"`
Id int `json:"id"`
}
contextualResult[T any] struct {
@ -63,6 +65,13 @@ type (
ByIdentity map[string]HostProduction `json:"byIdentity"`
Range BlockProductionRange `json:"range"`
}
InflationReward struct {
Amount int64 `json:"amount"`
EffectiveSlot int64 `json:"effectiveSlot"`
Epoch int64 `json:"epoch"`
PostBalance int64 `json:"postBalance"`
}
)
func (hp *HostProduction) UnmarshalJSON(data []byte) error {