From c65c347504706703f9e0ba1071b417033d3fa725 Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Tue, 8 Oct 2024 16:33:41 +0200 Subject: [PATCH] added fee rewards --- cmd/solana_exporter/exporter.go | 5 ++ cmd/solana_exporter/exporter_test.go | 34 +++---- cmd/solana_exporter/slots.go | 128 +++++++++++++++++++-------- cmd/solana_exporter/utils.go | 46 ++++++++++ cmd/solana_exporter/utils_test.go | 24 +++++ 5 files changed, 179 insertions(+), 58 deletions(-) create mode 100644 cmd/solana_exporter/utils_test.go diff --git a/cmd/solana_exporter/exporter.go b/cmd/solana_exporter/exporter.go index d31d40f..1d2325d 100644 --- a/cmd/solana_exporter/exporter.go +++ b/cmd/solana_exporter/exporter.go @@ -276,15 +276,20 @@ func main() { ) if *balanceAddresses != "" { balAddresses = strings.Split(*balanceAddresses, ",") + klog.Infof("Monitoring balances for %v", balAddresses) } if *leaderSlotAddresses != "" { lsAddresses = strings.Split(*leaderSlotAddresses, ",") + klog.Infof("Monitoring leader-slot by epoch for %v", lsAddresses) + } if *inflationRewardAddresses != "" { irAddresses = strings.Split(*inflationRewardAddresses, ",") + klog.Infof("Monitoring inflation reward by epoch for %v", irAddresses) } if *feeRewardAddresses != "" { frAddresses = strings.Split(*feeRewardAddresses, ",") + klog.Infof("Monitoring fee reward by epoch for %v", frAddresses) } collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses) diff --git a/cmd/solana_exporter/exporter_test.go b/cmd/solana_exporter/exporter_test.go index 8e3defa..455869e 100644 --- a/cmd/solana_exporter/exporter_test.go +++ b/cmd/solana_exporter/exporter_test.go @@ -47,7 +47,7 @@ var ( identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"} nv = len(identities) staticEpochInfo = rpc.EpochInfo{ - AbsoluteSlot: 166598, + AbsoluteSlot: 166599, BlockHeight: 166500, Epoch: 27, SlotIndex: 2790, @@ -70,12 +70,9 @@ var ( staticVoteAccounts = rpc.VoteAccounts{ Current: []rpc.VoteAccount{ { - ActivatedStake: 42, - Commission: 0, - EpochCredits: [][]int{ - {1, 64, 0}, - {2, 192, 64}, - }, + ActivatedStake: 42, + Commission: 0, + EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}}, EpochVoteAccount: true, LastVote: 147, NodePubkey: "bbb", @@ -83,12 +80,9 @@ var ( VotePubkey: "BBB", }, { - ActivatedStake: 43, - Commission: 1, - EpochCredits: [][]int{ - {2, 65, 1}, - {3, 193, 65}, - }, + ActivatedStake: 43, + Commission: 1, + EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}}, EpochVoteAccount: true, LastVote: 148, NodePubkey: "ccc", @@ -98,12 +92,9 @@ var ( }, Delinquent: []rpc.VoteAccount{ { - ActivatedStake: 49, - Commission: 2, - EpochCredits: [][]int{ - {10, 594, 6}, - {9, 98, 4}, - }, + ActivatedStake: 49, + Commission: 2, + EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}}, EpochVoteAccount: true, LastVote: 92, NodePubkey: "aaa", @@ -112,6 +103,9 @@ var ( }, }, } + staticLeaderSchedule = map[string][]int64{ + "aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14}, + } ) /* @@ -164,7 +158,7 @@ func (c *staticRPCClient) GetInflationReward( func (c *staticRPCClient) GetLeaderSchedule( ctx context.Context, commitment rpc.Commitment, slot int64, ) (map[string][]int64, error) { - return nil, nil + return staticLeaderSchedule, nil } //goland:noinspection GoUnusedParameter diff --git a/cmd/solana_exporter/slots.go b/cmd/solana_exporter/slots.go index d37b0d8..5f88c36 100644 --- a/cmd/solana_exporter/slots.go +++ b/cmd/solana_exporter/slots.go @@ -130,15 +130,17 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { <-ticker.C ctx_, cancel := context.WithTimeout(ctx, httpTimeout) - epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentConfirmed) - if err != nil { - klog.Warningf("Failed to get epoch info, bailing out: %v", err) - } + // TODO: separate fee-rewards watching from general slot watching, such that general slot watching commitment level can be dropped to confirmed + epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized) cancel() + if err != nil { + klog.Errorf("Failed to get epoch info, bailing out: %v", err) + continue + } // if we are running for the first time, then we need to set our tracking numbers: if c.currentEpoch == 0 { - c.trackEpoch(epochInfo) + c.trackEpoch(ctx, epochInfo) } totalTransactionsTotal.Set(float64(epochInfo.TransactionCount)) @@ -163,14 +165,15 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { } // update block production metrics up until the current slot: - c.fetchAndEmitBlockProduction(ctx, epochInfo.AbsoluteSlot) + c.moveSlotWatermark(ctx, epochInfo.AbsoluteSlot) } } } // trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly, // and updates the prometheus gauges associated with those metrics. -func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) { +func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) { + klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch) firstSlot, lastSlot := getEpochBounds(epoch) // if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup, // and so we can simply store the tracking numbers @@ -207,16 +210,29 @@ func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) { } // emit epoch bounds: + klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot) currentEpochNumber.Set(float64(c.currentEpoch)) epochFirstSlot.Set(float64(c.firstSlot)) epochLastSlot.Set(float64(c.lastSlot)) + + // update leader schedule: + ctx, cancel := context.WithTimeout(ctx, httpTimeout) + defer cancel() + klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch) + leaderSchedule, err := GetTrimmedLeaderSchedule( + ctx, c.client, c.feeRewardAddresses, epoch.AbsoluteSlot, c.firstSlot, + ) + if err != nil { + klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err) + } + c.leaderSchedule = leaderSchedule } // closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last // remaining slots in the "current" epoch before we start tracking the new one. func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) { - c.fetchAndEmitBlockProduction(ctx, c.lastSlot) - c.trackEpoch(newEpoch) + c.moveSlotWatermark(ctx, c.lastSlot) + c.trackEpoch(ctx, newEpoch) } // checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking. @@ -234,6 +250,13 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error { return nil } +// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot. +func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) { + c.fetchAndEmitBlockProduction(ctx, to) + c.fetchAndEmitFeeRewards(ctx, to) + c.slotWatermark = to +} + // fetchAndEmitBlockProduction fetches block production up to the provided endSlot, emits the prometheus metrics, // and updates the SlotWatcher.slotWatermark accordingly func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) { @@ -249,9 +272,10 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i // fetch block production: ctx, cancel := context.WithTimeout(ctx, httpTimeout) defer cancel() - blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentConfirmed, nil, &startSlot, &endSlot) + blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot) if err != nil { - klog.Warningf("Failed to get block production, bailing out: %v", err) + klog.Errorf("Failed to get block production, bailing out: %v", err) + return } // emit the metrics: @@ -270,8 +294,61 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i } klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot) - // update the slot watermark: - c.slotWatermark = endSlot +} + +// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the +// slotWatermark and endSlot +func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) { + startSlot := c.slotWatermark + 1 + klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot) + + if err := c.checkValidSlotRange(startSlot, endSlot); err != nil { + klog.Fatalf("invalid slot range: %v", err) + } + scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot) + for identity, leaderSlots := range scheduleToFetch { + if len(leaderSlots) == 0 { + continue + } + + klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots) + for _, slot := range leaderSlots { + ctx, cancel := context.WithTimeout(ctx, httpTimeout) + err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot) + cancel() + if err != nil { + klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err) + } + } + } + + klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot) +} + +// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block. +func (c *SlotWatcher) fetchAndEmitSingleFeeReward( + ctx context.Context, identity string, epoch int64, slot int64, +) error { + block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot) + if err != nil { + return err + } + + for _, reward := range block.Rewards { + if reward.RewardType == "fee" { + // make sure we haven't made a logic issue or something: + assertf( + reward.Pubkey == identity, + "fetching fee reward for %v but got fee reward for %v", + identity, + reward.Pubkey, + ) + amount := float64(reward.Lamports) / float64(rpc.LamportsInSol) + feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount) + } + } + + return nil } // getEpochBounds returns the first slot and last slot within an [inclusive] Epoch @@ -303,28 +380,3 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in klog.Infof("Fetched inflation reward for epoch %v.", epoch) return nil } - -func (c *SlotWatcher) fetchAndEmitFeeReward( - ctx context.Context, identity string, epoch int64, slot int64, -) error { - block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot) - if err != nil { - return err - } - - for _, reward := range block.Rewards { - if reward.RewardType == "fee" { - // make sure we haven't made a logic issue or something: - assertf( - reward.Pubkey == identity, - "fetching fee reward for %v but got fee reward for %v", - identity, - reward.Pubkey, - ) - amount := float64(reward.Lamports) / float64(rpc.LamportsInSol) - feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount) - } - } - - return nil -} diff --git a/cmd/solana_exporter/utils.go b/cmd/solana_exporter/utils.go index 5c9f5e9..aaf06b9 100644 --- a/cmd/solana_exporter/utils.go +++ b/cmd/solana_exporter/utils.go @@ -1,7 +1,9 @@ package main import ( + "context" "fmt" + "github.com/asymmetric-research/solana_exporter/pkg/rpc" "k8s.io/klog/v2" ) @@ -11,6 +13,50 @@ func assertf(condition bool, format string, args ...any) { } } +// toString is just a simple utility function for converting int -> string func toString(i int64) string { return fmt.Sprintf("%v", i) } + +// SelectFromSchedule takes a leader-schedule and returns a trimmed leader-schedule +// containing only the slots within the provided range +func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 { + selected := make(map[string][]int64) + for key, values := range schedule { + var selectedValues []int64 + for _, value := range values { + if value >= startSlot && value <= endSlot { + selectedValues = append(selectedValues, value) + } + } + selected[key] = selectedValues + } + return selected +} + +// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in. +// Additionally, it adjusts the leader schedule to the current epoch offset. +func GetTrimmedLeaderSchedule( + ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64, +) (map[string][]int64, error) { + leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot) + if err != nil { + return nil, fmt.Errorf("failed to get leader schedule: %w", err) + } + + trimmedLeaderSchedule := make(map[string][]int64) + for _, id := range identities { + if leaderSlots, ok := leaderSchedule[id]; ok { + // when you fetch the leader schedule, it gives you slot indexes, we want absolute slots: + absoluteSlots := make([]int64, len(leaderSlots)) + for i, slotIndex := range leaderSlots { + absoluteSlots[i] = slotIndex + epochFirstSlot + } + trimmedLeaderSchedule[id] = absoluteSlots + } else { + klog.Warningf("failed to find leader slots for %v", id) + } + } + + return trimmedLeaderSchedule, nil +} diff --git a/cmd/solana_exporter/utils_test.go b/cmd/solana_exporter/utils_test.go new file mode 100644 index 0000000..5925ae6 --- /dev/null +++ b/cmd/solana_exporter/utils_test.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSelectFromSchedule(t *testing.T) { + selected := SelectFromSchedule(staticLeaderSchedule, 5, 10) + assert.Equal(t, + map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}}, + selected, + ) +} + +func TestGetTrimmedLeaderSchedule(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10) + assert.NoError(t, err) + + assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule) +}