From ce21a58b6578ffa291b0bcf743014ecbf121a3c0 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 10 Apr 2023 17:07:40 +0000 Subject: [PATCH] reworks streamer::StakedNodes (#31082) {min,max}_stake are computed but never assigned: https://github.com/solana-labs/solana/blob/4564bcdc1/core/src/staked_nodes_updater_service.rs#L54-L57 The updater code is also inefficient and verbose. --- bench-tps/src/main.rs | 13 ++- core/src/staked_nodes_updater_service.rs | 112 +++-------------------- core/src/tpu.rs | 1 - quic-client/src/lib.rs | 35 +++---- streamer/src/nonblocking/quic.rs | 32 +++---- streamer/src/streamer.rs | 53 ++++++++++- 6 files changed, 98 insertions(+), 148 deletions(-) diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index ed132932c2..b38a9d2a09 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -98,11 +98,14 @@ fn create_connection_cache( let (stake, total_stake) = find_node_activated_stake(rpc_client, client_node_id.pubkey()).unwrap_or_default(); info!("Stake for specified client_node_id: {stake}, total stake: {total_stake}"); - let staked_nodes = Arc::new(RwLock::new(StakedNodes { - total_stake, - pubkey_stake_map: HashMap::from([(client_node_id.pubkey(), stake)]), - ..StakedNodes::default() - })); + let stakes = HashMap::from([ + (client_node_id.pubkey(), stake), + (Pubkey::new_unique(), total_stake - stake), + ]); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::new( + Arc::new(stakes), + HashMap::::default(), // overrides + ))); ConnectionCache::new_with_client_options( tpu_connection_pool_size, None, diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index e65526f704..e4bffd8666 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,21 +1,19 @@ use { - solana_gossip::cluster_info::ClusterInfo, solana_runtime::bank_forks::BankForks, solana_sdk::pubkey::Pubkey, solana_streamer::streamer::StakedNodes, std::{ collections::HashMap, - net::IpAddr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, RwLockReadGuard, + Arc, RwLock, }, - thread::{self, sleep, Builder, JoinHandle}, - time::{Duration, Instant}, + thread::{self, Builder, JoinHandle}, + time::Duration, }, }; -const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5); +const STAKE_REFRESH_CYCLE: Duration = Duration::from_secs(5); pub struct StakedNodesUpdaterService { thread_hdl: JoinHandle<()>, @@ -24,35 +22,21 @@ pub struct StakedNodesUpdaterService { impl StakedNodesUpdaterService { pub fn new( exit: Arc, - cluster_info: Arc, bank_forks: Arc>, - shared_staked_nodes: Arc>, - shared_staked_nodes_overrides: Arc>>, + staked_nodes: Arc>, + staked_nodes_overrides: Arc>>, ) -> Self { let thread_hdl = Builder::new() .name("solStakedNodeUd".to_string()) .spawn(move || { - let mut last_stakes = Instant::now(); while !exit.load(Ordering::Relaxed) { - let overrides = shared_staked_nodes_overrides.read().unwrap(); - let mut new_id_to_stake = HashMap::new(); - let mut total_stake = 0; - let mut max_stake: u64 = 0; - let mut min_stake: u64 = u64::MAX; - if Self::try_refresh_stake_maps( - &mut last_stakes, - &mut new_id_to_stake, - &mut total_stake, - &mut max_stake, - &mut min_stake, - &bank_forks, - &cluster_info, - &overrides, - ) { - let mut shared = shared_staked_nodes.write().unwrap(); - shared.total_stake = total_stake; - shared.pubkey_stake_map = new_id_to_stake; - } + let stakes = { + let root_bank = bank_forks.read().unwrap().root_bank(); + root_bank.staked_nodes() + }; + let overrides = staked_nodes_overrides.read().unwrap().clone(); + *staked_nodes.write().unwrap() = StakedNodes::new(stakes, overrides); + std::thread::sleep(STAKE_REFRESH_CYCLE); } }) .unwrap(); @@ -60,76 +44,6 @@ impl StakedNodesUpdaterService { Self { thread_hdl } } - fn try_refresh_stake_maps( - last_stakes: &mut Instant, - id_to_stake: &mut HashMap, - total_stake: &mut u64, - max_stake: &mut u64, - min_stake: &mut u64, - bank_forks: &RwLock, - cluster_info: &ClusterInfo, - overrides: &RwLockReadGuard>, - ) -> bool { - if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { - let root_bank = bank_forks.read().unwrap().root_bank(); - let staked_nodes = root_bank.staked_nodes(); - - for stake in staked_nodes.values() { - *total_stake += stake; - *max_stake = *stake.max(max_stake); - *min_stake = *stake.min(min_stake); - } - - *id_to_stake = cluster_info - .tvu_peers() - .into_iter() - .filter_map(|node| { - let stake = staked_nodes.get(&node.id)?; - Some((node.id, *stake)) - }) - .collect(); - let my_pubkey = *cluster_info.my_contact_info().pubkey(); - if let Some(stake) = staked_nodes.get(&my_pubkey) { - id_to_stake.insert(my_pubkey, *stake); - } - Self::override_stake(cluster_info, total_stake, id_to_stake, overrides); - - *last_stakes = Instant::now(); - true - } else { - sleep(Duration::from_millis(1)); - false - } - } - - fn override_stake( - cluster_info: &ClusterInfo, - total_stake: &mut u64, - id_to_stake_map: &mut HashMap, - staked_map_overrides: &HashMap, - ) { - let nodes: HashMap = cluster_info - .all_peers() - .into_iter() - .map(|(node, _)| (node.id, node.tvu.ip())) - .collect(); - for (id_override, stake_override) in staked_map_overrides { - if nodes.contains_key(id_override) { - if let Some(previous_stake) = id_to_stake_map.get(id_override) { - *total_stake -= previous_stake; - } - *total_stake += stake_override; - id_to_stake_map.insert(*id_override, *stake_override); - } else { - error!( - "staked nodes overrides configuration for id \ - {id_override} with stake {stake_override} does not \ - match existing IP. Skipping", - ); - } - } - } - pub fn join(self) -> thread::Result<()> { self.thread_hdl.join() } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index b36c4d77ec..f19340a27f 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -132,7 +132,6 @@ impl Tpu { let staked_nodes_updater_service = StakedNodesUpdaterService::new( exit.clone(), - cluster_info.clone(), bank_forks.clone(), staked_nodes.clone(), shared_staked_nodes_overrides, diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 182c4ed6bc..2af3dddf2f 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -122,9 +122,9 @@ impl QuicConfig { (ConnectionPeerType::Unstaked, 0, 0), |stakes| { let rstakes = stakes.read().unwrap(); - rstakes.pubkey_stake_map.get(&pubkey).map_or( - (ConnectionPeerType::Unstaked, 0, rstakes.total_stake), - |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake), + rstakes.get_node_stake(&pubkey).map_or( + (ConnectionPeerType::Unstaked, 0, rstakes.total_stake()), + |stake| (ConnectionPeerType::Staked, stake, rstakes.total_stake()), ) }, ) @@ -233,6 +233,7 @@ mod tests { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, + std::collections::HashMap, }; #[test] @@ -252,19 +253,18 @@ mod tests { connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); - - staked_nodes.write().unwrap().total_stake = 10000; + let overrides = HashMap::::default(); + let mut stakes = HashMap::from([(Pubkey::new_unique(), 10_000)]); + *staked_nodes.write().unwrap() = + StakedNodes::new(Arc::new(stakes.clone()), overrides.clone()); assert_eq!( connection_config.compute_max_parallel_streams(), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1); - + stakes.insert(pubkey, 1); + *staked_nodes.write().unwrap() = + StakedNodes::new(Arc::new(stakes.clone()), overrides.clone()); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; @@ -272,17 +272,8 @@ mod tests { connection_config.compute_max_parallel_streams(), (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize ); - - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .remove(&pubkey); - staked_nodes - .write() - .unwrap() - .pubkey_stake_map - .insert(pubkey, 1000); + stakes.insert(pubkey, 1_000); + *staked_nodes.write().unwrap() = StakedNodes::new(Arc::new(stakes.clone()), overrides); assert_ne!( connection_config.compute_max_parallel_streams(), QUIC_MIN_STAKED_CONCURRENT_STREAMS diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0c495a0037..a49413cfb8 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -211,10 +211,10 @@ fn get_connection_stake( let staked_nodes = staked_nodes.read().unwrap(); Some(( pubkey, - staked_nodes.pubkey_stake_map.get(&pubkey).copied()?, - staked_nodes.total_stake, - staked_nodes.max_stake, - staked_nodes.min_stake, + staked_nodes.get_node_stake(&pubkey)?, + staked_nodes.total_stake(), + staked_nodes.max_stake(), + staked_nodes.min_stake(), )) } @@ -1102,7 +1102,7 @@ pub mod test { signature::Keypair, signer::Signer, }, - std::net::Ipv4Addr, + std::{collections::HashMap, net::Ipv4Addr}, tokio::time::sleep, }; @@ -1518,12 +1518,11 @@ pub mod test { solana_logger::setup(); let client_keypair = Keypair::new(); - let mut staked_nodes = StakedNodes::default(); - staked_nodes - .pubkey_stake_map - .insert(client_keypair.pubkey(), 100000); - staked_nodes.total_stake = 100000; - + let stakes = HashMap::from([(client_keypair.pubkey(), 100_000)]); + let staked_nodes = StakedNodes::new( + Arc::new(stakes), + HashMap::::default(), // overrides + ); let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1); check_multiple_writes(receiver, server_address, Some(&client_keypair)).await; exit.store(true, Ordering::Relaxed); @@ -1545,12 +1544,11 @@ pub mod test { solana_logger::setup(); let client_keypair = Keypair::new(); - let mut staked_nodes = StakedNodes::default(); - staked_nodes - .pubkey_stake_map - .insert(client_keypair.pubkey(), 0); - staked_nodes.total_stake = 0; - + let stakes = HashMap::from([(client_keypair.pubkey(), 0)]); + let staked_nodes = StakedNodes::new( + Arc::new(stakes), + HashMap::::default(), // overrides + ); let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1); check_multiple_writes(receiver, server_address, Some(&client_keypair)).await; exit.store(true, Ordering::Relaxed); diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index e82d5683b1..5bf233572c 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -9,6 +9,7 @@ use { }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, histogram::Histogram, + itertools::Itertools, solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp}, std::{ cmp::Reverse, @@ -27,10 +28,11 @@ use { // Total stake and nodes => stake map #[derive(Default)] pub struct StakedNodes { - pub total_stake: u64, - pub max_stake: u64, - pub min_stake: u64, - pub pubkey_stake_map: HashMap, + stakes: Arc>, + overrides: HashMap, + total_stake: u64, + max_stake: u64, + min_stake: u64, } pub type PacketBatchReceiver = Receiver; @@ -292,6 +294,49 @@ impl StreamerSendStats { } } +impl StakedNodes { + pub fn new(stakes: Arc>, overrides: HashMap) -> Self { + let values = stakes + .iter() + .filter(|(pubkey, _)| !overrides.contains_key(pubkey)) + .map(|(_, &stake)| stake) + .chain(overrides.values().copied()) + .filter(|&stake| stake > 0); + let total_stake = values.clone().sum(); + let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default(); + Self { + stakes, + overrides, + total_stake, + max_stake, + min_stake, + } + } + + pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option { + self.overrides + .get(pubkey) + .or_else(|| self.stakes.get(pubkey)) + .filter(|&&stake| stake > 0) + .copied() + } + + #[inline] + pub fn total_stake(&self) -> u64 { + self.total_stake + } + + #[inline] + pub(super) fn min_stake(&self) -> u64 { + self.min_stake + } + + #[inline] + pub(super) fn max_stake(&self) -> u64 { + self.max_stake + } +} + fn recv_send( sock: &UdpSocket, r: &PacketBatchReceiver,