Add StakesStore to cache validator stakes (#2)

Needed for special treatment of top-stake nodes.
This commit is contained in:
Christian Kamm 2024-04-05 16:59:47 +02:00 committed by GitHub
parent 45a8ac79aa
commit fd988bfbd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 116 additions and 17 deletions

View File

@ -9,11 +9,10 @@ use tokio::sync::RwLock;
use crate::{
stores::{
block_information_store::BlockInformationStore, cluster_info_store::ClusterInfo,
subscription_store::SubscriptionStore, tx_store::TxStore,
stakes_store::StakesStore, subscription_store::SubscriptionStore, tx_store::TxStore,
},
structures::{
epoch::{Epoch, EpochCache},
identity_stakes::IdentityStakes,
slot_notification::{AtomicSlot, SlotNotification},
transaction_sent_info::SentTransactionInfo,
},
@ -35,7 +34,7 @@ pub struct DataCache {
pub txs: TxStore,
pub tx_subs: SubscriptionStore,
pub slot_cache: SlotCache,
pub identity_stakes: IdentityStakes,
pub stakes_store: StakesStore,
pub cluster_info: ClusterInfo,
pub epoch_data: EpochCache,
pub leader_schedule: Arc<RwLock<CalculatedSchedule>>,
@ -84,7 +83,7 @@ impl DataCache {
block_time: 0,
}),
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(Pubkey::new_unique()),
stakes_store: StakesStore::new(Pubkey::default()),
slot_cache: SlotCache::new(0),
tx_subs: SubscriptionStore::default(),
txs: TxStore {

View File

@ -3,5 +3,6 @@
pub mod block_information_store;
pub mod cluster_info_store;
pub mod data_cache;
pub mod stakes_store;
pub mod subscription_store;
pub mod tx_store;

View File

@ -0,0 +1,102 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};
use crate::structures::identity_stakes::IdentityStakesData;
use log::error;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::pubkey::{ParsePubkeyError, Pubkey};
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use tokio::sync::RwLock;
/// Aggregate statistics over all known validator stakes.
///
/// All values are in lamports and derived from the identity->stake map
/// kept in [`StakeData`].
#[derive(Debug, Clone, Copy, Default)]
pub struct StakeSummary {
    /// Sum of all cached validator stakes.
    pub total_stakes: u64,
    /// Smallest cached stake (0 when the cache is empty).
    pub min_stakes: u64,
    /// Largest cached stake (0 when the cache is empty).
    pub max_stakes: u64,
}
/// Cached stake information, replaced wholesale on every update.
#[derive(Debug, Clone, Default)]
pub struct StakeData {
    /// Stake keyed by validator node identity pubkey.
    pub identity_to_stake: HashMap<Pubkey, u64>,
    /// Same entries as `identity_to_stake`, sorted by stake descending
    /// (for quick access to top-stake nodes).
    pub stakes_desc: Vec<(Pubkey, u64)>,
    /// Precomputed total/min/max over `identity_to_stake`.
    pub summary: StakeSummary,
}
/// Shared, cloneable cache of validator stakes.
///
/// Clones share the same underlying `RwLock`-protected [`StakeData`], so one
/// task can refresh the cache while others read it.
#[derive(Debug, Clone)]
pub struct StakesStore {
    /// This node's own identity; used by `get_identity_stakes`.
    own_identity: Pubkey,
    data: Arc<RwLock<StakeData>>,
}
impl StakesStore {
pub fn new(identity: Pubkey) -> Self {
Self {
own_identity: identity,
data: Arc::new(RwLock::new(StakeData::default())),
}
}
pub async fn get_summary(&self) -> StakeSummary {
self.data.read().await.summary
}
/// Stake information for own_identity
pub async fn get_identity_stakes(&self) -> IdentityStakesData {
let read_lock = self.data.read().await;
let own_stake = read_lock.identity_to_stake.get(&self.own_identity);
let summary = &read_lock.summary;
own_stake
.map(|stake| IdentityStakesData {
peer_type: ConnectionPeerType::Staked,
stakes: *stake,
total_stakes: summary.total_stakes,
min_stakes: summary.min_stakes,
max_stakes: summary.max_stakes,
})
.unwrap_or_default()
}
pub async fn get_node_stake(&self, identity: &Pubkey) -> Option<u64> {
self.data
.read()
.await
.identity_to_stake
.get(identity)
.cloned()
}
pub async fn get_stake_per_node(&self) -> HashMap<Pubkey, u64> {
self.data.read().await.identity_to_stake.clone()
}
pub async fn get_all_stakes_desc(&self) -> Vec<(Pubkey, u64)> {
self.data.read().await.stakes_desc.clone()
}
pub async fn update_stakes(&self, vote_accounts: RpcVoteAccountStatus) {
let Ok(mut stakes_desc) = vote_accounts
.current
.iter()
.chain(vote_accounts.delinquent.iter())
.map(|va| Ok((Pubkey::from_str(&va.node_pubkey)?, va.activated_stake)))
.collect::<Result<Vec<(Pubkey, u64)>, ParsePubkeyError>>()
else {
error!("rpc vote account result contained bad pubkey");
return;
};
stakes_desc.sort_by_key(|(_pk, stake)| std::cmp::Reverse(*stake));
let id_to_stake: HashMap<Pubkey, u64> = stakes_desc.iter().copied().collect();
let summary = StakeSummary {
total_stakes: id_to_stake.values().sum(),
min_stakes: id_to_stake.values().min().copied().unwrap_or(0),
max_stakes: id_to_stake.values().max().copied().unwrap_or(0),
};
let mut write_lock = self.data.write().await;
write_lock.summary = summary;
write_lock.identity_to_stake = id_to_stake;
write_lock.stakes_desc = stakes_desc;
}
}

View File

@ -19,11 +19,12 @@ use solana_lite_rpc_core::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
stakes_store::StakesStore,
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule,
epoch::EpochCache, leaderschedule::CalculatedSchedule,
transaction_sent_info::SentTransactionInfo,
},
utils::wait_till_block_of_commitment_is_recieved,
@ -320,7 +321,7 @@ async fn main() -> anyhow::Result<()> {
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
stakes_store: StakesStore::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalize_slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {

View File

@ -35,14 +35,13 @@ use solana_lite_rpc_core::stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
stakes_store::StakesStore,
subscription_store::SubscriptionStore,
tx_store::TxStore,
};
use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
};
use solana_lite_rpc_core::structures::{epoch::EpochCache, notifications::NotificationSender};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved;
@ -243,7 +242,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
stakes_store: StakesStore::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block_info.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {

View File

@ -171,17 +171,14 @@ impl DataCachingService {
});
let data_cache: DataCache = self.data_cache.clone();
let identity_stakes_jh = tokio::spawn(async move {
let stakes_cache_jh = tokio::spawn(async move {
let mut va_notification = va_notification;
loop {
let vote_accounts = va_notification
.recv()
.await
.context("Could not get vote accounts")?;
data_cache
.identity_stakes
.update_stakes_for_identity(vote_accounts)
.await;
data_cache.stakes_store.update_stakes(vote_accounts).await;
}
});
@ -199,7 +196,7 @@ impl DataCachingService {
block_cache_jh,
blockinfo_cache_jh,
cluster_info_jh,
identity_stakes_jh,
stakes_cache_jh,
cleaning_service,
]
}

View File

@ -129,7 +129,7 @@ impl TpuService {
.get_slot_leaders(current_slot, last_slot)
.await?;
let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;
let identity_stakes = self.data_cache.stakes_store.get_identity_stakes().await;
let enable_tpu_forwards = {
match identity_stakes.peer_type {