big mock server test refactor

This commit is contained in:
Matt Johnstone 2024-10-28 18:19:07 +02:00
parent dda909aecc
commit fa71eeb20b
No known key found for this signature in database
GPG Key ID: BE985FBB9BE7D3BB
16 changed files with 829 additions and 779 deletions

View File

@ -7,9 +7,7 @@ import (
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"net/http"
)
const (
@ -32,7 +30,7 @@ const (
)
type SolanaCollector struct {
rpcClient rpc.Provider
rpcClient *rpc.Client
logger *zap.SugaredLogger
config *ExporterConfig
@ -50,9 +48,9 @@ type SolanaCollector struct {
NodeFirstAvailableBlock *GaugeDesc
}
func NewSolanaCollector(provider rpc.Provider, config *ExporterConfig) *SolanaCollector {
func NewSolanaCollector(client *rpc.Client, config *ExporterConfig) *SolanaCollector {
collector := &SolanaCollector{
rpcClient: provider,
rpcClient: client,
logger: slog.Get(),
config: config,
ValidatorActiveStake: NewGaugeDesc(
@ -124,7 +122,7 @@ func (c *SolanaCollector) collectVoteAccounts(ctx context.Context, ch chan<- pro
return
}
c.logger.Info("Collecting vote accounts...")
voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed, nil)
voteAccounts, err := c.rpcClient.GetVoteAccounts(ctx, rpc.CommitmentConfirmed)
if err != nil {
c.logger.Errorf("failed to get vote accounts: %v", err)
ch <- c.ValidatorActiveStake.NewInvalidMetric(err)
@ -263,32 +261,3 @@ func (c *SolanaCollector) Collect(ch chan<- prometheus.Metric) {
c.logger.Info("=========== END COLLECTION ===========")
}
func main() {
logger := slog.Get()
ctx := context.Background()
config, err := NewExporterConfigFromCLI(ctx)
if err != nil {
logger.Fatal(err)
}
if config.ComprehensiveSlotTracking {
logger.Warn(
"Comprehensive slot tracking will lead to potentially thousands of new " +
"Prometheus metrics being created every epoch.",
)
}
client := rpc.NewRPCClient(config.RpcUrl, config.HttpTimeout)
collector := NewSolanaCollector(client, config)
slotWatcher := NewSlotWatcher(client, config)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go slotWatcher.WatchSlots(ctx)
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
logger.Infof("listening on %s", config.ListenAddress)
logger.Fatal(http.ListenAndServe(config.ListenAddress, nil))
}

View File

@ -3,108 +3,72 @@ package main
import (
"bytes"
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"math"
"math/rand"
"regexp"
"slices"
"strings"
"testing"
"time"
)
type (
staticRPCClient struct{}
dynamicRPCClient struct {
DynamicServer struct {
Server *rpc.MockServer
Slot int
BlockHeight int
Epoch int
EpochSize int
SlotTime time.Duration
TransactionCount int
Version string
SlotInfos map[int]slotInfo
LeaderIndex int
ValidatorInfos map[string]validatorInfo
Balances map[string]float64
}
slotInfo struct {
leader string
blockProduced bool
}
validatorInfo struct {
Stake int
LastVote int
Commission int
Delinquent bool
LeaderSchedule map[string][]int
}
)
var (
identities = []string{"aaa", "bbb", "ccc"}
nodekeys = []string{"aaa", "bbb", "ccc"}
votekeys = []string{"AAA", "BBB", "CCC"}
balances = map[string]float64{"aaa": 1, "bbb": 2, "ccc": 3, "AAA": 4, "BBB": 5, "CCC": 6}
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities)
staticEpochInfo = rpc.EpochInfo{
AbsoluteSlot: 166599,
BlockHeight: 166500,
Epoch: 27,
SlotIndex: 2790,
SlotsInEpoch: 8192,
TransactionCount: 22661093,
}
staticBlockProduction = rpc.BlockProduction{
ByIdentity: map[string]rpc.HostProduction{
"aaa": {300, 100},
"bbb": {400, 360},
"ccc": {300, 296},
},
Range: rpc.BlockProductionRange{FirstSlot: 1000, LastSlot: 2000},
}
staticInflationRewards = []rpc.InflationReward{
{Amount: 1000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 2000},
{Amount: 2000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 4000},
{Amount: 3000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 6000},
}
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
rawVoteAccounts = map[string]any{
"current": []map[string]any{
{
ActivatedStake: 42,
Commission: 0,
EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}},
EpochVoteAccount: true,
LastVote: 147,
NodePubkey: "bbb",
RootSlot: 18,
VotePubkey: "BBB",
"activatedStake": 42,
"lastVote": 147,
"nodePubkey": "bbb",
"rootSlot": 18,
"votePubkey": "BBB",
},
{
ActivatedStake: 43,
Commission: 1,
EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}},
EpochVoteAccount: true,
LastVote: 148,
NodePubkey: "ccc",
RootSlot: 19,
VotePubkey: "CCC",
"activatedStake": 43,
"lastVote": 148,
"nodePubkey": "ccc",
"rootSlot": 19,
"votePubkey": "CCC",
},
},
Delinquent: []rpc.VoteAccount{
"delinquent": []map[string]any{
{
ActivatedStake: 49,
Commission: 2,
EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}},
EpochVoteAccount: true,
LastVote: 92,
NodePubkey: "aaa",
RootSlot: 3,
VotePubkey: "AAA",
"activatedStake": 49,
"lastVote": 92,
"nodePubkey": "aaa",
"rootSlot": 3,
"votePubkey": "AAA",
},
},
}
staticLeaderSchedule = map[string][]int64{
"aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14},
rawBalances = map[string]int{
"aaa": 1 * rpc.LamportsInSol,
"bbb": 2 * rpc.LamportsInSol,
"ccc": 3 * rpc.LamportsInSol,
"AAA": 4 * rpc.LamportsInSol,
"BBB": 5 * rpc.LamportsInSol,
"CCC": 6 * rpc.LamportsInSol,
}
balanceMetricResponse = `
# HELP solana_account_balance Solana account balances, grouped by address
@ -115,120 +79,69 @@ solana_account_balance{address="CCC"} 6
solana_account_balance{address="aaa"} 1
solana_account_balance{address="bbb"} 2
solana_account_balance{address="ccc"} 3
`
`
dynamicLeaderSchedule = map[string][]int{
"aaa": {0, 1, 2, 3, 12, 13, 14, 15},
"bbb": {4, 5, 6, 7, 16, 17, 18, 19},
"ccc": {8, 9, 10, 11, 20, 21, 22, 23},
}
)
/*
===== STATIC CLIENT =====:
*/
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Commitment) (*rpc.EpochInfo, error) {
return &staticEpochInfo, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) {
return staticEpochInfo.AbsoluteSlot, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetVersion(ctx context.Context) (string, error) {
version := "1.16.7"
return version, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetVoteAccounts(
ctx context.Context, commitment rpc.Commitment, votePubkey *string,
) (*rpc.VoteAccounts, error) {
return &staticVoteAccounts, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlockProduction(
ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) {
return &staticBlockProduction, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBalance(ctx context.Context, commitment rpc.Commitment, address string) (float64, error) {
return balances[address], nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetInflationReward(
ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return staticLeaderSchedule, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetBlock(
ctx context.Context, commitment rpc.Commitment, slot int64, transactionDetails string,
) (*rpc.Block, error) {
return nil, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetHealth(ctx context.Context) (string, error) {
return "ok", nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetFirstAvailableBlock(ctx context.Context) (int64, error) {
return 33, nil
}
//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetMinimumLedgerSlot(ctx context.Context) (int64, error) {
return 23, nil
}
/*
===== DYNAMIC CLIENT =====:
*/
func newDynamicRPCClient() *dynamicRPCClient {
validatorInfos := make(map[string]validatorInfo)
for identity := range identityVotes {
validatorInfos[identity] = validatorInfo{
func voteTx(nodekey string) []string {
return []string{nodekey, strings.ToUpper(nodekey), VoteProgram}
}
func NewDynamicRpcClient(t *testing.T, slot int) (*DynamicServer, *rpc.Client) {
validatorInfos := make(map[string]rpc.MockValidatorInfo)
for _, nodekey := range nodekeys {
validatorInfos[nodekey] = rpc.MockValidatorInfo{
Votekey: strings.ToUpper(nodekey),
Stake: 1_000_000,
LastVote: 0,
Commission: 5,
Delinquent: false,
}
}
return &dynamicRPCClient{
Slot: 0,
BlockHeight: 0,
Epoch: 0,
EpochSize: 20,
SlotTime: 100 * time.Millisecond,
TransactionCount: 0,
Version: "v1.0.0",
SlotInfos: map[int]slotInfo{},
LeaderIndex: 0,
ValidatorInfos: validatorInfos,
mockServer, client := rpc.NewMockClient(t,
map[string]any{
"getVersion": map[string]string{"solana-core": "v1.0.0"},
"getLeaderSchedule": dynamicLeaderSchedule,
"getHealth": "ok",
},
rawBalances,
map[string]int{"AAA": 10, "BBB": 10, "CCC": 10},
nil,
validatorInfos,
)
server := DynamicServer{
Slot: 0,
Server: mockServer,
EpochSize: 24,
SlotTime: 100 * time.Millisecond,
LeaderSchedule: dynamicLeaderSchedule,
}
server.PopulateSlot(0)
for {
server.Slot++
server.PopulateSlot(server.Slot)
if server.Slot == slot {
break
}
}
return &server, client
}
func (c *dynamicRPCClient) Run(ctx context.Context) {
func (c *DynamicServer) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
c.newSlot()
c.Slot++
c.PopulateSlot(c.Slot)
// add 5% noise to the slot time:
noiseRange := float64(c.SlotTime) * 0.05
noise := (rand.Float64()*2 - 1) * noiseRange
@ -237,170 +150,71 @@ func (c *dynamicRPCClient) Run(ctx context.Context) {
}
}
func (c *dynamicRPCClient) newSlot() {
c.Slot++
// leader changes every 4 slots
if c.Slot%4 == 0 {
c.LeaderIndex = (c.LeaderIndex + 1) % nv
func (c *DynamicServer) getLeader() string {
index := c.Slot % c.EpochSize
for leader, slots := range c.LeaderSchedule {
if slices.Contains(slots, index) {
return leader
}
}
panic(fmt.Sprintf("leader not found at slot %d", c.Slot))
}
if c.Slot%c.EpochSize == 0 {
c.Epoch++
}
func (c *DynamicServer) PopulateSlot(slot int) {
leader := c.getLeader()
// assume 90% chance of block produced:
blockProduced := rand.Intn(100) <= 90
// add slot info:
c.SlotInfos[c.Slot] = slotInfo{leader: identities[c.LeaderIndex], blockProduced: blockProduced}
if blockProduced {
var block *rpc.MockBlockInfo
// every 4th slot is skipped
if slot%4 != 3 {
c.BlockHeight++
// only add some transactions if a block was produced
c.TransactionCount += rand.Intn(10)
// assume both other validators voted
for i := 1; i < 3; i++ {
otherValidatorIndex := (c.LeaderIndex + i) % nv
identity := identities[otherValidatorIndex]
info := c.ValidatorInfos[identity]
info.LastVote = c.Slot
c.ValidatorInfos[identity] = info
transactions := [][]string{
{"aaa", "bbb", "ccc"},
{"xxx", "yyy", "zzz"},
}
}
}
func (c *dynamicRPCClient) UpdateVersion(version string) {
c.Version = version
}
func (c *dynamicRPCClient) UpdateStake(validator string, amount int) {
info := c.ValidatorInfos[validator]
info.Stake = amount
c.ValidatorInfos[validator] = info
}
func (c *dynamicRPCClient) UpdateCommission(validator string, newCommission int) {
info := c.ValidatorInfos[validator]
info.Commission = newCommission
c.ValidatorInfos[validator] = info
}
func (c *dynamicRPCClient) UpdateDelinquency(validator string, newDelinquent bool) {
info := c.ValidatorInfos[validator]
info.Delinquent = newDelinquent
c.ValidatorInfos[validator] = info
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetEpochInfo(ctx context.Context, commitment rpc.Commitment) (*rpc.EpochInfo, error) {
return &rpc.EpochInfo{
AbsoluteSlot: int64(c.Slot),
BlockHeight: int64(c.BlockHeight),
Epoch: int64(c.Epoch),
SlotIndex: int64(c.Slot % c.EpochSize),
SlotsInEpoch: int64(c.EpochSize),
TransactionCount: int64(c.TransactionCount),
}, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetSlot(ctx context.Context, commitment rpc.Commitment) (int64, error) {
return int64(c.Slot), nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetVersion(ctx context.Context) (string, error) {
return c.Version, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetVoteAccounts(
ctx context.Context, commitment rpc.Commitment, votePubkey *string,
) (*rpc.VoteAccounts, error) {
var currentVoteAccounts, delinquentVoteAccounts []rpc.VoteAccount
for identity, vote := range identityVotes {
info := c.ValidatorInfos[identity]
voteAccount := rpc.VoteAccount{
ActivatedStake: int64(info.Stake),
Commission: info.Commission,
EpochCredits: [][]int{},
EpochVoteAccount: true,
LastVote: info.LastVote,
NodePubkey: identity,
RootSlot: 0,
VotePubkey: vote,
}
if info.Delinquent {
delinquentVoteAccounts = append(delinquentVoteAccounts, voteAccount)
} else {
currentVoteAccounts = append(currentVoteAccounts, voteAccount)
// assume all validators voted
for _, nodekey := range nodekeys {
transactions = append(transactions, voteTx(nodekey))
info := c.Server.GetValidatorInfo(nodekey)
info.LastVote = slot
c.Server.SetOpt(rpc.ValidatorInfoOpt, nodekey, info)
}
c.TransactionCount += len(transactions)
block = &rpc.MockBlockInfo{Fee: 100, Transactions: transactions}
}
return &rpc.VoteAccounts{Current: currentVoteAccounts, Delinquent: delinquentVoteAccounts}, nil
}
// add slot info:
c.Server.SetOpt(rpc.SlotInfosOpt, slot, rpc.MockSlotInfo{Leader: leader, Block: block})
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlockProduction(
ctx context.Context, commitment rpc.Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*rpc.BlockProduction, error) {
byIdentity := make(map[string]rpc.HostProduction)
for _, identity := range identities {
byIdentity[identity] = rpc.HostProduction{LeaderSlots: 0, BlocksProduced: 0}
}
for i := *firstSlot; i <= *lastSlot; i++ {
info := c.SlotInfos[int(i)]
production := byIdentity[info.leader]
production.LeaderSlots++
if info.blockProduced {
production.BlocksProduced++
}
byIdentity[info.leader] = production
}
blockProduction := rpc.BlockProduction{
ByIdentity: byIdentity, Range: rpc.BlockProductionRange{FirstSlot: *firstSlot, LastSlot: *lastSlot},
}
return &blockProduction, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBalance(ctx context.Context, client rpc.Commitment, address string) (float64, error) {
return balances[address], nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetInflationReward(
ctx context.Context, commitment rpc.Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return nil, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetBlock(
ctx context.Context, commitment rpc.Commitment, slot int64, transactionDetails string,
) (*rpc.Block, error) {
return nil, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetHealth(ctx context.Context) (string, error) {
return "ok", nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetFirstAvailableBlock(ctx context.Context) (int64, error) {
return 33, nil
}
//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetMinimumLedgerSlot(ctx context.Context) (int64, error) {
return 23, nil
// now update the server:
c.Epoch = int(math.Floor(float64(slot) / float64(c.EpochSize)))
c.Server.SetOpt(
rpc.EasyResultsOpt,
"getSlot",
slot,
)
c.Server.SetOpt(
rpc.EasyResultsOpt,
"getEpochInfo",
map[string]int{
"absoluteSlot": slot,
"blockHeight": c.BlockHeight,
"epoch": c.Epoch,
"slotIndex": slot % c.EpochSize,
"slotsInEpoch": c.EpochSize,
"transactionCount": c.TransactionCount,
},
)
c.Server.SetOpt(
rpc.EasyResultsOpt,
"minimumLedgerSlot",
int(math.Max(0, float64(slot-c.EpochSize))),
)
c.Server.SetOpt(
rpc.EasyResultsOpt,
"getFirstAvailableBlock",
int(math.Max(0, float64(slot-c.EpochSize))),
)
}
/*
@ -446,7 +260,7 @@ func newTestConfig(fast bool) *ExporterConfig {
time.Second * time.Duration(1),
"http://localhost:8899",
":8080",
identities,
nodekeys,
votekeys,
nil,
true,
@ -457,55 +271,18 @@ func newTestConfig(fast bool) *ExporterConfig {
return &config
}
func TestSolanaCollector_Collect_Static(t *testing.T) {
collector := NewSolanaCollector(&staticRPCClient{}, newTestConfig(false))
prometheus.NewPedanticRegistry().MustRegister(collector)
testCases := []collectionTest{
collector.ValidatorActiveStake.makeCollectionTest(abcValues(49, 42, 43)...),
collector.ValidatorLastVote.makeCollectionTest(abcValues(92, 147, 148)...),
collector.ValidatorRootSlot.makeCollectionTest(abcValues(3, 18, 19)...),
collector.ValidatorDelinquent.makeCollectionTest(abcValues(1, 0, 0)...),
{Name: "solana_account_balance", ExpectedResponse: balanceMetricResponse},
collector.NodeVersion.makeCollectionTest(NewLV(1, "1.16.7")),
collector.NodeIsHealthy.makeCollectionTest(NewLV(1)),
collector.NodeNumSlotsBehind.makeCollectionTest(NewLV(0)),
}
runCollectionTests(t, collector, testCases)
}
func TestSolanaCollector_Collect_Dynamic(t *testing.T) {
client := newDynamicRPCClient()
func TestSolanaCollector(t *testing.T) {
_, client := NewDynamicRpcClient(t, 35)
collector := NewSolanaCollector(client, newTestConfig(false))
prometheus.NewPedanticRegistry().MustRegister(collector)
// start off by testing initial state:
testCases := []collectionTest{
collector.ValidatorActiveStake.makeCollectionTest(abcValues(1_000_000, 1_000_000, 1_000_000)...),
collector.ValidatorLastVote.makeCollectionTest(abcValues(34, 34, 34)...),
collector.ValidatorRootSlot.makeCollectionTest(abcValues(0, 0, 0)...),
collector.ValidatorDelinquent.makeCollectionTest(abcValues(0, 0, 0)...),
{Name: "solana_account_balance", ExpectedResponse: balanceMetricResponse},
collector.NodeVersion.makeCollectionTest(NewLV(1, "v1.0.0")),
{Name: "solana_account_balance", ExpectedResponse: balanceMetricResponse},
collector.NodeIsHealthy.makeCollectionTest(NewLV(1)),
collector.NodeNumSlotsBehind.makeCollectionTest(NewLV(0)),
}
runCollectionTests(t, collector, testCases)
// now make some changes:
client.UpdateStake("aaa", 2_000_000)
client.UpdateStake("bbb", 500_000)
client.UpdateDelinquency("ccc", true)
client.UpdateVersion("v1.2.3")
// now test the final state
testCases = []collectionTest{
collector.ValidatorActiveStake.makeCollectionTest(abcValues(2_000_000, 500_000, 1_000_000)...),
collector.ValidatorRootSlot.makeCollectionTest(abcValues(0, 0, 0)...),
collector.ValidatorDelinquent.makeCollectionTest(abcValues(0, 0, 1)...),
collector.NodeVersion.makeCollectionTest(NewLV(1, "v1.2.3")),
{Name: "solana_account_balance", ExpectedResponse: balanceMetricResponse},
collector.NodeIsHealthy.makeCollectionTest(NewLV(1)),
collector.NodeNumSlotsBehind.makeCollectionTest(NewLV(0)),
}

View File

@ -0,0 +1,40 @@
package main
import (
"context"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)
func main() {
slog.Init()
logger := slog.Get()
ctx := context.Background()
config, err := NewExporterConfigFromCLI(ctx)
if err != nil {
logger.Fatal(err)
}
if config.ComprehensiveSlotTracking {
logger.Warn(
"Comprehensive slot tracking will lead to potentially thousands of new " +
"Prometheus metrics being created every epoch.",
)
}
client := rpc.NewRPCClient(config.RpcUrl, config.HttpTimeout)
collector := NewSolanaCollector(client, config)
slotWatcher := NewSlotWatcher(client, config)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go slotWatcher.WatchSlots(ctx)
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
logger.Infof("listening on %s", config.ListenAddress)
logger.Fatal(http.ListenAndServe(config.ListenAddress, nil))
}

View File

@ -0,0 +1,13 @@
package main
import (
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"os"
"testing"
)
func TestMain(m *testing.M) {
slog.Init()
code := m.Run()
os.Exit(code)
}

View File

@ -15,7 +15,7 @@ import (
)
type SlotWatcher struct {
client rpc.Provider
client *rpc.Client
logger *zap.SugaredLogger
config *ExporterConfig
@ -45,7 +45,7 @@ type SlotWatcher struct {
BlockHeightMetric prometheus.Gauge
}
func NewSlotWatcher(client rpc.Provider, config *ExporterConfig) *SlotWatcher {
func NewSlotWatcher(client *rpc.Client, config *ExporterConfig) *SlotWatcher {
logger := slog.Get()
watcher := SlotWatcher{
client: client,
@ -303,7 +303,7 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
}
// fetch block production:
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot)
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, startSlot, endSlot)
if err != nil {
c.logger.Errorf("Failed to get block production, bailing out: %v", err)
return
@ -341,16 +341,16 @@ func (c *SlotWatcher) fetchAndEmitBlockInfos(ctx context.Context, endSlot int64)
c.logger.Fatalf("invalid slot range: %v", err)
}
scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot)
for identity, leaderSlots := range scheduleToFetch {
for nodekey, leaderSlots := range scheduleToFetch {
if len(leaderSlots) == 0 {
continue
}
c.logger.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
c.logger.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", nodekey, startSlot, endSlot, leaderSlots)
for _, slot := range leaderSlots {
err := c.fetchAndEmitSingleBlockInfo(ctx, identity, c.currentEpoch, slot)
err := c.fetchAndEmitSingleBlockInfo(ctx, nodekey, c.currentEpoch, slot)
if err != nil {
c.logger.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
c.logger.Errorf("Failed to fetch fee rewards for %v at %v: %v", nodekey, slot, err)
}
}
}
@ -415,7 +415,7 @@ func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch in
return nil
}
c.logger.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))
rewardInfos, err := c.client.GetInflationReward(ctx, rpc.CommitmentConfirmed, c.config.VoteKeys, &epoch, nil)
rewardInfos, err := c.client.GetInflationReward(ctx, rpc.CommitmentConfirmed, c.config.VoteKeys, epoch)
if err != nil {
return fmt.Errorf("error fetching inflation rewards: %w", err)
}

View File

@ -29,37 +29,6 @@ func getSlotMetricValues(watcher *SlotWatcher) slotMetricValues {
}
}
func testBlockProductionMetric(
t *testing.T,
watcher *SlotWatcher,
metric *prometheus.CounterVec,
host string,
status string,
) {
hostInfo := staticBlockProduction.ByIdentity[host]
// get expected value depending on status:
var expectedValue float64
switch status {
case "valid":
expectedValue = float64(hostInfo.BlocksProduced)
case "skipped":
expectedValue = float64(hostInfo.LeaderSlots - hostInfo.BlocksProduced)
}
// get labels (leaderSlotsByEpoch requires an extra one)
labels := []string{status, host}
if metric == watcher.LeaderSlotsByEpochMetric {
labels = append(labels, fmt.Sprintf("%d", staticEpochInfo.Epoch))
}
// now we can do the assertion:
assert.Equalf(
t,
expectedValue,
testutil.ToFloat64(metric.WithLabelValues(labels...)),
"wrong value for block-production metric with labels: %s",
labels,
)
}
func assertSlotMetricsChangeCorrectly(t *testing.T, initial slotMetricValues, final slotMetricValues) {
// make sure that things have increased
assert.Greaterf(
@ -88,47 +57,54 @@ func assertSlotMetricsChangeCorrectly(t *testing.T, initial slotMetricValues, fi
)
}
func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestSlotWatcher_WatchSlots_Static(t *testing.T) {
ctx := context.Background()
config := newTestConfig(true)
collector := NewSolanaCollector(&staticRPCClient{}, config)
watcher := NewSlotWatcher(&staticRPCClient{}, config)
_, client := NewDynamicRpcClient(t, 35)
watcher := NewSlotWatcher(client, config)
// reset metrics before running tests:
watcher.LeaderSlotsMetric.Reset()
watcher.LeaderSlotsByEpochMetric.Reset()
prometheus.NewPedanticRegistry().MustRegister(collector)
go watcher.WatchSlots(ctx)
// make sure inflation rewards are collected:
err := watcher.fetchAndEmitInflationRewards(ctx, staticEpochInfo.Epoch)
epochInfo, err := client.GetEpochInfo(ctx, rpc.CommitmentFinalized)
assert.NoError(t, err)
err = watcher.fetchAndEmitInflationRewards(ctx, epochInfo.Epoch)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
firstSlot, lastSlot := GetEpochBounds(&staticEpochInfo)
type testCase struct {
expectedValue float64
metric prometheus.Gauge
}
// epoch info tests:
firstSlot, lastSlot := GetEpochBounds(epochInfo)
tests := []testCase{
{expectedValue: float64(staticEpochInfo.AbsoluteSlot), metric: watcher.SlotHeightMetric},
{expectedValue: float64(staticEpochInfo.TransactionCount), metric: watcher.TotalTransactionsMetric},
{expectedValue: float64(staticEpochInfo.Epoch), metric: watcher.EpochNumberMetric},
{expectedValue: float64(epochInfo.AbsoluteSlot), metric: watcher.SlotHeightMetric},
{expectedValue: float64(epochInfo.TransactionCount), metric: watcher.TotalTransactionsMetric},
{expectedValue: float64(epochInfo.Epoch), metric: watcher.EpochNumberMetric},
{expectedValue: float64(firstSlot), metric: watcher.EpochFirstSlotMetric},
{expectedValue: float64(lastSlot), metric: watcher.EpochLastSlotMetric},
}
// add inflation reward tests:
for i, rewardInfo := range staticInflationRewards {
epoch := fmt.Sprintf("%v", staticEpochInfo.Epoch)
test := testCase{
expectedValue: float64(rewardInfo.Amount) / float64(rpc.LamportsInSol),
metric: watcher.InflationRewardsMetric.WithLabelValues(votekeys[i], epoch),
}
tests = append(tests, test)
inflationRewards, err := client.GetInflationReward(ctx, rpc.CommitmentFinalized, votekeys, 2)
assert.NoError(t, err)
for i, rewardInfo := range inflationRewards {
epoch := fmt.Sprintf("%v", epochInfo.Epoch)
tests = append(
tests,
testCase{
expectedValue: float64(rewardInfo.Amount) / float64(rpc.LamportsInSol),
metric: watcher.InflationRewardsMetric.WithLabelValues(votekeys[i], epoch),
},
)
}
for _, testCase := range tests {
@ -137,30 +113,11 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
assert.Equal(t, testCase.expectedValue, testutil.ToFloat64(testCase.metric))
})
}
metrics := map[string]*prometheus.CounterVec{
"solana_leader_slots_total": watcher.LeaderSlotsMetric,
"solana_leader_slots_by_epoch": watcher.LeaderSlotsByEpochMetric,
}
statuses := []string{"valid", "skipped"}
for name, metric := range metrics {
// subtest for each metric:
t.Run(name, func(t *testing.T) {
for _, status := range statuses {
// sub subtest for each status (as each one requires a different calc)
t.Run(status, func(t *testing.T) {
for _, identity := range identities {
testBlockProductionMetric(t, watcher, metric, identity, status)
}
})
}
})
}
}
func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
func TestSlotWatcher_WatchSlots_Dynamic(t *testing.T) {
// create clients:
client := newDynamicRPCClient()
server, client := NewDynamicRpcClient(t, 35)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := newTestConfig(true)
@ -173,7 +130,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
// start client/collector and wait a bit:
go client.Run(ctx)
go server.Run(ctx)
time.Sleep(time.Second)
go watcher.WatchSlots(ctx)
@ -195,26 +152,26 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {
assert.LessOrEqualf(
t,
int(final.SlotHeight),
client.Slot,
server.Slot,
"Exporter slot (%v) ahead of client slot (%v)!",
int(final.SlotHeight),
client.Slot,
server.Slot,
)
assert.LessOrEqualf(
t,
int(final.TotalTransactions),
client.TransactionCount,
server.TransactionCount,
"Exporter transaction count (%v) ahead of client transaction count (%v)!",
int(final.TotalTransactions),
client.TransactionCount,
server.TransactionCount,
)
assert.LessOrEqualf(
t,
int(final.EpochNumber),
client.Epoch,
server.Epoch,
"Exporter epoch (%v) ahead of client epoch (%v)!",
int(final.EpochNumber),
client.Epoch,
server.Epoch,
)
// check if epoch changed

View File

@ -42,7 +42,7 @@ func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) m
// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in.
// Additionally, it adjusts the leader schedule to the current epoch offset.
func GetTrimmedLeaderSchedule(
ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64,
ctx context.Context, client *rpc.Client, identities []string, slot, epochFirstSlot int64,
) (map[string][]int64, error) {
logger := slog.Get()
leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot)
@ -69,9 +69,9 @@ func GetTrimmedLeaderSchedule(
// GetAssociatedVoteAccounts returns the votekeys associated with a given list of nodekeys
func GetAssociatedVoteAccounts(
ctx context.Context, client rpc.Provider, commitment rpc.Commitment, nodekeys []string,
ctx context.Context, client *rpc.Client, commitment rpc.Commitment, nodekeys []string,
) ([]string, error) {
voteAccounts, err := client.GetVoteAccounts(ctx, commitment, nil)
voteAccounts, err := client.GetVoteAccounts(ctx, commitment)
if err != nil {
return nil, err
}
@ -94,7 +94,7 @@ func GetAssociatedVoteAccounts(
}
// FetchBalances fetches SOL balances for a list of addresses
func FetchBalances(ctx context.Context, client rpc.Provider, addresses []string) (map[string]float64, error) {
func FetchBalances(ctx context.Context, client *rpc.Client, addresses []string) (map[string]float64, error) {
balances := make(map[string]float64)
for _, address := range addresses {
balance, err := client.GetBalance(ctx, rpc.CommitmentConfirmed, address)

View File

@ -7,8 +7,24 @@ import (
"testing"
)
var (
rawLeaderSchedule = map[string]any{
"aaa": []int{0, 3, 6, 9, 12},
"bbb": []int{1, 4, 7, 10, 13},
"ccc": []int{2, 5, 8, 11, 14},
}
)
func TestSelectFromSchedule(t *testing.T) {
selected := SelectFromSchedule(staticLeaderSchedule, 5, 10)
selected := SelectFromSchedule(
map[string][]int64{
"aaa": {0, 3, 6, 9, 12},
"bbb": {1, 4, 7, 10, 13},
"ccc": {2, 5, 8, 11, 14},
},
5,
10,
)
assert.Equal(t,
map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}},
selected,
@ -16,11 +32,14 @@ func TestSelectFromSchedule(t *testing.T) {
}
func TestGetTrimmedLeaderSchedule(t *testing.T) {
_, client := rpc.NewMockClient(t,
map[string]any{"getLeaderSchedule": rawLeaderSchedule}, nil, nil, nil, nil,
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10)
assert.NoError(t, err)
schedule, err := GetTrimmedLeaderSchedule(ctx, client, []string{"aaa", "bbb"}, 10, 10)
assert.NoError(t, err)
assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule)
}
@ -38,21 +57,29 @@ func TestCombineUnique(t *testing.T) {
}
func TestFetchBalances(t *testing.T) {
_, client := rpc.NewMockClient(t, nil, rawBalances, nil, nil, nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := staticRPCClient{}
fetchedBalances, err := FetchBalances(ctx, &client, CombineUnique(identities, votekeys))
fetchedBalances, err := FetchBalances(ctx, client, CombineUnique(nodekeys, votekeys))
assert.NoError(t, err)
assert.Equal(t, balances, fetchedBalances)
}
func TestGetAssociatedVoteAccounts(t *testing.T) {
_, client := rpc.NewMockClient(t,
map[string]any{"getVoteAccounts": rawVoteAccounts},
nil,
nil,
nil,
nil,
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := staticRPCClient{}
voteAccounts, err := GetAssociatedVoteAccounts(ctx, &client, rpc.CommitmentFinalized, identities)
voteAccounts, err := GetAssociatedVoteAccounts(ctx, client, rpc.CommitmentFinalized, nodekeys)
assert.NoError(t, err)
assert.Equal(t, votekeys, voteAccounts)
}

View File

@ -31,64 +31,6 @@ type (
Commitment string
)
// Provider is an interface that defines the methods required to interact with the Solana blockchain.
// It provides methods to retrieve block production information, epoch info, slot info, vote accounts, and node version.
type Provider interface {
// GetBlockProduction returns recent block production information from the current or previous epoch.
// See API docs: https://solana.com/docs/rpc/http/getblockproduction
GetBlockProduction(
ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64,
) (*BlockProduction, error)
// GetEpochInfo returns information about the current epoch.
// See API docs: https://solana.com/docs/rpc/http/getepochinfo
GetEpochInfo(ctx context.Context, commitment Commitment) (*EpochInfo, error)
// GetSlot returns the slot that has reached the given or default commitment level.
// See API docs: https://solana.com/docs/rpc/http/getslot
GetSlot(ctx context.Context, commitment Commitment) (int64, error)
// GetVoteAccounts returns the account info and associated stake for all the voting accounts in the current bank.
// See API docs: https://solana.com/docs/rpc/http/getvoteaccounts
GetVoteAccounts(ctx context.Context, commitment Commitment, votePubkey *string) (*VoteAccounts, error)
// GetVersion returns the current Solana version running on the node.
// See API docs: https://solana.com/docs/rpc/http/getversion
GetVersion(ctx context.Context) (string, error)
// GetBalance returns the lamport balance of the account of provided pubkey.
// See API docs:https://solana.com/docs/rpc/http/getbalance
GetBalance(ctx context.Context, commitment Commitment, address string) (float64, error)
// GetInflationReward returns the inflation / staking reward for a list of addresses for an epoch.
// See API docs: https://solana.com/docs/rpc/http/getinflationreward
GetInflationReward(
ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error)
// GetLeaderSchedule returns the leader schedule for an epoch.
// See API docs: https://solana.com/docs/rpc/http/getleaderschedule
GetLeaderSchedule(ctx context.Context, commitment Commitment, slot int64) (map[string][]int64, error)
// GetBlock returns identity and transaction information about a confirmed block in the ledger.
// See API docs: https://solana.com/docs/rpc/http/getblock
GetBlock(ctx context.Context, commitment Commitment, slot int64, transactionDetails string) (*Block, error)
// GetHealth returns the current health of the node. A healthy node is one that is within a blockchain-configured slots
// of the latest cluster confirmed slot.
// See API docs: https://solana.com/docs/rpc/http/gethealth
GetHealth(ctx context.Context) (string, error)
// GetMinimumLedgerSlot returns the lowest slot that the node has information about in its ledger.
// See API docs: https://solana.com/docs/rpc/http/minimumledgerslot
GetMinimumLedgerSlot(ctx context.Context) (int64, error)
// GetFirstAvailableBlock returns the slot of the lowest confirmed block that has not been purged from the ledger
// See API docs: https://solana.com/docs/rpc/http/getfirstavailableblock
GetFirstAvailableBlock(ctx context.Context) (int64, error)
}
func (c Commitment) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{"commitment": string(c)})
}
@ -134,7 +76,7 @@ func getResponse[T any](
resp, err := client.HttpClient.Do(req)
if err != nil {
return fmt.Errorf("%s RPC call failed: %w", method, err)
return fmt.Errorf("%s rpc call failed: %w", method, err)
}
//goland:noinspection GoUnhandledErrorResult
defer resp.Body.Close()
@ -171,14 +113,9 @@ func (c *Client) GetEpochInfo(ctx context.Context, commitment Commitment) (*Epoc
// GetVoteAccounts returns the account info and associated stake for all the voting accounts in the current bank.
// See API docs: https://solana.com/docs/rpc/http/getvoteaccounts
func (c *Client) GetVoteAccounts(
ctx context.Context, commitment Commitment, votePubkey *string,
) (*VoteAccounts, error) {
func (c *Client) GetVoteAccounts(ctx context.Context, commitment Commitment) (*VoteAccounts, error) {
// format params:
config := map[string]string{"commitment": string(commitment)}
if votePubkey != nil {
config["votePubkey"] = *votePubkey
}
var resp Response[VoteAccounts]
if err := getResponse(ctx, c, "getVoteAccounts", []any{config}, &resp); err != nil {
@ -213,29 +150,12 @@ func (c *Client) GetSlot(ctx context.Context, commitment Commitment) (int64, err
// GetBlockProduction returns recent block production information from the current or previous epoch.
// See API docs: https://solana.com/docs/rpc/http/getblockproduction
func (c *Client) GetBlockProduction(
ctx context.Context, commitment Commitment, identity *string, firstSlot *int64, lastSlot *int64,
ctx context.Context, commitment Commitment, firstSlot int64, lastSlot int64,
) (*BlockProduction, error) {
// can't provide a last slot without a first:
if firstSlot == nil && lastSlot != nil {
c.logger.Fatalf("can't provide a last slot without a first!")
}
// format params:
config := map[string]any{"commitment": string(commitment)}
if identity != nil {
config["identity"] = *identity
}
if firstSlot != nil {
blockRange := map[string]int64{"firstSlot": *firstSlot}
if lastSlot != nil {
// make sure first and last slot are in order:
if *firstSlot > *lastSlot {
err := fmt.Errorf("last slot %v is greater than first slot %v", *lastSlot, *firstSlot)
c.logger.Fatalf("%v", err)
}
blockRange["lastSlot"] = *lastSlot
}
config["range"] = blockRange
config := map[string]any{
"commitment": string(commitment),
"range": map[string]int64{"firstSlot": firstSlot, "lastSlot": lastSlot},
}
// make request:
@ -260,16 +180,10 @@ func (c *Client) GetBalance(ctx context.Context, commitment Commitment, address
// GetInflationReward returns the inflation / staking reward for a list of addresses for an epoch.
// See API docs: https://solana.com/docs/rpc/http/getinflationreward
func (c *Client) GetInflationReward(
ctx context.Context, commitment Commitment, addresses []string, epoch *int64, minContextSlot *int64,
ctx context.Context, commitment Commitment, addresses []string, epoch int64,
) ([]InflationReward, error) {
// format params:
config := map[string]any{"commitment": string(commitment)}
if epoch != nil {
config["epoch"] = *epoch
}
if minContextSlot != nil {
config["minContextSlot"] = *minContextSlot
}
config := map[string]any{"commitment": string(commitment), "epoch": epoch}
var resp Response[[]InflationReward]
if err := getResponse(ctx, c, "getInflationReward", []any{addresses, config}, &resp); err != nil {
@ -294,7 +208,7 @@ func (c *Client) GetLeaderSchedule(ctx context.Context, commitment Commitment, s
func (c *Client) GetBlock(
ctx context.Context, commitment Commitment, slot int64, transactionDetails string,
) (*Block, error) {
detailsOptions := []string{"full", "accounts", "none"}
detailsOptions := []string{"full", "none"}
if !slices.Contains(detailsOptions, transactionDetails) {
c.logger.Fatalf(
"%s is not a valid transaction-details option, must be one of %v", transactionDetails, detailsOptions,

View File

@ -7,7 +7,7 @@ import (
)
func newMethodTester(t *testing.T, method string, result any) (*MockServer, *Client) {
return NewTestClient(t, map[string]any{method: result})
return NewMockClient(t, map[string]any{method: result}, nil, nil, nil, nil)
}
func TestClient_GetBalance(t *testing.T) {
@ -23,6 +23,38 @@ func TestClient_GetBalance(t *testing.T) {
assert.Equal(t, float64(5), balance)
}
func TestClient_GetBlock(t *testing.T) {
_, client := newMethodTester(t,
"getBlock",
map[string]any{
"rewards": []map[string]any{
{"pubkey": "aaa", "lamports": 10, "rewardType": "fee"},
},
"transactions": []map[string]map[string]map[string][]string{
{"transaction": {"message": {"accountKeys": {"aaa", "bbb", "ccc"}}}},
},
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
block, err := client.GetBlock(ctx, CommitmentFinalized, 0, "full")
assert.NoError(t, err)
assert.Equal(t,
&Block{
Rewards: []BlockReward{
{Pubkey: "aaa", Lamports: 10, RewardType: "fee"},
},
// note the test will fail if we don't type it exactly like this:
Transactions: []map[string]any{
{"transaction": map[string]any{"message": map[string]any{"accountKeys": []any{"aaa", "bbb", "ccc"}}}},
},
},
block,
)
}
func TestClient_GetBlockProduction(t *testing.T) {
_, client := newMethodTester(t,
"getBlockProduction",
@ -44,17 +76,12 @@ func TestClient_GetBlockProduction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
blockProduction, err := client.GetBlockProduction(ctx, CommitmentFinalized, nil, nil, nil)
blockProduction, err := client.GetBlockProduction(ctx, CommitmentFinalized, 0, 0)
assert.NoError(t, err)
assert.Equal(t,
&BlockProduction{
ByIdentity: map[string]HostProduction{
"85iYT5RuzRTDgjyRa3cP8SYhM2j21fj7NhfJ3peu1DPr": {9888, 9886},
},
Range: BlockProductionRange{
FirstSlot: 0,
LastSlot: 9887,
},
ByIdentity: map[string]HostProduction{"85iYT5RuzRTDgjyRa3cP8SYhM2j21fj7NhfJ3peu1DPr": {9888, 9886}},
Range: BlockProductionRange{FirstSlot: 0, LastSlot: 9887},
},
blockProduction,
)
@ -125,17 +152,10 @@ func TestClient_GetInflationReward(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
inflationReward, err := client.GetInflationReward(ctx, CommitmentFinalized, nil, nil, nil)
inflationReward, err := client.GetInflationReward(ctx, CommitmentFinalized, nil, 2)
assert.NoError(t, err)
assert.Equal(t,
[]InflationReward{
{
Amount: 2_500,
EffectiveSlot: 224,
Epoch: 2,
PostBalance: 499_999_442_500,
},
},
[]InflationReward{{Amount: 2_500, Epoch: 2}},
inflationReward,
)
}
@ -207,19 +227,16 @@ func TestClient_GetVoteAccounts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
voteAccounts, err := client.GetVoteAccounts(ctx, CommitmentFinalized, nil)
voteAccounts, err := client.GetVoteAccounts(ctx, CommitmentFinalized)
assert.NoError(t, err)
assert.Equal(t,
&VoteAccounts{
Current: []VoteAccount{
{
Commission: 0,
EpochVoteAccount: true,
EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}},
NodePubkey: "B97CCUW3AEZFGy6uUg6zUdnNYvnVq5VG8PUtb2HayTDD",
LastVote: 147,
ActivatedStake: 42,
VotePubkey: "3ZT31jkAGhUaw8jsy4bTknwBMP8i4Eueh52By4zXcsVw",
NodePubkey: "B97CCUW3AEZFGy6uUg6zUdnNYvnVq5VG8PUtb2HayTDD",
LastVote: 147,
ActivatedStake: 42,
VotePubkey: "3ZT31jkAGhUaw8jsy4bTknwBMP8i4Eueh52By4zXcsVw",
},
},
},

321
pkg/rpc/mock.go Normal file
View File

@ -0,0 +1,321 @@
package rpc
import (
"context"
"encoding/json"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"go.uber.org/zap"
"net"
"net/http"
"sync"
"testing"
"time"
)
type MockOpt int
const (
BalanceOpt MockOpt = iota
InflationRewardsOpt
EasyResultsOpt
SlotInfosOpt
ValidatorInfoOpt
)
type (
// MockServer represents a mock Solana RPC server for testing
MockServer struct {
server *http.Server
listener net.Listener
mu sync.RWMutex
logger *zap.SugaredLogger
balances map[string]int
inflationRewards map[string]int
easyResults map[string]any
slotInfos map[int]MockSlotInfo
validatorInfos map[string]MockValidatorInfo
}
MockBlockInfo struct {
Fee int
Transactions [][]string
}
MockSlotInfo struct {
Leader string
Block *MockBlockInfo
}
MockValidatorInfo struct {
Votekey string
Stake int
LastVote int
Delinquent bool
}
)
// NewMockServer creates a new mock server instance
func NewMockServer(
easyResults map[string]any,
balances map[string]int,
inflationRewards map[string]int,
slotInfos map[int]MockSlotInfo,
validatorInfos map[string]MockValidatorInfo,
) (*MockServer, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to create listener: %v", err)
}
ms := &MockServer{
listener: listener,
logger: slog.Get(),
easyResults: easyResults,
balances: balances,
inflationRewards: inflationRewards,
slotInfos: slotInfos,
validatorInfos: validatorInfos,
}
mux := http.NewServeMux()
mux.HandleFunc("/", ms.handleRPCRequest)
ms.server = &http.Server{Handler: mux}
go func() {
_ = ms.server.Serve(listener)
}()
return ms, nil
}
// URL returns the URL of the mock server
func (s *MockServer) URL() string {
return fmt.Sprintf("http://%s", s.listener.Addr().String())
}
// Close shuts down the mock server
func (s *MockServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.server.Shutdown(ctx)
}
func (s *MockServer) MustClose() {
if err := s.Close(); err != nil {
panic(err)
}
}
func (s *MockServer) SetOpt(opt MockOpt, key any, value any) {
s.mu.Lock()
defer s.mu.Unlock()
switch opt {
case BalanceOpt:
if s.balances == nil {
s.balances = make(map[string]int)
}
s.balances[key.(string)] = value.(int)
case InflationRewardsOpt:
if s.inflationRewards == nil {
s.inflationRewards = make(map[string]int)
}
s.inflationRewards[key.(string)] = value.(int)
case EasyResultsOpt:
if s.easyResults == nil {
s.easyResults = make(map[string]any)
}
s.easyResults[key.(string)] = value
case SlotInfosOpt:
if s.slotInfos == nil {
s.slotInfos = make(map[int]MockSlotInfo)
}
s.slotInfos[key.(int)] = value.(MockSlotInfo)
case ValidatorInfoOpt:
if s.validatorInfos == nil {
s.validatorInfos = make(map[string]MockValidatorInfo)
}
s.validatorInfos[key.(string)] = value.(MockValidatorInfo)
}
}
func (s *MockServer) GetValidatorInfo(nodekey string) MockValidatorInfo {
s.mu.RLock()
defer s.mu.RUnlock()
return s.validatorInfos[nodekey]
}
func (s *MockServer) getResult(method string, params ...any) (any, *RPCError) {
s.mu.RLock()
defer s.mu.RUnlock()
if method == "getBalance" && s.balances != nil {
address := params[0].(string)
result := map[string]any{
"context": map[string]int{"slot": 1},
"value": s.balances[address],
}
return result, nil
}
if method == "getInflationReward" && s.inflationRewards != nil {
addresses := params[0].([]any)
config := params[1].(map[string]any)
epoch := int(config["epoch"].(float64))
rewards := make([]map[string]int, len(addresses))
for i, item := range addresses {
address := item.(string)
// TODO: make inflation rewards fetchable by epoch
rewards[i] = map[string]int{"amount": s.inflationRewards[address], "epoch": epoch}
}
return rewards, nil
}
if method == "getBlock" && s.slotInfos != nil {
// get params:
slot := int(params[0].(float64))
config := params[1].(map[string]any)
transactionDetails, rewardsIncluded := config["transactionDetails"].(string), config["rewards"].(bool)
slotInfo, ok := s.slotInfos[slot]
if !ok {
s.logger.Warnf("no slot info for slot %d", slot)
return nil, &RPCError{Code: BlockCleanedUpCode, Message: "Block cleaned up."}
}
if slotInfo.Block == nil {
return nil, &RPCError{Code: SlotSkippedCode, Message: "Slot skipped."}
}
var (
transactions []map[string]any
rewards []map[string]any
)
if transactionDetails == "full" {
for _, tx := range slotInfo.Block.Transactions {
transactions = append(
transactions,
map[string]any{
"transaction": map[string]map[string][]string{"message": {"accountKeys": tx}},
},
)
}
}
if rewardsIncluded {
rewards = append(
rewards,
map[string]any{"pubkey": slotInfo.Leader, "lamports": slotInfo.Block.Fee, "rewardType": "fee"},
)
}
return map[string]any{"rewards": rewards, "transactions": transactions}, nil
}
if method == "getBlockProduction" && s.slotInfos != nil {
// get params:
config := params[0].(map[string]any)
slotRange := config["range"].(map[string]any)
firstSlot, lastSlot := int(slotRange["firstSlot"].(float64)), int(slotRange["lastSlot"].(float64))
byIdentity := make(map[string][]int)
for nodekey := range s.validatorInfos {
byIdentity[nodekey] = []int{0, 0}
}
for i := firstSlot; i <= lastSlot; i++ {
info := s.slotInfos[i]
production := byIdentity[info.Leader]
production[0]++
if info.Block != nil {
production[1]++
}
byIdentity[info.Leader] = production
}
blockProduction := map[string]any{
"context": map[string]int{"slot": 1},
"value": map[string]any{"byIdentity": byIdentity, "range": slotRange},
}
return blockProduction, nil
}
if method == "getVoteAccounts" && s.validatorInfos != nil {
var currentVoteAccounts, delinquentVoteAccounts []map[string]any
for nodekey, info := range s.validatorInfos {
voteAccount := map[string]any{
"activatedStake": int64(info.Stake),
"lastVote": info.LastVote,
"nodePubkey": nodekey,
"rootSlot": 0,
"votePubkey": info.Votekey,
}
if info.Delinquent {
delinquentVoteAccounts = append(delinquentVoteAccounts, voteAccount)
} else {
currentVoteAccounts = append(currentVoteAccounts, voteAccount)
}
}
voteAccounts := map[string][]map[string]any{
"current": currentVoteAccounts,
"delinquent": delinquentVoteAccounts,
}
return voteAccounts, nil
}
// default is use easy results:
result, ok := s.easyResults[method]
if !ok {
return nil, &RPCError{Code: -32601, Message: "Method not found"}
}
return result, nil
}
func (s *MockServer) handleRPCRequest(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
var request Request
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
response := Response[any]{Jsonrpc: "2.0", Id: request.Id}
result, rpcErr := s.getResult(request.Method, request.Params...)
if rpcErr != nil {
response.Error = *rpcErr
} else {
response.Result = result
}
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// NewMockClient creates a new test client with a running mock server
func NewMockClient(
t *testing.T,
easyResults map[string]any,
balances map[string]int,
inflationRewards map[string]int,
slotInfos map[int]MockSlotInfo,
validatorInfos map[string]MockValidatorInfo,
) (*MockServer, *Client) {
server, err := NewMockServer(easyResults, balances, inflationRewards, slotInfos, validatorInfos)
if err != nil {
t.Fatalf("failed to create mock server: %v", err)
}
t.Cleanup(func() {
if err := server.Close(); err != nil {
t.Errorf("failed to close mock server: %v", err)
}
})
client := NewRPCClient(server.URL(), time.Second)
return server, client
}

146
pkg/rpc/mock_test.go Normal file
View File

@ -0,0 +1,146 @@
package rpc
import (
"context"
"github.com/stretchr/testify/assert"
"testing"
)
func TestMockServer_getBalance(t *testing.T) {
_, client := NewMockClient(
t, nil, map[string]int{"aaa": 2 * LamportsInSol}, nil, nil, nil,
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
balance, err := client.GetBalance(ctx, CommitmentFinalized, "aaa")
assert.NoError(t, err)
assert.Equal(t, float64(2), balance)
}
func TestMockServer_getBlock(t *testing.T) {
_, client := NewMockClient(t,
nil,
nil,
nil,
map[int]MockSlotInfo{
1: {"aaa", &MockBlockInfo{Fee: 10, Transactions: [][]string{{"bbb"}}}},
2: {"bbb", &MockBlockInfo{Fee: 5, Transactions: [][]string{{"ccc", "ddd"}}}},
},
map[string]MockValidatorInfo{"aaa": {}, "bbb": {}},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
block, err := client.GetBlock(ctx, CommitmentFinalized, 1, "full")
assert.NoError(t, err)
assert.Equal(t,
Block{
Rewards: []BlockReward{{Pubkey: "aaa", Lamports: 10, RewardType: "fee"}},
Transactions: []map[string]any{
{"transaction": map[string]any{"message": map[string]any{"accountKeys": []any{"bbb"}}}},
},
},
*block,
)
block, err = client.GetBlock(ctx, CommitmentFinalized, 2, "none")
assert.NoError(t, err)
assert.Equal(t,
Block{
Rewards: []BlockReward{{Pubkey: "bbb", Lamports: 5, RewardType: "fee"}},
Transactions: nil,
},
*block,
)
}
func TestMockServer_getBlockProduction(t *testing.T) {
_, client := NewMockClient(
t,
nil,
nil,
nil,
map[int]MockSlotInfo{
1: {"aaa", &MockBlockInfo{}},
2: {"aaa", &MockBlockInfo{}},
3: {"aaa", &MockBlockInfo{}},
4: {"aaa", nil},
5: {"bbb", &MockBlockInfo{}},
6: {"bbb", nil},
7: {"bbb", &MockBlockInfo{}},
8: {"bbb", nil},
},
map[string]MockValidatorInfo{"aaa": {}, "bbb": {}},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
firstSlot, lastSlot := int64(1), int64(6)
blockProduction, err := client.GetBlockProduction(ctx, CommitmentFinalized, firstSlot, lastSlot)
assert.NoError(t, err)
assert.Equal(t,
BlockProduction{
ByIdentity: map[string]HostProduction{
"aaa": {4, 3},
"bbb": {2, 1},
},
Range: BlockProductionRange{FirstSlot: firstSlot, LastSlot: lastSlot},
},
*blockProduction,
)
}
func TestMockServer_getInflationReward(t *testing.T) {
_, client := NewMockClient(t,
nil,
nil,
map[string]int{"AAA": 2_500, "BBB": 2_501, "CCC": 2_502},
nil,
nil,
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rewards, err := client.GetInflationReward(ctx, CommitmentFinalized, []string{"AAA", "BBB"}, 2)
assert.NoError(t, err)
assert.Equal(t,
[]InflationReward{{Amount: 2_500, Epoch: 2}, {Amount: 2_501, Epoch: 2}},
rewards,
)
}
func TestMockServer_getVoteAccounts(t *testing.T) {
_, client := NewMockClient(t,
nil,
nil,
nil,
nil,
map[string]MockValidatorInfo{
"aaa": {"AAA", 1, 2, false},
"bbb": {"BBB", 3, 4, false},
"ccc": {"CCC", 5, 6, true},
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
voteAccounts, err := client.GetVoteAccounts(ctx, CommitmentFinalized)
assert.NoError(t, err)
// TODO: this test sometimes (albeit rarely) fails because the ordering gets mixed up, fix!
assert.Equal(t,
VoteAccounts{
Current: []VoteAccount{
{1, 2, "aaa", 0, "AAA"},
{3, 4, "bbb", 0, "BBB"},
},
Delinquent: []VoteAccount{
{5, 6, "ccc", 0, "CCC"},
},
},
*voteAccounts,
)
}

View File

@ -29,29 +29,20 @@ type (
}
EpochInfo struct {
// Current absolute slot in epoch
AbsoluteSlot int64 `json:"absoluteSlot"`
// Current block height
BlockHeight int64 `json:"blockHeight"`
// Current epoch number
Epoch int64 `json:"epoch"`
// Current slot relative to the start of the current epoch
SlotIndex int64 `json:"slotIndex"`
// Number of slots in this epoch
SlotsInEpoch int64 `json:"slotsInEpoch"`
// Total number of transactions
AbsoluteSlot int64 `json:"absoluteSlot"`
BlockHeight int64 `json:"blockHeight"`
Epoch int64 `json:"epoch"`
SlotIndex int64 `json:"slotIndex"`
SlotsInEpoch int64 `json:"slotsInEpoch"`
TransactionCount int64 `json:"transactionCount"`
}
VoteAccount struct {
ActivatedStake int64 `json:"activatedStake"`
Commission int `json:"commission"`
EpochCredits [][]int `json:"epochCredits"`
EpochVoteAccount bool `json:"epochVoteAccount"`
LastVote int `json:"lastVote"`
NodePubkey string `json:"nodePubkey"`
RootSlot int `json:"rootSlot"`
VotePubkey string `json:"votePubkey"`
ActivatedStake int64 `json:"activatedStake"`
LastVote int `json:"lastVote"`
NodePubkey string `json:"nodePubkey"`
RootSlot int `json:"rootSlot"`
VotePubkey string `json:"votePubkey"`
}
VoteAccounts struct {
@ -75,28 +66,19 @@ type (
}
InflationReward struct {
Amount int64 `json:"amount"`
EffectiveSlot int64 `json:"effectiveSlot"`
Epoch int64 `json:"epoch"`
PostBalance int64 `json:"postBalance"`
Amount int64 `json:"amount"`
Epoch int64 `json:"epoch"`
}
Block struct {
BlockHeight int64 `json:"blockHeight"`
BlockTime int64 `json:"blockTime,omitempty"`
Blockhash string `json:"blockhash"`
ParentSlot int64 `json:"parentSlot"`
PreviousBlockhash string `json:"previousBlockhash"`
Rewards []BlockReward `json:"rewards"`
Transactions []map[string]any `json:"transactions"`
Rewards []BlockReward `json:"rewards"`
Transactions []map[string]any `json:"transactions"`
}
BlockReward struct {
Pubkey string `json:"pubkey"`
Lamports int64 `json:"lamports"`
PostBalance int64 `json:"postBalance"`
RewardType string `json:"rewardType"`
Commission uint8 `json:"commission"`
Pubkey string `json:"pubkey"`
Lamports int64 `json:"lamports"`
RewardType string `json:"rewardType"`
}
FullTransaction struct {

13
pkg/rpc/rpc_test.go Normal file
View File

@ -0,0 +1,13 @@
package rpc
import (
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"os"
"testing"
)
func TestMain(m *testing.M) {
slog.Init()
code := m.Run()
os.Exit(code)
}

View File

@ -1,126 +0,0 @@
package rpc
import (
"context"
"encoding/json"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/slog"
"go.uber.org/zap"
"net"
"net/http"
"sync"
"testing"
"time"
)
type (
// MockServer represents a mock Solana RPC server for testing
MockServer struct {
easyResults map[string]any
server *http.Server
listener net.Listener
mu sync.RWMutex
logger *zap.SugaredLogger
}
)
// NewMockServer creates a new mock server instance
func NewMockServer(easyResults map[string]any) (*MockServer, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to create listener: %v", err)
}
ms := &MockServer{listener: listener, easyResults: easyResults, logger: slog.Get()}
mux := http.NewServeMux()
mux.HandleFunc("/", ms.handleRPCRequest)
ms.server = &http.Server{Handler: mux}
go func() {
_ = ms.server.Serve(listener)
}()
return ms, nil
}
// URL returns the URL of the mock server
func (s *MockServer) URL() string {
return fmt.Sprintf("http://%s", s.listener.Addr().String())
}
// Close shuts down the mock server
func (s *MockServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.server.Shutdown(ctx)
}
func (s *MockServer) MustClose() {
if err := s.Close(); err != nil {
panic(err)
}
}
// SetEasyResult sets a custom response for a specific method
func (s *MockServer) SetEasyResult(method string, result any) {
s.mu.Lock()
defer s.mu.Unlock()
s.easyResults[method] = result
}
func (s *MockServer) GetResult(method string) (any, *RPCError) {
s.mu.RLock()
defer s.mu.RUnlock()
result, ok := s.easyResults[method]
if !ok {
return nil, &RPCError{Code: -32601, Message: "Method not found"}
}
return result, nil
}
func (s *MockServer) handleRPCRequest(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
var request Request
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
response := Response[any]{Jsonrpc: "2.0", Id: request.Id}
result, rpcErr := s.GetResult(request.Method)
if rpcErr != nil {
response.Error = *rpcErr
} else {
response.Result = result
}
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// NewTestClient creates a new test helper with a running mock server
func NewTestClient(t *testing.T, easyResponses map[string]any) (*MockServer, *Client) {
server, err := NewMockServer(easyResponses)
if err != nil {
t.Fatalf("Failed to create mock server: %v", err)
}
t.Cleanup(func() {
if err := server.Close(); err != nil {
t.Errorf("Failed to close mock server: %v", err)
}
})
client := NewRPCClient(server.URL(), time.Second)
return server, client
}

View File

@ -10,8 +10,8 @@ import (
var log *zap.SugaredLogger
// init initializes the logger
func init() {
// Init initializes the logger
func Init() {
config := zap.NewProductionConfig()
// configure:
@ -50,7 +50,7 @@ func getEnvLogLevel() zapcore.Level {
case "error":
return zapcore.ErrorLevel
default:
fmt.Println(fmt.Sprintf("Unrecognised 'LOG_LEVEL' environment variable '%s', using 'info'", level))
fmt.Printf("Unrecognised 'LOG_LEVEL' environment variable '%s', using 'info'\n", level)
return zapcore.InfoLevel
}
}