From b3d33313b08969c4f91cee10a2851189bb47102e Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Mon, 7 Oct 2024 14:06:46 +0200 Subject: [PATCH 1/5] added GetLeaderSchedule + rpc documentation linking to offical methods --- pkg/rpc/client.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index d6adaa0..7a980e5 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -138,6 +138,8 @@ func (c *Client) getResponse(ctx context.Context, method string, params []any, r return nil } +// GetEpochInfo returns information about the current epoch. +// See API docs: https://solana.com/docs/rpc/http/getepochinfo func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*EpochInfo, error) { var resp response[EpochInfo] if err := c.getResponse(ctx, "getEpochInfo", []any{commitment}, &resp); err != nil { @@ -146,6 +148,8 @@ func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*Epoc return &resp.Result, nil } +// GetVoteAccounts returns the account info and associated stake for all the voting accounts in the current bank. +// See API docs: https://solana.com/docs/rpc/http/getvoteaccounts func (c *Client) GetVoteAccounts( ctx context.Context, commitment Commitment, votePubkey *string, ) (*VoteAccounts, error) { @@ -162,6 +166,8 @@ func (c *Client) GetVoteAccounts( return &resp.Result, nil } +// GetVersion returns the current Solana version running on the node. +// See API docs: https://solana.com/docs/rpc/http/getversion func (c *Client) GetVersion(ctx context.Context) (string, error) { var resp response[struct { Version string `json:"solana-core"` @@ -172,6 +178,8 @@ func (c *Client) GetVersion(ctx context.Context) (string, error) { return resp.Result.Version, nil } +// GetSlot returns the slot that has reached the given or default commitment level. +// See API docs: https://solana.com/docs/rpc/http/getslot func (c *Client) GetSlot(ctx context.Context) (int64, error) { var resp response[int64] if err := c.getResponse(ctx, "getSlot", []any{}, &resp); err != nil { @@ -180,6 +188,8 @@ func (c *Client) GetSlot(ctx context.Context) (int64, error) { return resp.Result, nil } +// GetBlockProduction returns recent block production information from the current or previous epoch. +// See API docs: https://solana.com/docs/rpc/http/getblockproduction func (c *Client) GetBlockProduction( ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, ) (*BlockProduction, error) { @@ -219,6 +229,8 @@ func (c *Client) GetBlockProduction( return &resp.Result.Value, nil } +// GetBalance returns the lamport balance of the account of provided pubkey. +// See API docs:https://solana.com/docs/rpc/http/getbalance func (c *Client) GetBalance(ctx context.Context, address string) (float64, error) { var resp response[contextualResult[int64]] if err := c.getResponse(ctx, "getBalance", []any{address}, &resp); err != nil { @@ -227,6 +239,8 @@ func (c *Client) GetBalance(ctx context.Context, address string) (float64, error return float64(resp.Result.Value) / float64(LamportsInSol), nil } +// GetInflationReward returns the inflation / staking reward for a list of addresses for an epoch. +// See API docs: https://solana.com/docs/rpc/http/getinflationreward func (c *Client) GetInflationReward( ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64, ) ([]InflationReward, error) { @@ -245,3 +259,16 @@ func (c *Client) GetInflationReward( } return resp.Result, nil } + +// GetLeaderSchedule returns the leader schedule for an epoch. +// See API docs: https://solana.com/docs/rpc/http/getleaderschedule +func (c *Client) GetLeaderSchedule( + ctx context.Context, commitment Commitment, +) (map[string][]int64, error) { + config := map[string]any{"commitment": string(commitment)} + var resp response[map[string][]int64] + if err := c.getResponse(ctx, "getLeaderSchedule", []any{nil, config}, &resp); err != nil { + return nil, err + } + return resp.Result, nil +} From 7db68af8187025080420fd69f0f98cb88b384319 Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Mon, 7 Oct 2024 14:49:37 +0200 Subject: [PATCH 2/5] added GetBlock --- pkg/rpc/client.go | 28 +++++++++++++++++++++++----- pkg/rpc/responses.go | 17 +++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 7a980e5..a3a0fad 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -180,9 +180,10 @@ func (c *Client) GetVersion(ctx context.Context) (string, error) { // GetSlot returns the slot that has reached the given or default commitment level. // See API docs: https://solana.com/docs/rpc/http/getslot -func (c *Client) GetSlot(ctx context.Context) (int64, error) { +func (c *Client) GetSlot(ctx context.Context, commitment Commitment) (int64, error) { + config := map[string]string{"commitment": string(commitment)} var resp response[int64] - if err := c.getResponse(ctx, "getSlot", []any{}, &resp); err != nil { + if err := c.getResponse(ctx, "getSlot", []any{config}, &resp); err != nil { return 0, err } return resp.Result, nil @@ -262,9 +263,7 @@ func (c *Client) GetInflationReward( // GetLeaderSchedule returns the leader schedule for an epoch. // See API docs: https://solana.com/docs/rpc/http/getleaderschedule -func (c *Client) GetLeaderSchedule( - ctx context.Context, commitment Commitment, -) (map[string][]int64, error) { +func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment) (map[string][]int64, error) { config := map[string]any{"commitment": string(commitment)} var resp response[map[string][]int64] if err := c.getResponse(ctx, "getLeaderSchedule", []any{nil, config}, &resp); err != nil { @@ -272,3 +271,22 @@ func (c *Client) GetLeaderSchedule( } return resp.Result, nil } + +// GetBlock returns identity and transaction information about a confirmed block in the ledger. +// See API docs: https://solana.com/docs/rpc/http/getblock +func (c *Client) GetBlock(ctx context.Context, slot int64, commitment Commitment) (*Block, error) { + if commitment == CommitmentProcessed { + klog.Fatalf("commitment %v is not supported for GetBlock", commitment) + } + config := map[string]any{ + "commitment": commitment, + "encoding": "json", // this is default, but no harm in specifying it + "transactionDetails": "none", // for now, can hard-code this out, as we don't need it + "rewards": true, // what we here for! + } + var resp response[Block] + if err := c.getResponse(ctx, "getBlock", []any{slot, config}, &resp); err != nil { + return nil, err + } + return &resp.Result, nil +} diff --git a/pkg/rpc/responses.go b/pkg/rpc/responses.go index b8a76be..c88f682 100644 --- a/pkg/rpc/responses.go +++ b/pkg/rpc/responses.go @@ -72,6 +72,23 @@ type ( Epoch int64 `json:"epoch"` PostBalance int64 `json:"postBalance"` } + + Block struct { + BlockHeight int64 `json:"blockHeight"` + BlockTime int64 `json:"blockTime,omitempty"` + Blockhash string `json:"blockhash"` + ParentSlot int64 `json:"parentSlot"` + PreviousBlockhash string `json:"previousBlockhash"` + Rewards []BlockReward `json:"rewards"` + } + + BlockReward struct { + Pubkey string `json:"pubkey"` + Lamports int64 `json:"lamports"` + PostBalance int64 `json:"postBalance"` + RewardType string `json:"rewardType"` + Commission uint8 `json:"commission"` + } ) func (hp *HostProduction) UnmarshalJSON(data []byte) error { From 05411bc684635dc326de97cc5292a0510213f792 Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Mon, 7 Oct 2024 15:05:24 +0200 Subject: [PATCH 3/5] standardised commitments --- cmd/solana_exporter/exporter.go | 4 ++-- cmd/solana_exporter/exporter_test.go | 16 ++++++++-------- cmd/solana_exporter/slots.go | 6 +++--- pkg/rpc/client.go | 28 ++++++++++++---------------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/cmd/solana_exporter/exporter.go b/cmd/solana_exporter/exporter.go index 04e4aeb..d4ac8e9 100644 --- a/cmd/solana_exporter/exporter.go +++ b/cmd/solana_exporter/exporter.go @@ -138,7 +138,7 @@ func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) { } func (c *solanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- prometheus.Metric) { - voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentProcessed, votePubkey) + voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, votePubkey) if err != nil { ch <- prometheus.NewInvalidMetric(c.totalValidatorsDesc, err) ch <- prometheus.NewInvalidMetric(c.validatorActivatedStake, err) @@ -217,7 +217,7 @@ func (c *solanaCollector) collectBalances(ctx context.Context, ch chan<- prometh func fetchBalances(ctx context.Context, client rpc.Provider, addresses []string) (map[string]float64, error) { balances := make(map[string]float64) for _, address := range addresses { - balance, err := client.GetBalance(ctx, address) + balance, err := client.GetBalance(ctx, rpc.CommitmentConfirmed, address) if err != nil { return nil, err } diff --git a/cmd/solana_exporter/exporter_test.go b/cmd/solana_exporter/exporter_test.go index cdc773a..201bb82 100644 --- a/cmd/solana_exporter/exporter_test.go +++ b/cmd/solana_exporter/exporter_test.go @@ -124,7 +124,7 @@ func (c *staticRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Commi } //goland:noinspection GoUnusedParameter -func (c *staticRPCClient) GetSlot(ctx context.Context) (int64, error) { +func (c *staticRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) { return staticEpochInfo.AbsoluteSlot, nil } @@ -143,19 +143,19 @@ func (c *staticRPCClient) GetVoteAccounts( //goland:noinspection GoUnusedParameter func (c *staticRPCClient) GetBlockProduction( - ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, + ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64, ) (*rpc.BlockProduction, error) { return &staticBlockProduction, nil } //goland:noinspection GoUnusedParameter -func (c *staticRPCClient) GetBalance(ctx context.Context, address string) (float64, error) { +func (c *staticRPCClient) GetBalance(ctx context.Context, commitment rpc.Commitment, address string) (float64, error) { return balances[address], nil } //goland:noinspection GoUnusedParameter func (c *staticRPCClient) GetInflationReward( - ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64, + ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64, ) ([]rpc.InflationReward, error) { return staticInflationRewards, nil } @@ -271,7 +271,7 @@ func (c *dynamicRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Comm } //goland:noinspection GoUnusedParameter -func (c *dynamicRPCClient) GetSlot(ctx context.Context) (int64, error) { +func (c *dynamicRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) { return int64(c.Slot), nil } @@ -308,7 +308,7 @@ func (c *dynamicRPCClient) GetVoteAccounts( //goland:noinspection GoUnusedParameter func (c *dynamicRPCClient) GetBlockProduction( - ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, + ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64, ) (*rpc.BlockProduction, error) { byIdentity := make(map[string]rpc.HostProduction) for _, identity := range identities { @@ -330,13 +330,13 @@ func (c *dynamicRPCClient) GetBlockProduction( } //goland:noinspection GoUnusedParameter -func (c *dynamicRPCClient) GetBalance(ctx context.Context, address string) (float64, error) { +func (c *dynamicRPCClient) GetBalance(ctx context.Context, client rpc.Commitment, address string) (float64, error) { return balances[address], nil } //goland:noinspection GoUnusedParameter func (c *dynamicRPCClient) GetInflationReward( - ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64, + ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64, ) ([]rpc.InflationReward, error) { return staticInflationRewards, nil } diff --git a/cmd/solana_exporter/slots.go b/cmd/solana_exporter/slots.go index 2c52244..4272f84 100644 --- a/cmd/solana_exporter/slots.go +++ b/cmd/solana_exporter/slots.go @@ -117,7 +117,7 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { <-ticker.C ctx_, cancel := context.WithTimeout(ctx, httpTimeout) - epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized) + epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentConfirmed) if err != nil { klog.Warningf("Failed to get epoch info, bailing out: %v", err) } @@ -236,7 +236,7 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i // fetch block production: ctx, cancel := context.WithTimeout(ctx, httpTimeout) defer cancel() - blockProduction, err := c.client.GetBlockProduction(ctx, nil, &startSlot, &endSlot) + blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentConfirmed, nil, &startSlot, &endSlot) if err != nil { klog.Warningf("Failed to get block production, bailing out: %v", err) } @@ -278,7 +278,7 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in defer cancel() rewardInfos, err := c.client.GetInflationReward( - ctx, c.inflationRewardAddresses, rpc.CommitmentFinalized, &epoch, nil, + ctx, rpc.CommitmentConfirmed, c.inflationRewardAddresses, &epoch, nil, ) if err != nil { return err diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index a3a0fad..447f0fe 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -39,7 +39,7 @@ type Provider interface { // The method takes a context for cancellation, and pointers to the first and last slots of the range. // It returns a BlockProduction struct containing the block production details, or an error if the operation fails. GetBlockProduction( - ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, + ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64, ) (*BlockProduction, error) // GetEpochInfo retrieves the information regarding the current epoch. @@ -50,7 +50,7 @@ type Provider interface { // GetSlot retrieves the current slot number. // The method takes a context for cancellation. // It returns the current slot number as an int64, or an error if the operation fails. - GetSlot(ctx context.Context) (int64, error) + GetSlot(ctx context.Context, commitment Commitment) (int64, error) // GetVoteAccounts retrieves the vote accounts information. // The method takes a context for cancellation and a slice of parameters to filter the vote accounts. @@ -64,12 +64,12 @@ type Provider interface { GetVersion(ctx context.Context) (string, error) // GetBalance returns the SOL balance of the account at the provided address - GetBalance(ctx context.Context, address string) (float64, error) + GetBalance(ctx context.Context, commitment Commitment, address string) (float64, error) // GetInflationReward returns the inflation rewards (in lamports) awarded to the given addresses (vote accounts) // during the given epoch. GetInflationReward( - ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64, + ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64, ) ([]InflationReward, error) } @@ -192,7 +192,7 @@ func (c *Client) GetSlot(ctx context.Context, commitment Commitment) (int64, err // GetBlockProduction returns recent block production information from the current or previous epoch. // See API docs: https://solana.com/docs/rpc/http/getblockproduction func (c *Client) GetBlockProduction( - ctx context.Context, identity *string, firstSlot *int64, lastSlot *int64, + ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64, ) (*BlockProduction, error) { // can't provide a last slot without a first: if firstSlot == nil && lastSlot != nil { @@ -200,7 +200,7 @@ func (c *Client) GetBlockProduction( } // format params: - config := make(map[string]any) + config := map[string]any{"commitment": string(commitment)} if identity != nil { config["identity"] = *identity } @@ -217,14 +217,9 @@ func (c *Client) GetBlockProduction( config["range"] = blockRange } - var params []any - if len(config) > 0 { - params = append(params, config) - } - // make request: var resp response[contextualResult[BlockProduction]] - if err := c.getResponse(ctx, "getBlockProduction", params, &resp); err != nil { + if err := c.getResponse(ctx, "getBlockProduction", []any{config}, &resp); err != nil { return nil, err } return &resp.Result.Value, nil @@ -232,9 +227,10 @@ func (c *Client) GetBlockProduction( // GetBalance returns the lamport balance of the account of provided pubkey. // See API docs:https://solana.com/docs/rpc/http/getbalance -func (c *Client) GetBalance(ctx context.Context, address string) (float64, error) { +func (c *Client) GetBalance(ctx context.Context, commitment Commitment, address string) (float64, error) { + config := map[string]string{"commitment": string(commitment)} var resp response[contextualResult[int64]] - if err := c.getResponse(ctx, "getBalance", []any{address}, &resp); err != nil { + if err := c.getResponse(ctx, "getBalance", []any{address, config}, &resp); err != nil { return 0, err } return float64(resp.Result.Value) / float64(LamportsInSol), nil @@ -243,7 +239,7 @@ func (c *Client) GetBalance(ctx context.Context, address string) (float64, error // GetInflationReward returns the inflation / staking reward for a list of addresses for an epoch. // See API docs: https://solana.com/docs/rpc/http/getinflationreward func (c *Client) GetInflationReward( - ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64, + ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64, ) ([]InflationReward, error) { // format params: config := map[string]any{"commitment": string(commitment)} @@ -274,7 +270,7 @@ func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment) ( // GetBlock returns identity and transaction information about a confirmed block in the ledger. // See API docs: https://solana.com/docs/rpc/http/getblock -func (c *Client) GetBlock(ctx context.Context, slot int64, commitment Commitment) (*Block, error) { +func (c *Client) GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error) { if commitment == CommitmentProcessed { klog.Fatalf("commitment %v is not supported for GetBlock", commitment) } From 328074ea6f369360f0e71d2cacbcda05ec337913 Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Mon, 7 Oct 2024 16:55:24 +0200 Subject: [PATCH 4/5] working on fee rewards --- cmd/solana_exporter/exporter.go | 27 ++++++++++++++-- cmd/solana_exporter/exporter_test.go | 30 +++++++++++++++-- cmd/solana_exporter/slots.go | 48 ++++++++++++++++++++++++---- cmd/solana_exporter/slots_test.go | 6 ++-- cmd/solana_exporter/utils.go | 5 +++ pkg/rpc/client.go | 8 +++-- 6 files changed, 109 insertions(+), 15 deletions(-) diff --git a/cmd/solana_exporter/exporter.go b/cmd/solana_exporter/exporter.go index d4ac8e9..d31d40f 100644 --- a/cmd/solana_exporter/exporter.go +++ b/cmd/solana_exporter/exporter.go @@ -36,6 +36,11 @@ var ( "", "Comma-separated list of validator vote accounts to track inflationary rewards for", ) + feeRewardAddresses = flag.String( + "fee-reward-addresses", + "", + "Comma-separated list of validator identity accounts to track fee rewards for.", + ) ) func init() { @@ -50,6 +55,7 @@ type solanaCollector struct { balanceAddresses []string leaderSlotAddresses []string inflationRewardAddresses []string + feeRewardAddresses []string /// descriptors: totalValidatorsDesc *prometheus.Desc @@ -67,6 +73,7 @@ func createSolanaCollector( balanceAddresses []string, leaderSlotAddresses []string, inflationRewardAddresses []string, + feeRewardAddresses []string, ) *solanaCollector { return &solanaCollector{ rpcClient: provider, @@ -74,6 +81,7 @@ func createSolanaCollector( balanceAddresses: balanceAddresses, leaderSlotAddresses: leaderSlotAddresses, inflationRewardAddresses: inflationRewardAddresses, + feeRewardAddresses: feeRewardAddresses, totalValidatorsDesc: prometheus.NewDesc( "solana_active_validators", "Total number of active validators by state", @@ -120,10 +128,19 @@ func createSolanaCollector( } func NewSolanaCollector( - rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string, inflationRewardAddresses []string, + rpcAddr string, + balanceAddresses []string, + leaderSlotAddresses []string, + inflationRewardAddresses []string, + feeRewardAddresses []string, ) *solanaCollector { return createSolanaCollector( - rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses, inflationRewardAddresses, + rpc.NewRPCClient(rpcAddr), + slotPacerSchedule, + balanceAddresses, + leaderSlotAddresses, + inflationRewardAddresses, + feeRewardAddresses, ) } @@ -255,6 +272,7 @@ func main() { balAddresses []string lsAddresses []string irAddresses []string + frAddresses []string ) if *balanceAddresses != "" { balAddresses = strings.Split(*balanceAddresses, ",") @@ -265,8 +283,11 @@ func main() { if *inflationRewardAddresses != "" { irAddresses = strings.Split(*inflationRewardAddresses, ",") } + if *feeRewardAddresses != "" { + frAddresses = strings.Split(*feeRewardAddresses, ",") + } - collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses) + collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses) slotWatcher := NewCollectorSlotWatcher(collector) go slotWatcher.WatchSlots(context.Background(), collector.slotPace) diff --git a/cmd/solana_exporter/exporter_test.go b/cmd/solana_exporter/exporter_test.go index 201bb82..8e3defa 100644 --- a/cmd/solana_exporter/exporter_test.go +++ b/cmd/solana_exporter/exporter_test.go @@ -160,6 +160,18 @@ func (c *staticRPCClient) GetInflationReward( return staticInflationRewards, nil } +//goland:noinspection GoUnusedParameter +func (c *staticRPCClient) GetLeaderSchedule( + ctx context.Context, commitment rpc.Commitment, slot int64, +) (map[string][]int64, error) { + return nil, nil +} + +//goland:noinspection GoUnusedParameter +func (c *staticRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) { + return nil, nil +} + /* ===== DYNAMIC CLIENT =====: */ @@ -341,6 +353,18 @@ func (c *dynamicRPCClient) GetInflationReward( return staticInflationRewards, nil } +//goland:noinspection GoUnusedParameter +func (c *dynamicRPCClient) GetLeaderSchedule( + ctx context.Context, commitment rpc.Commitment, slot int64, +) (map[string][]int64, error) { + return nil, nil +} + +//goland:noinspection GoUnusedParameter +func (c *dynamicRPCClient) GetBlock(ctx context.Context, commitment rpc.Commitment, slot int64) (*rpc.Block, error) { + return nil, nil +} + /* ===== OTHER TEST UTILITIES =====: */ @@ -376,7 +400,9 @@ func runCollectionTests(t *testing.T, collector prometheus.Collector, testCases } func TestSolanaCollector_Collect_Static(t *testing.T) { - collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys) + collector := createSolanaCollector( + &staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys, identities, + ) prometheus.NewPedanticRegistry().MustRegister(collector) testCases := []collectionTest{ @@ -454,7 +480,7 @@ solana_account_balance{address="ccc"} 3 func TestSolanaCollector_Collect_Dynamic(t *testing.T) { client := newDynamicRPCClient() - collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys) + collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys, identities) prometheus.NewPedanticRegistry().MustRegister(collector) // start off by testing initial state: diff --git a/cmd/solana_exporter/slots.go b/cmd/solana_exporter/slots.go index 4272f84..d37b0d8 100644 --- a/cmd/solana_exporter/slots.go +++ b/cmd/solana_exporter/slots.go @@ -21,6 +21,7 @@ type SlotWatcher struct { // config: leaderSlotAddresses []string inflationRewardAddresses []string + feeRewardAddresses []string // currentEpoch is the current epoch we are watching currentEpoch int64 @@ -30,6 +31,8 @@ type SlotWatcher struct { lastSlot int64 // slotWatermark is the last (most recent) slot we have tracked slotWatermark int64 + + leaderSchedule map[string][]int64 } var ( @@ -81,6 +84,14 @@ var ( }, []string{"votekey", "epoch"}, ) + + feeRewards = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "solana_fee_rewards", + Help: "Transaction fee rewards earned per validator identity account, per epoch", + }, + []string{"nodekey", "epoch"}, + ) ) func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher { @@ -88,6 +99,7 @@ func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher { client: collector.rpcClient, leaderSlotAddresses: collector.leaderSlotAddresses, inflationRewardAddresses: collector.inflationRewardAddresses, + feeRewardAddresses: collector.feeRewardAddresses, } } @@ -100,6 +112,7 @@ func init() { prometheus.MustRegister(leaderSlotsTotal) prometheus.MustRegister(leaderSlotsByEpoch) prometheus.MustRegister(inflationRewards) + prometheus.MustRegister(feeRewards) } func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { @@ -246,12 +259,11 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i valid := float64(production.BlocksProduced) skipped := float64(production.LeaderSlots - production.BlocksProduced) - epochStr := fmt.Sprintf("%d", c.currentEpoch) - leaderSlotsTotal.WithLabelValues("valid", address).Add(valid) leaderSlotsTotal.WithLabelValues("skipped", address).Add(skipped) if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, address) { + epochStr := toString(c.currentEpoch) leaderSlotsByEpoch.WithLabelValues("valid", address, epochStr).Add(valid) leaderSlotsByEpoch.WithLabelValues("skipped", address, epochStr).Add(skipped) } @@ -271,8 +283,7 @@ func getEpochBounds(info *rpc.EpochInfo) (int64, int64) { // fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses // at the provided epoch func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error { - epochStr := fmt.Sprintf("%d", epoch) - klog.Infof("Fetching inflation reward for epoch %v ...", epochStr) + klog.Infof("Fetching inflation reward for epoch %v ...", toString(epoch)) ctx, cancel := context.WithTimeout(ctx, httpTimeout) defer cancel() @@ -287,8 +298,33 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in for i, rewardInfo := range rewardInfos { address := c.inflationRewardAddresses[i] reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol) - inflationRewards.WithLabelValues(address, epochStr).Set(reward) + inflationRewards.WithLabelValues(address, toString(epoch)).Set(reward) } - klog.Infof("Fetched inflation reward for epoch %v.", epochStr) + klog.Infof("Fetched inflation reward for epoch %v.", epoch) + return nil +} + +func (c *SlotWatcher) fetchAndEmitFeeReward( + ctx context.Context, identity string, epoch int64, slot int64, +) error { + block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot) + if err != nil { + return err + } + + for _, reward := range block.Rewards { + if reward.RewardType == "fee" { + // make sure we haven't made a logic issue or something: + assertf( + reward.Pubkey == identity, + "fetching fee reward for %v but got fee reward for %v", + identity, + reward.Pubkey, + ) + amount := float64(reward.Lamports) / float64(rpc.LamportsInSol) + feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount) + } + } + return nil } diff --git a/cmd/solana_exporter/slots_test.go b/cmd/solana_exporter/slots_test.go index 7c88ce1..daa3564 100644 --- a/cmd/solana_exporter/slots_test.go +++ b/cmd/solana_exporter/slots_test.go @@ -92,7 +92,9 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) { leaderSlotsTotal.Reset() leaderSlotsByEpoch.Reset() - collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys) + collector := createSolanaCollector( + &staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys, identities, + ) watcher := NewCollectorSlotWatcher(collector) prometheus.NewPedanticRegistry().MustRegister(collector) ctx, cancel := context.WithCancel(context.Background()) @@ -161,7 +163,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) { // create clients: client := newDynamicRPCClient() - collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys) + collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys, identities) watcher := NewCollectorSlotWatcher(collector) prometheus.NewPedanticRegistry().MustRegister(collector) diff --git a/cmd/solana_exporter/utils.go b/cmd/solana_exporter/utils.go index 0ea2505..5c9f5e9 100644 --- a/cmd/solana_exporter/utils.go +++ b/cmd/solana_exporter/utils.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "k8s.io/klog/v2" ) @@ -9,3 +10,7 @@ func assertf(condition bool, format string, args ...any) { klog.Fatalf(format, args...) } } + +func toString(i int64) string { + return fmt.Sprintf("%v", i) +} diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 447f0fe..b69417b 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -71,6 +71,10 @@ type Provider interface { GetInflationReward( ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64, ) ([]InflationReward, error) + + GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error) + + GetBlock(ctx context.Context, commitment Commitment, slot int64) (*Block, error) } func (c Commitment) MarshalJSON() ([]byte, error) { @@ -259,10 +263,10 @@ func (c *Client) GetInflationReward( // GetLeaderSchedule returns the leader schedule for an epoch. // See API docs: https://solana.com/docs/rpc/http/getleaderschedule -func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment) (map[string][]int64, error) { +func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error) { config := map[string]any{"commitment": string(commitment)} var resp response[map[string][]int64] - if err := c.getResponse(ctx, "getLeaderSchedule", []any{nil, config}, &resp); err != nil { + if err := c.getResponse(ctx, "getLeaderSchedule", []any{slot, config}, &resp); err != nil { return nil, err } return resp.Result, nil From c65c347504706703f9e0ba1071b417033d3fa725 Mon Sep 17 00:00:00 2001 From: Matt Johnstone Date: Tue, 8 Oct 2024 16:33:41 +0200 Subject: [PATCH 5/5] added fee rewards --- cmd/solana_exporter/exporter.go | 5 ++ cmd/solana_exporter/exporter_test.go | 34 +++---- cmd/solana_exporter/slots.go | 128 +++++++++++++++++++-------- cmd/solana_exporter/utils.go | 46 ++++++++++ cmd/solana_exporter/utils_test.go | 24 +++++ 5 files changed, 179 insertions(+), 58 deletions(-) create mode 100644 cmd/solana_exporter/utils_test.go diff --git a/cmd/solana_exporter/exporter.go b/cmd/solana_exporter/exporter.go index d31d40f..1d2325d 100644 --- a/cmd/solana_exporter/exporter.go +++ b/cmd/solana_exporter/exporter.go @@ -276,15 +276,20 @@ func main() { ) if *balanceAddresses != "" { balAddresses = strings.Split(*balanceAddresses, ",") + klog.Infof("Monitoring balances for %v", balAddresses) } if *leaderSlotAddresses != "" { lsAddresses = strings.Split(*leaderSlotAddresses, ",") + klog.Infof("Monitoring leader-slot by epoch for %v", lsAddresses) + } if *inflationRewardAddresses != "" { irAddresses = strings.Split(*inflationRewardAddresses, ",") + klog.Infof("Monitoring inflation reward by epoch for %v", irAddresses) } if *feeRewardAddresses != "" { frAddresses = strings.Split(*feeRewardAddresses, ",") + klog.Infof("Monitoring fee reward by epoch for %v", frAddresses) } collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses) diff --git a/cmd/solana_exporter/exporter_test.go b/cmd/solana_exporter/exporter_test.go index 8e3defa..455869e 100644 --- a/cmd/solana_exporter/exporter_test.go +++ b/cmd/solana_exporter/exporter_test.go @@ -47,7 +47,7 @@ var ( identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"} nv = len(identities) staticEpochInfo = rpc.EpochInfo{ - AbsoluteSlot: 166598, + AbsoluteSlot: 166599, BlockHeight: 166500, Epoch: 27, SlotIndex: 2790, @@ -70,12 +70,9 @@ var ( staticVoteAccounts = rpc.VoteAccounts{ Current: []rpc.VoteAccount{ { - ActivatedStake: 42, - Commission: 0, - EpochCredits: [][]int{ - {1, 64, 0}, - {2, 192, 64}, - }, + ActivatedStake: 42, + Commission: 0, + EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}}, EpochVoteAccount: true, LastVote: 147, NodePubkey: "bbb", @@ -83,12 +80,9 @@ var ( VotePubkey: "BBB", }, { - ActivatedStake: 43, - Commission: 1, - EpochCredits: [][]int{ - {2, 65, 1}, - {3, 193, 65}, - }, + ActivatedStake: 43, + Commission: 1, + EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}}, EpochVoteAccount: true, LastVote: 148, NodePubkey: "ccc", @@ -98,12 +92,9 @@ var ( }, Delinquent: []rpc.VoteAccount{ { - ActivatedStake: 49, - Commission: 2, - EpochCredits: [][]int{ - {10, 594, 6}, - {9, 98, 4}, - }, + ActivatedStake: 49, + Commission: 2, + EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}}, EpochVoteAccount: true, LastVote: 92, NodePubkey: "aaa", @@ -112,6 +103,9 @@ var ( }, }, } + staticLeaderSchedule = map[string][]int64{ + "aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14}, + } ) /* @@ -164,7 +158,7 @@ func (c *staticRPCClient) GetInflationReward( func (c *staticRPCClient) GetLeaderSchedule( ctx context.Context, commitment rpc.Commitment, slot int64, ) (map[string][]int64, error) { - return nil, nil + return staticLeaderSchedule, nil } //goland:noinspection GoUnusedParameter diff --git a/cmd/solana_exporter/slots.go b/cmd/solana_exporter/slots.go index d37b0d8..5f88c36 100644 --- a/cmd/solana_exporter/slots.go +++ b/cmd/solana_exporter/slots.go @@ -130,15 +130,17 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { <-ticker.C ctx_, cancel := context.WithTimeout(ctx, httpTimeout) - epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentConfirmed) - if err != nil { - klog.Warningf("Failed to get epoch info, bailing out: %v", err) - } + // TODO: separate fee-rewards watching from general slot watching, such that general slot watching commitment level can be dropped to confirmed + epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized) cancel() + if err != nil { + klog.Errorf("Failed to get epoch info, bailing out: %v", err) + continue + } // if we are running for the first time, then we need to set our tracking numbers: if c.currentEpoch == 0 { - c.trackEpoch(epochInfo) + c.trackEpoch(ctx, epochInfo) } totalTransactionsTotal.Set(float64(epochInfo.TransactionCount)) @@ -163,14 +165,15 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) { } // update block production metrics up until the current slot: - c.fetchAndEmitBlockProduction(ctx, epochInfo.AbsoluteSlot) + c.moveSlotWatermark(ctx, epochInfo.AbsoluteSlot) } } } // trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly, // and updates the prometheus gauges associated with those metrics. -func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) { +func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) { + klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch) firstSlot, lastSlot := getEpochBounds(epoch) // if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup, // and so we can simply store the tracking numbers @@ -207,16 +210,29 @@ func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) { } // emit epoch bounds: + klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot) currentEpochNumber.Set(float64(c.currentEpoch)) epochFirstSlot.Set(float64(c.firstSlot)) epochLastSlot.Set(float64(c.lastSlot)) + + // update leader schedule: + ctx, cancel := context.WithTimeout(ctx, httpTimeout) + defer cancel() + klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch) + leaderSchedule, err := GetTrimmedLeaderSchedule( + ctx, c.client, c.feeRewardAddresses, epoch.AbsoluteSlot, c.firstSlot, + ) + if err != nil { + klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err) + } + c.leaderSchedule = leaderSchedule } // closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last // remaining slots in the "current" epoch before we start tracking the new one. func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) { - c.fetchAndEmitBlockProduction(ctx, c.lastSlot) - c.trackEpoch(newEpoch) + c.moveSlotWatermark(ctx, c.lastSlot) + c.trackEpoch(ctx, newEpoch) } // checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking. @@ -234,6 +250,13 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error { return nil } +// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot. +func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) { + c.fetchAndEmitBlockProduction(ctx, to) + c.fetchAndEmitFeeRewards(ctx, to) + c.slotWatermark = to +} + // fetchAndEmitBlockProduction fetches block production up to the provided endSlot, emits the prometheus metrics, // and updates the SlotWatcher.slotWatermark accordingly func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) { @@ -249,9 +272,10 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i // fetch block production: ctx, cancel := context.WithTimeout(ctx, httpTimeout) defer cancel() - blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentConfirmed, nil, &startSlot, &endSlot) + blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot) if err != nil { - klog.Warningf("Failed to get block production, bailing out: %v", err) + klog.Errorf("Failed to get block production, bailing out: %v", err) + return } // emit the metrics: @@ -270,8 +294,61 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i } klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot) - // update the slot watermark: - c.slotWatermark = endSlot +} + +// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the +// slotWatermark and endSlot +func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) { + startSlot := c.slotWatermark + 1 + klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot) + + if err := c.checkValidSlotRange(startSlot, endSlot); err != nil { + klog.Fatalf("invalid slot range: %v", err) + } + scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot) + for identity, leaderSlots := range scheduleToFetch { + if len(leaderSlots) == 0 { + continue + } + + klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots) + for _, slot := range leaderSlots { + ctx, cancel := context.WithTimeout(ctx, httpTimeout) + err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot) + cancel() + if err != nil { + klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err) + } + } + } + + klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot) +} + +// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block. +func (c *SlotWatcher) fetchAndEmitSingleFeeReward( + ctx context.Context, identity string, epoch int64, slot int64, +) error { + block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot) + if err != nil { + return err + } + + for _, reward := range block.Rewards { + if reward.RewardType == "fee" { + // make sure we haven't made a logic issue or something: + assertf( + reward.Pubkey == identity, + "fetching fee reward for %v but got fee reward for %v", + identity, + reward.Pubkey, + ) + amount := float64(reward.Lamports) / float64(rpc.LamportsInSol) + feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount) + } + } + + return nil } // getEpochBounds returns the first slot and last slot within an [inclusive] Epoch @@ -303,28 +380,3 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in klog.Infof("Fetched inflation reward for epoch %v.", epoch) return nil } - -func (c *SlotWatcher) fetchAndEmitFeeReward( - ctx context.Context, identity string, epoch int64, slot int64, -) error { - block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot) - if err != nil { - return err - } - - for _, reward := range block.Rewards { - if reward.RewardType == "fee" { - // make sure we haven't made a logic issue or something: - assertf( - reward.Pubkey == identity, - "fetching fee reward for %v but got fee reward for %v", - identity, - reward.Pubkey, - ) - amount := float64(reward.Lamports) / float64(rpc.LamportsInSol) - feeRewards.WithLabelValues(identity, toString(epoch)).Add(amount) - } - } - - return nil -} diff --git a/cmd/solana_exporter/utils.go b/cmd/solana_exporter/utils.go index 5c9f5e9..aaf06b9 100644 --- a/cmd/solana_exporter/utils.go +++ b/cmd/solana_exporter/utils.go @@ -1,7 +1,9 @@ package main import ( + "context" "fmt" + "github.com/asymmetric-research/solana_exporter/pkg/rpc" "k8s.io/klog/v2" ) @@ -11,6 +13,50 @@ func assertf(condition bool, format string, args ...any) { } } +// toString is just a simple utility function for converting int -> string func toString(i int64) string { return fmt.Sprintf("%v", i) } + +// SelectFromSchedule takes a leader-schedule and returns a trimmed leader-schedule +// containing only the slots within the provided range +func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 { + selected := make(map[string][]int64) + for key, values := range schedule { + var selectedValues []int64 + for _, value := range values { + if value >= startSlot && value <= endSlot { + selectedValues = append(selectedValues, value) + } + } + selected[key] = selectedValues + } + return selected +} + +// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in. +// Additionally, it adjusts the leader schedule to the current epoch offset. +func GetTrimmedLeaderSchedule( + ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64, +) (map[string][]int64, error) { + leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot) + if err != nil { + return nil, fmt.Errorf("failed to get leader schedule: %w", err) + } + + trimmedLeaderSchedule := make(map[string][]int64) + for _, id := range identities { + if leaderSlots, ok := leaderSchedule[id]; ok { + // when you fetch the leader schedule, it gives you slot indexes, we want absolute slots: + absoluteSlots := make([]int64, len(leaderSlots)) + for i, slotIndex := range leaderSlots { + absoluteSlots[i] = slotIndex + epochFirstSlot + } + trimmedLeaderSchedule[id] = absoluteSlots + } else { + klog.Warningf("failed to find leader slots for %v", id) + } + } + + return trimmedLeaderSchedule, nil +} diff --git a/cmd/solana_exporter/utils_test.go b/cmd/solana_exporter/utils_test.go new file mode 100644 index 0000000..5925ae6 --- /dev/null +++ b/cmd/solana_exporter/utils_test.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSelectFromSchedule(t *testing.T) { + selected := SelectFromSchedule(staticLeaderSchedule, 5, 10) + assert.Equal(t, + map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}}, + selected, + ) +} + +func TestGetTrimmedLeaderSchedule(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10) + assert.NoError(t, err) + + assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule) +}