added fee rewards

Matt Johnstone 2024-10-08 16:33:41 +02:00
parent 328074ea6f
commit c65c347504
No known key found for this signature in database
GPG Key ID: BE985FBB9BE7D3BB
5 changed files with 179 additions and 58 deletions

View File

@@ -276,15 +276,20 @@ func main() {
)
if *balanceAddresses != "" {
balAddresses = strings.Split(*balanceAddresses, ",")
klog.Infof("Monitoring balances for %v", balAddresses)
}
if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",")
klog.Infof("Monitoring leader-slot by epoch for %v", lsAddresses)
}
if *inflationRewardAddresses != "" {
irAddresses = strings.Split(*inflationRewardAddresses, ",")
klog.Infof("Monitoring inflation reward by epoch for %v", irAddresses)
}
if *feeRewardAddresses != "" {
frAddresses = strings.Split(*feeRewardAddresses, ",")
klog.Infof("Monitoring fee reward by epoch for %v", frAddresses)
}
collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses)

View File

@@ -47,7 +47,7 @@ var (
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities)
staticEpochInfo = rpc.EpochInfo{
AbsoluteSlot: 166598,
AbsoluteSlot: 166599,
BlockHeight: 166500,
Epoch: 27,
SlotIndex: 2790,
@@ -70,12 +70,9 @@ var (
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
{
ActivatedStake: 42,
Commission: 0,
EpochCredits: [][]int{
{1, 64, 0},
{2, 192, 64},
},
ActivatedStake: 42,
Commission: 0,
EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}},
EpochVoteAccount: true,
LastVote: 147,
NodePubkey: "bbb",
@@ -83,12 +80,9 @@ var (
VotePubkey: "BBB",
},
{
ActivatedStake: 43,
Commission: 1,
EpochCredits: [][]int{
{2, 65, 1},
{3, 193, 65},
},
ActivatedStake: 43,
Commission: 1,
EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}},
EpochVoteAccount: true,
LastVote: 148,
NodePubkey: "ccc",
@@ -98,12 +92,9 @@ var (
},
Delinquent: []rpc.VoteAccount{
{
ActivatedStake: 49,
Commission: 2,
EpochCredits: [][]int{
{10, 594, 6},
{9, 98, 4},
},
ActivatedStake: 49,
Commission: 2,
EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}},
EpochVoteAccount: true,
LastVote: 92,
NodePubkey: "aaa",
@@ -112,6 +103,9 @@ var (
},
},
}
staticLeaderSchedule = map[string][]int64{
"aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14},
}
)
/*
@@ -164,7 +158,7 @@ func (c *staticRPCClient) GetInflationReward(
func (c *staticRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return nil, nil
return staticLeaderSchedule, nil
}
//goland:noinspection GoUnusedParameter

View File

@@ -130,15 +130,17 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
<-ticker.C
ctx_, cancel := context.WithTimeout(ctx, httpTimeout)
epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentConfirmed)
if err != nil {
klog.Warningf("Failed to get epoch info, bailing out: %v", err)
}
// TODO: separate fee-rewards watching from general slot watching, such that general slot watching commitment level can be dropped to confirmed
epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized)
cancel()
if err != nil {
klog.Errorf("Failed to get epoch info, bailing out: %v", err)
continue
}
// if we are running for the first time, then we need to set our tracking numbers:
if c.currentEpoch == 0 {
c.trackEpoch(epochInfo)
c.trackEpoch(ctx, epochInfo)
}
totalTransactionsTotal.Set(float64(epochInfo.TransactionCount))
@@ -163,14 +165,15 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
}
// update block production metrics up until the current slot:
c.fetchAndEmitBlockProduction(ctx, epochInfo.AbsoluteSlot)
c.moveSlotWatermark(ctx, epochInfo.AbsoluteSlot)
}
}
}
// trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly,
// and updates the prometheus gauges associated with those metrics.
func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) {
func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
firstSlot, lastSlot := getEpochBounds(epoch)
// if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup,
// and so we can simply store the tracking numbers
@@ -207,16 +210,29 @@ func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) {
}
// emit epoch bounds:
klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot)
currentEpochNumber.Set(float64(c.currentEpoch))
epochFirstSlot.Set(float64(c.firstSlot))
epochLastSlot.Set(float64(c.lastSlot))
// update leader schedule:
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch)
leaderSchedule, err := GetTrimmedLeaderSchedule(
ctx, c.client, c.feeRewardAddresses, epoch.AbsoluteSlot, c.firstSlot,
)
if err != nil {
klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err)
}
c.leaderSchedule = leaderSchedule
}
// closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last
// remaining slots in the "current" epoch before we start tracking the new one.
func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) {
c.fetchAndEmitBlockProduction(ctx, c.lastSlot)
c.trackEpoch(newEpoch)
c.moveSlotWatermark(ctx, c.lastSlot)
c.trackEpoch(ctx, newEpoch)
}
// checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking.
@@ -234,6 +250,13 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error {
return nil
}
// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot.
func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) {
c.fetchAndEmitBlockProduction(ctx, to)
c.fetchAndEmitFeeRewards(ctx, to)
c.slotWatermark = to
}
// fetchAndEmitBlockProduction fetches block production up to the provided endSlot and emits the prometheus metrics;
// the slot watermark itself is now advanced separately by moveSlotWatermark.
func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) {
@@ -249,9 +272,10 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
// fetch block production:
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentConfirmed, nil, &startSlot, &endSlot)
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot)
if err != nil {
klog.Warningf("Failed to get block production, bailing out: %v", err)
klog.Errorf("Failed to get block production, bailing out: %v", err)
return
}
// emit the metrics:
@@ -270,8 +294,61 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
}
klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
// update the slot watermark:
c.slotWatermark = endSlot
}
// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the
// slotWatermark and endSlot
func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) {
startSlot := c.slotWatermark + 1
klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)
if err := c.checkValidSlotRange(startSlot, endSlot); err != nil {
klog.Fatalf("invalid slot range: %v", err)
}
scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot)
for identity, leaderSlots := range scheduleToFetch {
if len(leaderSlots) == 0 {
continue
}
klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
for _, slot := range leaderSlots {
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot)
cancel()
if err != nil {
klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
}
}
}
klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
}
// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block.
func (c *SlotWatcher) fetchAndEmitSingleFeeReward(
ctx context.Context, identity string, epoch int64, slot int64,
) error {
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return err
}
for _, reward := range block.Rewards {
if reward.RewardType == "fee" {
// sanity check that the reward actually belongs to the identity whose leader slot this is:
assertf(
reward.Pubkey == identity,
"fetching fee reward for %v but got fee reward for %v",
identity,
reward.Pubkey,
)
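// note: rewards are denominated in lamports; dividing by rpc.LamportsInSol (1 SOL = 1e9 lamports)
// converts e.g. a 5_000_000-lamport fee reward into 0.005 SOL before it is added to the feeRewards metric.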
amount := float64(reward.Lamports) / float64(rpc.LamportsInSol)
feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount)
}
}
return nil
}
// getEpochBounds returns the first slot and last slot within an [inclusive] Epoch
@@ -303,28 +380,3 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in
klog.Infof("Fetched inflation reward for epoch %v.", epoch)
return nil
}
func (c *SlotWatcher) fetchAndEmitFeeReward(
ctx context.Context, identity string, epoch int64, slot int64,
) error {
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return err
}
for _, reward := range block.Rewards {
if reward.RewardType == "fee" {
// make sure we haven't made a logic issue or something:
assertf(
reward.Pubkey == identity,
"fetching fee reward for %v but got fee reward for %v",
identity,
reward.Pubkey,
)
amount := float64(reward.Lamports) / float64(rpc.LamportsInSol)
feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount)
}
}
return nil
}

View File

@@ -1,7 +1,9 @@
package main
import (
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"k8s.io/klog/v2"
)
@@ -11,6 +13,50 @@ func assertf(condition bool, format string, args ...any) {
}
}
// toString is a small utility function for converting int64 -> string
func toString(i int64) string {
return fmt.Sprintf("%v", i)
}
// SelectFromSchedule takes a leader-schedule and returns a trimmed leader-schedule
// containing only the slots within the provided range
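// (for illustration: selecting the inclusive range [5, 10] from {"aaa": {0, 3, 6, 9, 12}} yields {"aaa": {6, 9}})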
func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 {
selected := make(map[string][]int64)
for key, values := range schedule {
var selectedValues []int64
for _, value := range values {
if value >= startSlot && value <= endSlot {
selectedValues = append(selectedValues, value)
}
}
selected[key] = selectedValues
}
return selected
}
// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in.
// Additionally, it converts the schedule's slot indexes into absolute slots using the epoch's first slot.
func GetTrimmedLeaderSchedule(
ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64,
) (map[string][]int64, error) {
leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return nil, fmt.Errorf("failed to get leader schedule: %w", err)
}
trimmedLeaderSchedule := make(map[string][]int64)
for _, id := range identities {
if leaderSlots, ok := leaderSchedule[id]; ok {
// the fetched leader schedule contains slot indexes relative to the epoch; convert them to absolute slots:
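// (illustrative numbers: slot index 3 in an epoch whose first slot is 432000 becomes absolute slot 432003)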
absoluteSlots := make([]int64, len(leaderSlots))
for i, slotIndex := range leaderSlots {
absoluteSlots[i] = slotIndex + epochFirstSlot
}
trimmedLeaderSchedule[id] = absoluteSlots
} else {
klog.Warningf("failed to find leader slots for %v", id)
}
}
return trimmedLeaderSchedule, nil
}

View File

@@ -0,0 +1,24 @@
package main
import (
"context"
"github.com/stretchr/testify/assert"
"testing"
)
func TestSelectFromSchedule(t *testing.T) {
selected := SelectFromSchedule(staticLeaderSchedule, 5, 10)
assert.Equal(t,
map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}},
selected,
)
}
func TestGetTrimmedLeaderSchedule(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10)
assert.NoError(t, err)
assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule)
}