diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 459ddd4a65..1df141607a 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -1,9 +1,5 @@ use { - crate::{ - broadcast_stage::BroadcastStage, - find_packet_sender_stake_stage::FindPacketSenderStakeStage, - retransmit_stage::RetransmitStage, - }, + crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, itertools::Itertools, lru::LruCache, rand::{seq::SliceRandom, Rng, SeedableRng}, @@ -32,7 +28,7 @@ use { collections::HashMap, iter::repeat_with, marker::PhantomData, - net::{IpAddr, Ipv4Addr, SocketAddr}, + net::SocketAddr, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -317,19 +313,6 @@ impl ClusterNodes { } } -impl ClusterNodes { - pub(crate) fn get_ip_to_stakes(&self) -> HashMap { - self.compat_index - .iter() - .filter_map(|(_, i)| { - let node = &self.nodes[*i]; - let contact_info = node.contact_info()?; - Some((contact_info.tvu.ip(), node.stake)) - }) - .collect() - } -} - pub fn new_cluster_nodes( cluster_info: &ClusterInfo, stakes: &HashMap, @@ -505,20 +488,9 @@ pub fn make_test_cluster( ClusterInfo, ) { let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7)); - let mut ip_addr_octet: usize = 0; - let mut nodes: Vec<_> = repeat_with(|| { - let mut contact_info = ContactInfo::new_rand(rng, None); - contact_info.tvu.set_ip(IpAddr::V4(Ipv4Addr::new( - 127, - 0, - 0, - (ip_addr_octet % 256) as u8, - ))); - ip_addr_octet += 1; - contact_info - }) - .take(num_nodes) - .collect(); + let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) + .take(num_nodes) + .collect(); nodes.shuffle(rng); let this_node = nodes[0].clone(); let mut stakes: HashMap = nodes @@ -713,35 +685,4 @@ mod tests { assert_eq!(*peer, peers[index]); } } - - #[test] - fn test_cluster_nodes_transaction_weight() { - solana_logger::setup(); - let mut rng = rand::thread_rng(); - let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 14, None); - let cluster_nodes = new_cluster_nodes::(&cluster_info, &stakes); - - // All nodes with contact-info should be in the index. - assert_eq!(cluster_nodes.compat_index.len(), nodes.len()); - // Staked nodes with no contact-info should be included. - assert!(cluster_nodes.nodes.len() > nodes.len()); - - let ip_to_stake = cluster_nodes.get_ip_to_stakes(); - - // Only staked nodes with contact_info should be in the ip_to_stake - let stacked_nodes_with_contact_info: HashMap<_, _> = stakes - .iter() - .filter_map(|(pubkey, stake)| { - let node = nodes.iter().find(|node| node.id == *pubkey)?; - Some((node.tvu.ip(), stake)) - }) - .collect(); - ip_to_stake.iter().for_each(|(ip, stake)| { - // ignoring the 0 staked, because non-stacked nodes are defaulted into 0 stake. - if *stake > 0 { - let expected_stake = stacked_nodes_with_contact_info.get(ip).unwrap(); - assert_eq!(stake, *expected_stake); - } - }); - } } diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index cfa43de24e..d584f74b70 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -1,11 +1,12 @@ use { - crate::cluster_nodes::ClusterNodesCache, crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, + solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_rayon_threadlimit::get_thread_count, solana_runtime::bank_forks::BankForks, + solana_sdk::timing::timestamp, solana_streamer::streamer::{self, StreamerError}, std::{ cell::RefCell, @@ -17,9 +18,7 @@ use { }, }; -const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; -const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); -const STAKES_REFRESH_PERIOD_IN_MS: u128 = 1000; +const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5); thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) @@ -27,43 +26,109 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: .build() .unwrap())); +pub type FindPacketSenderStakeSender = Sender>; +pub type FindPacketSenderStakeReceiver = Receiver>; + +#[derive(Debug, Default)] +struct FindPacketSenderStakeStats { + last_print: u64, + refresh_ip_to_stake_time: u64, + apply_sender_stakes_time: u64, + send_batches_time: u64, + receive_batches_time: u64, + total_batches: u64, + total_packets: u64, +} + +impl FindPacketSenderStakeStats { + fn report(&mut self) { + let now = timestamp(); + let elapsed_ms = now - self.last_print; + if elapsed_ms > 2000 { + datapoint_info!( + "find_packet_sender_stake-services_stats", + ( + "refresh_ip_to_stake_time", + self.refresh_ip_to_stake_time as i64, + i64 + ), + ( + "apply_sender_stakes_time", + self.apply_sender_stakes_time as i64, + i64 + ), + ("send_batches_time", self.send_batches_time as i64, i64), + ( + "receive_batches_time", + self.receive_batches_time as i64, + i64 + ), + ("total_batches", self.total_batches as i64, i64), + ("total_packets", self.total_packets as i64, i64), + ); + *self = FindPacketSenderStakeStats::default(); + self.last_print = now; + } + } +} + pub struct FindPacketSenderStakeStage { thread_hdl: JoinHandle<()>, } impl FindPacketSenderStakeStage { pub fn new( - packet_receiver: Receiver, - sender: Sender>, + packet_receiver: streamer::PacketBatchReceiver, + sender: FindPacketSenderStakeSender, bank_forks: Arc>, cluster_info: Arc, ) -> Self { - let cluster_nodes_cache = Arc::new(ClusterNodesCache::::new( - CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, - CLUSTER_NODES_CACHE_TTL, - )); + let mut stats = FindPacketSenderStakeStats::default(); let thread_hdl = Builder::new() - .name("sol-tx-sender_stake".to_string()) + .name("find-packet-sender-stake".to_string()) .spawn(move || { let mut last_stakes = Instant::now(); let mut ip_to_stake: HashMap = HashMap::new(); loop { - if last_stakes.elapsed().as_millis() > STAKES_REFRESH_PERIOD_IN_MS { - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; - ip_to_stake = cluster_nodes_cache - .get(root_bank.slot(), &root_bank, &working_bank, &cluster_info) - .get_ip_to_stakes(); - last_stakes = Instant::now(); - } + let mut refresh_ip_to_stake_time = Measure::start("refresh_ip_to_stake_time"); + Self::try_refresh_ip_to_stake( + &mut last_stakes, + &mut ip_to_stake, + bank_forks.clone(), + cluster_info.clone(), + ); + refresh_ip_to_stake_time.stop(); + stats.refresh_ip_to_stake_time = stats + .refresh_ip_to_stake_time + .saturating_add(refresh_ip_to_stake_time.as_us()); + match streamer::recv_packet_batches(&packet_receiver) { - Ok((mut batches, _num_packets, _recv_duration)) => { + Ok((mut batches, num_packets, recv_duration)) => { + let num_batches = batches.len(); + let mut apply_sender_stakes_time = + Measure::start("apply_sender_stakes_time"); Self::apply_sender_stakes(&mut batches, &ip_to_stake); + apply_sender_stakes_time.stop(); + + let mut send_batches_time = Measure::start("send_batches_time"); if let Err(e) = sender.send(batches) { info!("Sender error: {:?}", e); } + send_batches_time.stop(); + + stats.apply_sender_stakes_time = stats + .apply_sender_stakes_time + .saturating_add(apply_sender_stakes_time.as_us()); + stats.send_batches_time = stats + .send_batches_time + .saturating_add(send_batches_time.as_us()); + stats.receive_batches_time = stats + .receive_batches_time + .saturating_add(recv_duration.as_nanos() as u64); + stats.total_batches = + stats.total_batches.saturating_add(num_batches as u64); + stats.total_packets = + stats.total_packets.saturating_add(num_packets as u64); } Err(e) => match e { StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, @@ -71,12 +136,35 @@ impl FindPacketSenderStakeStage { _ => error!("error: {:?}", e), }, } + + stats.report(); } }) .unwrap(); Self { thread_hdl } } + fn try_refresh_ip_to_stake( + last_stakes: &mut Instant, + ip_to_stake: &mut HashMap, + bank_forks: Arc>, + cluster_info: Arc, + ) { + if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { + let root_bank = bank_forks.read().unwrap().root_bank(); + let staked_nodes = root_bank.staked_nodes(); + *ip_to_stake = cluster_info + .tvu_peers() + .into_iter() + .filter_map(|node| { + let stake = staked_nodes.get(&node.id)?; + Some((node.tvu.ip(), *stake)) + }) + .collect(); + *last_stakes = Instant::now(); + } + } + fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 0002fa84da..b138827166 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -6,9 +6,9 @@ //! if perf-libs are available use { - crate::sigverify, + crate::{find_packet_sender_stake_stage, sigverify}, core::time::Duration, - crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, + crossbeam_channel::{RecvTimeoutError, SendError, Sender}, itertools::Itertools, solana_measure::measure::Measure, solana_perf::{ @@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier { impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver>, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: T, ) -> Self { @@ -227,7 +227,7 @@ impl SigVerifyStage { fn verifier( deduper: &Deduper, - recvr: &Receiver>, + recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, sendr: &Sender>, verifier: &T, stats: &mut SigVerifierStats, @@ -312,7 +312,7 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: Receiver>, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: &T, ) -> JoinHandle<()> { @@ -358,7 +358,7 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: Receiver>, + packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, verified_sender: Sender>, verifier: T, ) -> JoinHandle<()> { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index aa995b0787..929bbb4d25 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -56,8 +56,8 @@ pub struct Tpu { cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, - transaction_weight_stage: FindPacketSenderStakeStage, - vote_transaction_weight_stage: FindPacketSenderStakeStage, + find_packet_sender_stake_stage: FindPacketSenderStakeStage, + vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage, } impl Tpu { @@ -107,20 +107,21 @@ impl Tpu { tpu_coalesce_ms, ); - let (weighted_packet_sender, weighted_packet_receiver) = unbounded(); + let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded(); - let transaction_weight_stage = FindPacketSenderStakeStage::new( + let find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( packet_receiver, - weighted_packet_sender, + find_packet_sender_stake_sender, bank_forks.clone(), cluster_info.clone(), ); - let (vote_weighted_packet_sender, vote_weighted_packet_receiver) = unbounded(); + let (vote_find_packet_sender_stake_sender, vote_find_packet_sender_stake_receiver) = + unbounded(); - let vote_transaction_weight_stage = FindPacketSenderStakeStage::new( + let vote_find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( vote_packet_receiver, - vote_weighted_packet_sender, + vote_find_packet_sender_stake_sender, bank_forks.clone(), cluster_info.clone(), ); @@ -139,7 +140,7 @@ impl Tpu { let sigverify_stage = { let verifier = TransactionSigVerifier::default(); - SigVerifyStage::new(weighted_packet_receiver, verified_sender, verifier) + SigVerifyStage::new(find_packet_sender_stake_receiver, verified_sender, verifier) }; let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); @@ -147,7 +148,7 @@ impl Tpu { let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); SigVerifyStage::new( - vote_weighted_packet_receiver, + vote_find_packet_sender_stake_receiver, verified_tpu_vote_packets_sender, verifier, ) @@ -201,8 +202,8 @@ impl Tpu { cluster_info_vote_listener, broadcast_stage, tpu_quic_t, - transaction_weight_stage, - vote_transaction_weight_stage, + find_packet_sender_stake_stage, + vote_find_packet_sender_stake_stage, } } @@ -213,8 +214,8 @@ impl Tpu { self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), - self.transaction_weight_stage.join(), - self.vote_transaction_weight_stage.join(), + self.find_packet_sender_stake_stage.join(), + self.vote_find_packet_sender_stake_stage.join(), ]; self.tpu_quic_t.join()?; let broadcast_result = self.broadcast_stage.join();