Merge pull request #37 from asymmetric-research/fee-rewards

WIP: Fee rewards!!!
This commit is contained in:
Matt Johnstone 2024-10-14 11:01:55 +02:00 committed by GitHub
commit a3c498da5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 345 additions and 72 deletions

View File

@ -36,6 +36,11 @@ var (
"", "",
"Comma-separated list of validator vote accounts to track inflationary rewards for", "Comma-separated list of validator vote accounts to track inflationary rewards for",
) )
feeRewardAddresses = flag.String(
"fee-reward-addresses",
"",
"Comma-separated list of validator identity accounts to track fee rewards for.",
)
) )
func init() { func init() {
@ -50,6 +55,7 @@ type solanaCollector struct {
balanceAddresses []string balanceAddresses []string
leaderSlotAddresses []string leaderSlotAddresses []string
inflationRewardAddresses []string inflationRewardAddresses []string
feeRewardAddresses []string
/// descriptors: /// descriptors:
totalValidatorsDesc *prometheus.Desc totalValidatorsDesc *prometheus.Desc
@ -67,6 +73,7 @@ func createSolanaCollector(
balanceAddresses []string, balanceAddresses []string,
leaderSlotAddresses []string, leaderSlotAddresses []string,
inflationRewardAddresses []string, inflationRewardAddresses []string,
feeRewardAddresses []string,
) *solanaCollector { ) *solanaCollector {
return &solanaCollector{ return &solanaCollector{
rpcClient: provider, rpcClient: provider,
@ -74,6 +81,7 @@ func createSolanaCollector(
balanceAddresses: balanceAddresses, balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses, leaderSlotAddresses: leaderSlotAddresses,
inflationRewardAddresses: inflationRewardAddresses, inflationRewardAddresses: inflationRewardAddresses,
feeRewardAddresses: feeRewardAddresses,
totalValidatorsDesc: prometheus.NewDesc( totalValidatorsDesc: prometheus.NewDesc(
"solana_active_validators", "solana_active_validators",
"Total number of active validators by state", "Total number of active validators by state",
@ -120,10 +128,19 @@ func createSolanaCollector(
} }
func NewSolanaCollector( func NewSolanaCollector(
rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string, inflationRewardAddresses []string, rpcAddr string,
balanceAddresses []string,
leaderSlotAddresses []string,
inflationRewardAddresses []string,
feeRewardAddresses []string,
) *solanaCollector { ) *solanaCollector {
return createSolanaCollector( return createSolanaCollector(
rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses, inflationRewardAddresses, rpc.NewRPCClient(rpcAddr),
slotPacerSchedule,
balanceAddresses,
leaderSlotAddresses,
inflationRewardAddresses,
feeRewardAddresses,
) )
} }
@ -138,7 +155,7 @@ func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) {
} }
func (c *solanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) { func (c *solanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) {
voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentProcessed, votePubkey) voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, votePubkey)
if err != nil { if err != nil {
ch <- prometheus.NewInvalidMetric(c.totalValidatorsDesc, err) ch <- prometheus.NewInvalidMetric(c.totalValidatorsDesc, err)
ch <- prometheus.NewInvalidMetric(c.validatorActivatedStake, err) ch <- prometheus.NewInvalidMetric(c.validatorActivatedStake, err)
@ -217,7 +234,7 @@ func (c *solanaCollector) collectBalances(ctx context.Context, ch chan<- prometh
func fetchBalances(ctx context.Context, client rpc.Provider, addresses []string) (map[string]float64, error) { func fetchBalances(ctx context.Context, client rpc.Provider, addresses []string) (map[string]float64, error) {
balances := make(map[string]float64) balances := make(map[string]float64)
for _, address := range addresses { for _, address := range addresses {
balance, err := client.GetBalance(ctx, address) balance, err := client.GetBalance(ctx, rpc.CommitmentConfirmed, address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -255,18 +272,27 @@ func main() {
balAddresses []string balAddresses []string
lsAddresses []string lsAddresses []string
irAddresses []string irAddresses []string
frAddresses []string
) )
if *balanceAddresses != "" { if *balanceAddresses != "" {
balAddresses = strings.Split(*balanceAddresses, ",") balAddresses = strings.Split(*balanceAddresses, ",")
klog.Infof("Monitoring balances for %v", balAddresses)
} }
if *leaderSlotAddresses != "" { if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",") lsAddresses = strings.Split(*leaderSlotAddresses, ",")
klog.Infof("Monitoring leader-slot by epoch for %v", lsAddresses)
} }
if *inflationRewardAddresses != "" { if *inflationRewardAddresses != "" {
irAddresses = strings.Split(*inflationRewardAddresses, ",") irAddresses = strings.Split(*inflationRewardAddresses, ",")
klog.Infof("Monitoring inflation reward by epoch for %v", irAddresses)
}
if *feeRewardAddresses != "" {
frAddresses = strings.Split(*feeRewardAddresses, ",")
klog.Infof("Monitoring fee reward by epoch for %v", frAddresses)
} }
collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses) collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses)
slotWatcher := NewCollectorSlotWatcher(collector) slotWatcher := NewCollectorSlotWatcher(collector)
go slotWatcher.WatchSlots(context.Background(), collector.slotPace) go slotWatcher.WatchSlots(context.Background(), collector.slotPace)

View File

@ -47,7 +47,7 @@ var (
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"} identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities) nv = len(identities)
staticEpochInfo = rpc.EpochInfo{ staticEpochInfo = rpc.EpochInfo{
AbsoluteSlot: 166598, AbsoluteSlot: 166599,
BlockHeight: 166500, BlockHeight: 166500,
Epoch: 27, Epoch: 27,
SlotIndex: 2790, SlotIndex: 2790,
@ -72,10 +72,7 @@ var (
{ {
ActivatedStake: 42, ActivatedStake: 42,
Commission: 0, Commission: 0,
EpochCredits: [][]int{ EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}},
{1, 64, 0},
{2, 192, 64},
},
EpochVoteAccount: true, EpochVoteAccount: true,
LastVote: 147, LastVote: 147,
NodePubkey: "bbb", NodePubkey: "bbb",
@ -85,10 +82,7 @@ var (
{ {
ActivatedStake: 43, ActivatedStake: 43,
Commission: 1, Commission: 1,
EpochCredits: [][]int{ EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}},
{2, 65, 1},
{3, 193, 65},
},
EpochVoteAccount: true, EpochVoteAccount: true,
LastVote: 148, LastVote: 148,
NodePubkey: "ccc", NodePubkey: "ccc",
@ -100,10 +94,7 @@ var (
{ {
ActivatedStake: 49, ActivatedStake: 49,
Commission: 2, Commission: 2,
EpochCredits: [][]int{ EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}},
{10, 594, 6},
{9, 98, 4},
},
EpochVoteAccount: true, EpochVoteAccount: true,
LastVote: 92, LastVote: 92,
NodePubkey: "aaa", NodePubkey: "aaa",
@ -112,6 +103,9 @@ var (
}, },
}, },
} }
staticLeaderSchedule = map[string][]int64{
"aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14},
}
) )
/* /*
@ -124,7 +118,7 @@ func (c *staticRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Commi
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetSlot(ctx context.Context) (int64, error) { func (c *staticRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) {
return staticEpochInfo.AbsoluteSlot, nil return staticEpochInfo.AbsoluteSlot, nil
} }
@ -143,23 +137,35 @@ func (c *staticRPCClient) GetVoteAccounts(
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlockProduction( func (c *staticRPCClient) GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) { ) (*rpc.BlockProduction, error) {
return &staticBlockProduction, nil return &staticBlockProduction, nil
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBalance(ctx context.Context, address string) (float64, error) { func (c *staticRPCClient) GetBalance(ctx context.Context, commitment rpc.Commitment, address string) (float64, error) {
return balances[address], nil return balances[address], nil
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetInflationReward( func (c *staticRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64, ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) { ) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil return staticInflationRewards, nil
} }
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return staticLeaderSchedule, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) {
return nil, nil
}
/* /*
===== DYNAMIC CLIENT =====: ===== DYNAMIC CLIENT =====:
*/ */
@ -271,7 +277,7 @@ func (c *dynamicRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Comm
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetSlot(ctx context.Context) (int64, error) { func (c *dynamicRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) {
return int64(c.Slot), nil return int64(c.Slot), nil
} }
@ -308,7 +314,7 @@ func (c *dynamicRPCClient) GetVoteAccounts(
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlockProduction( func (c *dynamicRPCClient) GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) { ) (*rpc.BlockProduction, error) {
byIdentity := make(map[string]rpc.HostProduction) byIdentity := make(map[string]rpc.HostProduction)
for _, identity := range identities { for _, identity := range identities {
@ -330,17 +336,29 @@ func (c *dynamicRPCClient) GetBlockProduction(
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBalance(ctx context.Context, address string) (float64, error) { func (c *dynamicRPCClient) GetBalance(ctx context.Context, client rpc.Commitment, address string) (float64, error) {
return balances[address], nil return balances[address], nil
} }
//goland:noinspection GoUnusedParameter //goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetInflationReward( func (c *dynamicRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64, ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) { ) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil return staticInflationRewards, nil
} }
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return nil, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) {
return nil, nil
}
/* /*
===== OTHER TEST UTILITIES =====: ===== OTHER TEST UTILITIES =====:
*/ */
@ -376,7 +394,9 @@ func runCollectionTests(t *testing.T, collector prometheus.Collector, testCases
} }
func TestSolanaCollector_Collect_Static(t *testing.T) { func TestSolanaCollector_Collect_Static(t *testing.T) {
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys) collector := createSolanaCollector(
&staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys, identities,
)
prometheus.NewPedanticRegistry().MustRegister(collector) prometheus.NewPedanticRegistry().MustRegister(collector)
testCases := []collectionTest{ testCases := []collectionTest{
@ -454,7 +474,7 @@ solana_account_balance{address="ccc"} 3
func TestSolanaCollector_Collect_Dynamic(t *testing.T) { func TestSolanaCollector_Collect_Dynamic(t *testing.T) {
client := newDynamicRPCClient() client := newDynamicRPCClient()
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys) collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys, identities)
prometheus.NewPedanticRegistry().MustRegister(collector) prometheus.NewPedanticRegistry().MustRegister(collector)
// start off by testing initial state: // start off by testing initial state:

View File

@ -21,6 +21,7 @@ type SlotWatcher struct {
// config: // config:
leaderSlotAddresses []string leaderSlotAddresses []string
inflationRewardAddresses []string inflationRewardAddresses []string
feeRewardAddresses []string
// currentEpoch is the current epoch we are watching // currentEpoch is the current epoch we are watching
currentEpoch int64 currentEpoch int64
@ -30,6 +31,8 @@ type SlotWatcher struct {
lastSlot int64 lastSlot int64
// slotWatermark is the last (most recent) slot we have tracked // slotWatermark is the last (most recent) slot we have tracked
slotWatermark int64 slotWatermark int64
leaderSchedule map[string][]int64
} }
var ( var (
@ -81,6 +84,14 @@ var (
}, },
[]string{"votekey", "epoch"}, []string{"votekey", "epoch"},
) )
feeRewards = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "solana_fee_rewards",
Help: "Transaction fee rewards earned per validator identity account, per epoch",
},
[]string{"nodekey", "epoch"},
)
) )
func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher { func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher {
@ -88,6 +99,7 @@ func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher {
client: collector.rpcClient, client: collector.rpcClient,
leaderSlotAddresses: collector.leaderSlotAddresses, leaderSlotAddresses: collector.leaderSlotAddresses,
inflationRewardAddresses: collector.inflationRewardAddresses, inflationRewardAddresses: collector.inflationRewardAddresses,
feeRewardAddresses: collector.feeRewardAddresses,
} }
} }
@ -100,6 +112,7 @@ func init() {
prometheus.MustRegister(leaderSlotsTotal) prometheus.MustRegister(leaderSlotsTotal)
prometheus.MustRegister(leaderSlotsByEpoch) prometheus.MustRegister(leaderSlotsByEpoch)
prometheus.MustRegister(inflationRewards) prometheus.MustRegister(inflationRewards)
prometheus.MustRegister(feeRewards)
} }
func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
@ -117,15 +130,17 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
<-ticker.C <-ticker.C
ctx_, cancel := context.WithTimeout(ctx, httpTimeout) ctx_, cancel := context.WithTimeout(ctx, httpTimeout)
// TODO: separate fee-rewards watching from general slot watching, such that general slot watching commitment level can be dropped to confirmed
epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized) epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized)
if err != nil {
klog.Warningf("Failed to get epoch info, bailing out: %v", err)
}
cancel() cancel()
if err != nil {
klog.Errorf("Failed to get epoch info, bailing out: %v", err)
continue
}
// if we are running for the first time, then we need to set our tracking numbers: // if we are running for the first time, then we need to set our tracking numbers:
if c.currentEpoch == 0 { if c.currentEpoch == 0 {
c.trackEpoch(epochInfo) c.trackEpoch(ctx, epochInfo)
} }
totalTransactionsTotal.Set(float64(epochInfo.TransactionCount)) totalTransactionsTotal.Set(float64(epochInfo.TransactionCount))
@ -150,14 +165,15 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
} }
// update block production metrics up until the current slot: // update block production metrics up until the current slot:
c.fetchAndEmitBlockProduction(ctx, epochInfo.AbsoluteSlot) c.moveSlotWatermark(ctx, epochInfo.AbsoluteSlot)
} }
} }
} }
// trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly, // trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly,
// and updates the prometheus gauges associated with those metrics. // and updates the prometheus gauges associated with those metrics.
func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) { func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
firstSlot, lastSlot := getEpochBounds(epoch) firstSlot, lastSlot := getEpochBounds(epoch)
// if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup, // if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup,
// and so we can simply store the tracking numbers // and so we can simply store the tracking numbers
@ -194,16 +210,29 @@ func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) {
} }
// emit epoch bounds: // emit epoch bounds:
klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot)
currentEpochNumber.Set(float64(c.currentEpoch)) currentEpochNumber.Set(float64(c.currentEpoch))
epochFirstSlot.Set(float64(c.firstSlot)) epochFirstSlot.Set(float64(c.firstSlot))
epochLastSlot.Set(float64(c.lastSlot)) epochLastSlot.Set(float64(c.lastSlot))
// update leader schedule:
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch)
leaderSchedule, err := GetTrimmedLeaderSchedule(
ctx, c.client, c.feeRewardAddresses, epoch.AbsoluteSlot, c.firstSlot,
)
if err != nil {
klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err)
}
c.leaderSchedule = leaderSchedule
} }
// closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last // closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last
// remaining slots in the "current" epoch before we start tracking the new one. // remaining slots in the "current" epoch before we start tracking the new one.
func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) { func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) {
c.fetchAndEmitBlockProduction(ctx, c.lastSlot) c.moveSlotWatermark(ctx, c.lastSlot)
c.trackEpoch(newEpoch) c.trackEpoch(ctx, newEpoch)
} }
// checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking. // checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking.
@ -221,6 +250,13 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error {
return nil return nil
} }
// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot.
func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) {
c.fetchAndEmitBlockProduction(ctx, to)
c.fetchAndEmitFeeRewards(ctx, to)
c.slotWatermark = to
}
// fetchAndEmitBlockProduction fetches block production up to the provided endSlot, emits the prometheus metrics, // fetchAndEmitBlockProduction fetches block production up to the provided endSlot, emits the prometheus metrics,
// and updates the SlotWatcher.slotWatermark accordingly // and updates the SlotWatcher.slotWatermark accordingly
func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) { func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) {
@ -236,9 +272,10 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
// fetch block production: // fetch block production:
ctx, cancel := context.WithTimeout(ctx, httpTimeout) ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel() defer cancel()
blockProduction, err := c.client.GetBlockProduction(ctx, nil, &startSlot, &endSlot) blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot)
if err != nil { if err != nil {
klog.Warningf("Failed to get block production, bailing out: %v", err) klog.Errorf("Failed to get block production, bailing out: %v", err)
return
} }
// emit the metrics: // emit the metrics:
@ -246,20 +283,72 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
valid := float64(production.BlocksProduced) valid := float64(production.BlocksProduced)
skipped := float64(production.LeaderSlots - production.BlocksProduced) skipped := float64(production.LeaderSlots - production.BlocksProduced)
epochStr := fmt.Sprintf("%d", c.currentEpoch)
leaderSlotsTotal.WithLabelValues("valid", address).Add(valid) leaderSlotsTotal.WithLabelValues("valid", address).Add(valid)
leaderSlotsTotal.WithLabelValues("skipped", address).Add(skipped) leaderSlotsTotal.WithLabelValues("skipped", address).Add(skipped)
if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, address) { if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, address) {
epochStr := toString(c.currentEpoch)
leaderSlotsByEpoch.WithLabelValues("valid", address, epochStr).Add(valid) leaderSlotsByEpoch.WithLabelValues("valid", address, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", address, epochStr).Add(skipped) leaderSlotsByEpoch.WithLabelValues("skipped", address, epochStr).Add(skipped)
} }
} }
klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot) klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
// update the slot watermark: }
c.slotWatermark = endSlot
// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the
// slotWatermark and endSlot
func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) {
startSlot := c.slotWatermark + 1
klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)
if err := c.checkValidSlotRange(startSlot, endSlot); err != nil {
klog.Fatalf("invalid slot range: %v", err)
}
scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot)
for identity, leaderSlots := range scheduleToFetch {
if len(leaderSlots) == 0 {
continue
}
klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
for _, slot := range leaderSlots {
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot)
cancel()
if err != nil {
klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
}
}
}
klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
}
// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block.
func (c *SlotWatcher) fetchAndEmitSingleFeeReward(
ctx context.Context, identity string, epoch int64, slot int64,
) error {
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return err
}
for _, reward := range block.Rewards {
if reward.RewardType == "fee" {
// make sure we haven't made a logic issue or something:
assertf(
reward.Pubkey == identity,
"fetching fee reward for %v but got fee reward for %v",
identity,
reward.Pubkey,
)
amount := float64(reward.Lamports) / float64(rpc.LamportsInSol)
feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount)
}
}
return nil
} }
// getEpochBounds returns the first slot and last slot within an [inclusive] Epoch // getEpochBounds returns the first slot and last slot within an [inclusive] Epoch
@ -271,14 +360,13 @@ func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses // fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch // at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error { func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
epochStr := fmt.Sprintf("%d", epoch) klog.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))
klog.Infof("Fetching inflation reward for epoch %v ...", epochStr)
ctx, cancel := context.WithTimeout(ctx, httpTimeout) ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel() defer cancel()
rewardInfos, err := c.client.GetInflationReward( rewardInfos, err := c.client.GetInflationReward(
ctx, c.inflationRewardAddresses, rpc.CommitmentFinalized, &epoch, nil, ctx, rpc.CommitmentConfirmed, c.inflationRewardAddresses, &epoch, nil,
) )
if err != nil { if err != nil {
return fmt.Errorf("error fetching inflation rewards: %w", err) return fmt.Errorf("error fetching inflation rewards: %w", err)
@ -287,8 +375,8 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in
for i, rewardInfo := range rewardInfos { for i, rewardInfo := range rewardInfos {
address := c.inflationRewardAddresses[i] address := c.inflationRewardAddresses[i]
reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol) reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
inflationRewards.WithLabelValues(address, epochStr).Set(reward) inflationRewards.WithLabelValues(address, toString(epoch)).Set(reward)
} }
klog.Infof("Fetched inflation reward for epoch %v.", epochStr) klog.Infof("Fetched inflation reward for epoch %v.", epoch)
return nil return nil
} }

View File

@ -92,7 +92,9 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
leaderSlotsTotal.Reset() leaderSlotsTotal.Reset()
leaderSlotsByEpoch.Reset() leaderSlotsByEpoch.Reset()
collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys) collector := createSolanaCollector(
&staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys, identities,
)
watcher := NewCollectorSlotWatcher(collector) watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector) prometheus.NewPedanticRegistry().MustRegister(collector)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -161,7 +163,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// create clients: // create clients:
client := newDynamicRPCClient() client := newDynamicRPCClient()
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys) collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys, identities)
watcher := NewCollectorSlotWatcher(collector) watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector) prometheus.NewPedanticRegistry().MustRegister(collector)

View File

@ -1,6 +1,9 @@
package main package main
import ( import (
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -9,3 +12,51 @@ func assertf(condition bool, format string, args ...any) {
klog.Fatalf(format, args...) klog.Fatalf(format, args...)
} }
} }
// toString is just a simple utility function for converting int -> string
func toString(i int64) string {
return fmt.Sprintf("%v", i)
}
// SelectFromSchedule takes a leader-schedule and returns a trimmed leader-schedule
// containing only the slots within the provided range
func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 {
selected := make(map[string][]int64)
for key, values := range schedule {
var selectedValues []int64
for _, value := range values {
if value >= startSlot && value <= endSlot {
selectedValues = append(selectedValues, value)
}
}
selected[key] = selectedValues
}
return selected
}
// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in.
// Additionally, it adjusts the leader schedule to the current epoch offset.
func GetTrimmedLeaderSchedule(
ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64,
) (map[string][]int64, error) {
leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return nil, fmt.Errorf("failed to get leader schedule: %w", err)
}
trimmedLeaderSchedule := make(map[string][]int64)
for _, id := range identities {
if leaderSlots, ok := leaderSchedule[id]; ok {
// when you fetch the leader schedule, it gives you slot indexes, we want absolute slots:
absoluteSlots := make([]int64, len(leaderSlots))
for i, slotIndex := range leaderSlots {
absoluteSlots[i] = slotIndex + epochFirstSlot
}
trimmedLeaderSchedule[id] = absoluteSlots
} else {
klog.Warningf("failed to find leader slots for %v", id)
}
}
return trimmedLeaderSchedule, nil
}

View File

@ -0,0 +1,24 @@
package main
import (
"context"
"github.com/stretchr/testify/assert"
"testing"
)
func TestSelectFromSchedule(t *testing.T) {
selected := SelectFromSchedule(staticLeaderSchedule, 5, 10)
assert.Equal(t,
map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}},
selected,
)
}
func TestGetTrimmedLeaderSchedule(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10)
assert.NoError(t, err)
assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule)
}

View File

@ -39,7 +39,7 @@ type Provider interface {
// The method takes a context for cancellation, and pointers to the first and last slots of the range. // The method takes a context for cancellation, and pointers to the first and last slots of the range.
// It returns a BlockProduction struct containing the block production details, or an error if the operation fails. // It returns a BlockProduction struct containing the block production details, or an error if the operation fails.
GetBlockProduction( GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*BlockProduction, error) ) (*BlockProduction, error)
// GetEpochInfo retrieves the information regarding the current epoch. // GetEpochInfo retrieves the information regarding the current epoch.
@ -50,7 +50,7 @@ type Provider interface {
// GetSlot retrieves the current slot number. // GetSlot retrieves the current slot number.
// The method takes a context for cancellation. // The method takes a context for cancellation.
// It returns the current slot number as an int64, or an error if the operation fails. // It returns the current slot number as an int64, or an error if the operation fails.
GetSlot(ctx context.Context) (int64, error) GetSlot(ctx context.Context, commitment Commitment) (int64, error)
// GetVoteAccounts retrieves the vote accounts information. // GetVoteAccounts retrieves the vote accounts information.
// The method takes a context for cancellation and a slice of parameters to filter the vote accounts. // The method takes a context for cancellation and a slice of parameters to filter the vote accounts.
@ -64,13 +64,17 @@ type Provider interface {
GetVersion(ctx context.Context) (string, error) GetVersion(ctx context.Context) (string, error)
// GetBalance returns the SOL balance of the account at the provided address // GetBalance returns the SOL balance of the account at the provided address
GetBalance(ctx context.Context, address string) (float64, error) GetBalance(ctx context.Context, commitment Commitment, address string) (float64, error)
// GetInflationReward returns the inflation rewards (in lamports) awarded to the given addresses (vote accounts) // GetInflationReward returns the inflation rewards (in lamports) awarded to the given addresses (vote accounts)
// during the given epoch. // during the given epoch.
GetInflationReward( GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64, ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error) ) ([]InflationReward, error)
GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error)
GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error)
} }
func (c Commitment) MarshalJSON() ([]byte, error) { func (c Commitment) MarshalJSON() ([]byte, error) {
@ -138,6 +142,8 @@ func (c *Client) getResponse(ctx context.Context, method string, params []any, r
return nil return nil
} }
// GetEpochInfo returns information about the current epoch.
// See API docs: https://solana.com/docs/rpc/http/getepochinfo
func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*EpochInfo, error) { func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*EpochInfo, error) {
var resp response[EpochInfo] var resp response[EpochInfo]
if err := c.getResponse(ctx, "getEpochInfo", []any{commitment}, &resp); err != nil { if err := c.getResponse(ctx, "getEpochInfo", []any{commitment}, &resp); err != nil {
@ -146,6 +152,8 @@ func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*Epoc
return &resp.Result, nil return &resp.Result, nil
} }
// GetVoteAccounts returns the account info and associated stake for all the voting accounts in the current bank.
// See API docs: https://solana.com/docs/rpc/http/getvoteaccounts
func (c *Client) GetVoteAccounts( func (c *Client) GetVoteAccounts(
ctx context.Context, commitment Commitment, votePubkey *string, ctx context.Context, commitment Commitment, votePubkey *string,
) (*VoteAccounts, error) { ) (*VoteAccounts, error) {
@ -162,6 +170,8 @@ func (c *Client) GetVoteAccounts(
return &resp.Result, nil return &resp.Result, nil
} }
// GetVersion returns the current Solana version running on the node.
// See API docs: https://solana.com/docs/rpc/http/getversion
func (c *Client) GetVersion(ctx context.Context) (string, error) { func (c *Client) GetVersion(ctx context.Context) (string, error) {
var resp response[struct { var resp response[struct {
Version string `json:"solana-core"` Version string `json:"solana-core"`
@ -172,16 +182,21 @@ func (c *Client) GetVersion(ctx context.Context) (string, error) {
return resp.Result.Version, nil return resp.Result.Version, nil
} }
func (c *Client) GetSlot(ctx context.Context) (int64, error) { // GetSlot returns the slot that has reached the given or default commitment level.
// See API docs: https://solana.com/docs/rpc/http/getslot
func (c *Client) GetSlot(ctx context.Context, commitment Commitment) (int64, error) {
config := map[string]string{"commitment": string(commitment)}
var resp response[int64] var resp response[int64]
if err := c.getResponse(ctx, "getSlot", []any{}, &resp); err != nil { if err := c.getResponse(ctx, "getSlot", []any{config}, &resp); err != nil {
return 0, err return 0, err
} }
return resp.Result, nil return resp.Result, nil
} }
// GetBlockProduction returns recent block production information from the current or previous epoch.
// See API docs: https://solana.com/docs/rpc/http/getblockproduction
func (c *Client) GetBlockProduction( func (c *Client) GetBlockProduction(
ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*BlockProduction, error) { ) (*BlockProduction, error) {
// can't provide a last slot without a first: // can't provide a last slot without a first:
if firstSlot == nil && lastSlot != nil { if firstSlot == nil && lastSlot != nil {
@ -189,7 +204,7 @@ func (c *Client) GetBlockProduction(
} }
// format params: // format params:
config := make(map[string]any) config := map[string]any{"commitment": string(commitment)}
if identity != nil { if identity != nil {
config["identity"] = *identity config["identity"] = *identity
} }
@ -206,29 +221,29 @@ func (c *Client) GetBlockProduction(
config["range"] = blockRange config["range"] = blockRange
} }
var params []any
if len(config) > 0 {
params = append(params, config)
}
// make request: // make request:
var resp response[contextualResult[BlockProduction]] var resp response[contextualResult[BlockProduction]]
if err := c.getResponse(ctx, "getBlockProduction", params, &resp); err != nil { if err := c.getResponse(ctx, "getBlockProduction", []any{config}, &resp); err != nil {
return nil, err return nil, err
} }
return &resp.Result.Value, nil return &resp.Result.Value, nil
} }
func (c *Client) GetBalance(ctx context.Context, address string) (float64, error) { // GetBalance returns the lamport balance of the account of provided pubkey.
// See API docs:https://solana.com/docs/rpc/http/getbalance
func (c *Client) GetBalance(ctx context.Context, commitment Commitment, address string) (float64, error) {
config := map[string]string{"commitment": string(commitment)}
var resp response[contextualResult[int64]] var resp response[contextualResult[int64]]
if err := c.getResponse(ctx, "getBalance", []any{address}, &resp); err != nil { if err := c.getResponse(ctx, "getBalance", []any{address, config}, &resp); err != nil {
return 0, err return 0, err
} }
return float64(resp.Result.Value) / float64(LamportsInSol), nil return float64(resp.Result.Value) / float64(LamportsInSol), nil
} }
// GetInflationReward returns the inflation / staking reward for a list of addresses for an epoch.
// See API docs: https://solana.com/docs/rpc/http/getinflationreward
func (c *Client) GetInflationReward( func (c *Client) GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64, ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error) { ) ([]InflationReward, error) {
// format params: // format params:
config := map[string]any{"commitment": string(commitment)} config := map[string]any{"commitment": string(commitment)}
@ -245,3 +260,33 @@ func (c *Client) GetInflationReward(
} }
return resp.Result, nil return resp.Result, nil
} }
// GetLeaderSchedule returns the leader schedule for an epoch.
// See API docs: https://solana.com/docs/rpc/http/getleaderschedule
func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error) {
config := map[string]any{"commitment": string(commitment)}
var resp response[map[string][]int64]
if err := c.getResponse(ctx, "getLeaderSchedule", []any{slot, config}, &resp); err != nil {
return nil, err
}
return resp.Result, nil
}
// GetBlock returns identity and transaction information about a confirmed block in the ledger.
// See API docs: https://solana.com/docs/rpc/http/getblock
func (c *Client) GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error) {
if commitment == CommitmentProcessed {
klog.Fatalf("commitment %v is not supported for GetBlock", commitment)
}
config := map[string]any{
"commitment": commitment,
"encoding": "json", // this is default, but no harm in specifying it
"transactionDetails": "none", // for now, can hard-code this out, as we don't need it
"rewards": true, // what we here for!
}
var resp response[Block]
if err := c.getResponse(ctx, "getBlock", []any{slot, config}, &resp); err != nil {
return nil, err
}
return &resp.Result, nil
}

View File

@ -72,6 +72,23 @@ type (
Epoch int64 `json:"epoch"` Epoch int64 `json:"epoch"`
PostBalance int64 `json:"postBalance"` PostBalance int64 `json:"postBalance"`
} }
Block struct {
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime,omitempty"`
Blockhash string `json:"blockhash"`
ParentSlot int64 `json:"parentSlot"`
PreviousBlockhash string `json:"previousBlockhash"`
Rewards []BlockReward `json:"rewards"`
}
BlockReward struct {
Pubkey string `json:"pubkey"`
Lamports int64 `json:"lamports"`
PostBalance int64 `json:"postBalance"`
RewardType string `json:"rewardType"`
Commission uint8 `json:"commission"`
}
) )
func (hp *HostProduction) UnmarshalJSON(data []byte) error { func (hp *HostProduction) UnmarshalJSON(data []byte) error {