From fd988bfbd3cf8e327fb8c1ec36862354bfb46cbc Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Fri, 5 Apr 2024 16:59:47 +0200 Subject: [PATCH] Add StakesStore to cache validator stakes (#2) Needed for special treatment of top-stake nodes. --- core/src/stores/data_cache.rs | 7 +- core/src/stores/mod.rs | 1 + core/src/stores/stakes_store.rs | 102 ++++++++++++++++++ .../custom-tpu-send-transactions/src/main.rs | 5 +- lite-rpc/src/main.rs | 7 +- services/src/data_caching_service.rs | 9 +- services/src/tpu_utils/tpu_service.rs | 2 +- 7 files changed, 116 insertions(+), 17 deletions(-) create mode 100644 core/src/stores/stakes_store.rs diff --git a/core/src/stores/data_cache.rs b/core/src/stores/data_cache.rs index da6a76f5..0a6f8b10 100644 --- a/core/src/stores/data_cache.rs +++ b/core/src/stores/data_cache.rs @@ -9,11 +9,10 @@ use tokio::sync::RwLock; use crate::{ stores::{ block_information_store::BlockInformationStore, cluster_info_store::ClusterInfo, - subscription_store::SubscriptionStore, tx_store::TxStore, + stakes_store::StakesStore, subscription_store::SubscriptionStore, tx_store::TxStore, }, structures::{ epoch::{Epoch, EpochCache}, - identity_stakes::IdentityStakes, slot_notification::{AtomicSlot, SlotNotification}, transaction_sent_info::SentTransactionInfo, }, @@ -35,7 +34,7 @@ pub struct DataCache { pub txs: TxStore, pub tx_subs: SubscriptionStore, pub slot_cache: SlotCache, - pub identity_stakes: IdentityStakes, + pub stakes_store: StakesStore, pub cluster_info: ClusterInfo, pub epoch_data: EpochCache, pub leader_schedule: Arc>, @@ -84,7 +83,7 @@ impl DataCache { block_time: 0, }), cluster_info: ClusterInfo::default(), - identity_stakes: IdentityStakes::new(Pubkey::new_unique()), + stakes_store: StakesStore::new(Pubkey::default()), slot_cache: SlotCache::new(0), tx_subs: SubscriptionStore::default(), txs: TxStore { diff --git a/core/src/stores/mod.rs b/core/src/stores/mod.rs index 1f1b8c51..f0128923 100644 --- a/core/src/stores/mod.rs +++ b/core/src/stores/mod.rs @@ -3,5 +3,6 @@ pub mod block_information_store; pub mod cluster_info_store; pub mod data_cache; +pub mod stakes_store; pub mod subscription_store; pub mod tx_store; diff --git a/core/src/stores/stakes_store.rs b/core/src/stores/stakes_store.rs new file mode 100644 index 00000000..7e445aa2 --- /dev/null +++ b/core/src/stores/stakes_store.rs @@ -0,0 +1,102 @@ +use std::{collections::HashMap, str::FromStr, sync::Arc}; + +use crate::structures::identity_stakes::IdentityStakesData; +use log::error; +use solana_rpc_client_api::response::RpcVoteAccountStatus; +use solana_sdk::pubkey::{ParsePubkeyError, Pubkey}; +use solana_streamer::nonblocking::quic::ConnectionPeerType; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, Copy, Default)] +pub struct StakeSummary { + pub total_stakes: u64, + pub min_stakes: u64, + pub max_stakes: u64, +} + +#[derive(Debug, Clone, Default)] +pub struct StakeData { + pub identity_to_stake: HashMap, + pub stakes_desc: Vec<(Pubkey, u64)>, + pub summary: StakeSummary, +} + +#[derive(Debug, Clone)] +pub struct StakesStore { + own_identity: Pubkey, + data: Arc>, +} + +impl StakesStore { + pub fn new(identity: Pubkey) -> Self { + Self { + own_identity: identity, + data: Arc::new(RwLock::new(StakeData::default())), + } + } + + pub async fn get_summary(&self) -> StakeSummary { + self.data.read().await.summary + } + + /// Stake information for own_identity + pub async fn get_identity_stakes(&self) -> IdentityStakesData { + let read_lock = self.data.read().await; + let own_stake = read_lock.identity_to_stake.get(&self.own_identity); + let summary = &read_lock.summary; + own_stake + .map(|stake| IdentityStakesData { + peer_type: ConnectionPeerType::Staked, + stakes: *stake, + total_stakes: summary.total_stakes, + min_stakes: summary.min_stakes, + max_stakes: summary.max_stakes, + }) + .unwrap_or_default() + } + + pub async fn get_node_stake(&self, identity: &Pubkey) -> Option { + self.data + .read() + .await + .identity_to_stake + .get(identity) + .cloned() + } + + pub async fn get_stake_per_node(&self) -> HashMap { + self.data.read().await.identity_to_stake.clone() + } + + pub async fn get_all_stakes_desc(&self) -> Vec<(Pubkey, u64)> { + self.data.read().await.stakes_desc.clone() + } + + pub async fn update_stakes(&self, vote_accounts: RpcVoteAccountStatus) { + let Ok(mut stakes_desc) = vote_accounts + .current + .iter() + .chain(vote_accounts.delinquent.iter()) + .map(|va| Ok((Pubkey::from_str(&va.node_pubkey)?, va.activated_stake))) + .collect::, ParsePubkeyError>>() + else { + error!("rpc vote account result contained bad pubkey"); + return; + }; + + stakes_desc.sort_by_key(|(_pk, stake)| std::cmp::Reverse(*stake)); + + let id_to_stake: HashMap = stakes_desc.iter().copied().collect(); + + let summary = StakeSummary { + total_stakes: id_to_stake.values().sum(), + min_stakes: id_to_stake.values().min().copied().unwrap_or(0), + max_stakes: id_to_stake.values().max().copied().unwrap_or(0), + }; + + let mut write_lock = self.data.write().await; + write_lock.summary = summary; + write_lock.identity_to_stake = id_to_stake; + write_lock.stakes_desc = stakes_desc; + } +} diff --git a/examples/custom-tpu-send-transactions/src/main.rs b/examples/custom-tpu-send-transactions/src/main.rs index 20d20a1d..0fccb000 100644 --- a/examples/custom-tpu-send-transactions/src/main.rs +++ b/examples/custom-tpu-send-transactions/src/main.rs @@ -19,11 +19,12 @@ use solana_lite_rpc_core::{ block_information_store::{BlockInformation, BlockInformationStore}, cluster_info_store::ClusterInfo, data_cache::{DataCache, SlotCache}, + stakes_store::StakesStore, subscription_store::SubscriptionStore, tx_store::TxStore, }, structures::{ - epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule, + epoch::EpochCache, leaderschedule::CalculatedSchedule, transaction_sent_info::SentTransactionInfo, }, utils::wait_till_block_of_commitment_is_recieved, @@ -320,7 +321,7 @@ async fn main() -> anyhow::Result<()> { let data_cache = DataCache { block_information_store, cluster_info: ClusterInfo::default(), - identity_stakes: IdentityStakes::new(validator_identity.pubkey()), + stakes_store: StakesStore::new(validator_identity.pubkey()), slot_cache: SlotCache::new(finalize_slot), tx_subs: SubscriptionStore::default(), txs: TxStore { diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 466879e8..1020343e 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -35,14 +35,13 @@ use solana_lite_rpc_core::stores::{ block_information_store::{BlockInformation, BlockInformationStore}, cluster_info_store::ClusterInfo, data_cache::{DataCache, SlotCache}, + stakes_store::StakesStore, subscription_store::SubscriptionStore, tx_store::TxStore, }; use solana_lite_rpc_core::structures::account_filter::AccountFilters; use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule; -use solana_lite_rpc_core::structures::{ - epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender, -}; +use solana_lite_rpc_core::structures::{epoch::EpochCache, notifications::NotificationSender}; use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface; use solana_lite_rpc_core::types::BlockStream; use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved; @@ -243,7 +242,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let data_cache = DataCache { block_information_store, cluster_info: ClusterInfo::default(), - identity_stakes: IdentityStakes::new(validator_identity.pubkey()), + stakes_store: StakesStore::new(validator_identity.pubkey()), slot_cache: SlotCache::new(finalized_block_info.slot), tx_subs: SubscriptionStore::default(), txs: TxStore { diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs index 826ddef5..b21eb2ec 100644 --- a/services/src/data_caching_service.rs +++ b/services/src/data_caching_service.rs @@ -171,17 +171,14 @@ impl DataCachingService { }); let data_cache: DataCache = self.data_cache.clone(); - let identity_stakes_jh = tokio::spawn(async move { + let stakes_cache_jh = tokio::spawn(async move { let mut va_notification = va_notification; loop { let vote_accounts = va_notification .recv() .await .context("Could not get vote accounts")?; - data_cache - .identity_stakes - .update_stakes_for_identity(vote_accounts) - .await; + data_cache.stakes_store.update_stakes(vote_accounts).await; } }); @@ -199,7 +196,7 @@ impl DataCachingService { block_cache_jh, blockinfo_cache_jh, cluster_info_jh, - identity_stakes_jh, + stakes_cache_jh, cleaning_service, ] } diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 0d770db5..9c0b3c62 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -129,7 +129,7 @@ impl TpuService { .get_slot_leaders(current_slot, last_slot) .await?; - let identity_stakes = self.data_cache.identity_stakes.get_stakes().await; + let identity_stakes = self.data_cache.stakes_store.get_identity_stakes().await; let enable_tpu_forwards = { match identity_stakes.peer_type {