From fd515097d8e0d212bb26f5d07f4cbb2304b9c45c Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Tue, 15 Mar 2022 18:44:53 -0500 Subject: [PATCH] leader qos part 2: add stage to find sender stake, set to packet meta --- core/src/cluster_nodes.rs | 69 +++++++++++++-- core/src/find_packet_sender_stake_stage.rs | 97 ++++++++++++++++++++++ core/src/lib.rs | 2 +- core/src/sigverify_stage.rs | 2 +- core/src/tpu.rs | 22 ++--- core/src/transaction_weighting_stage.rs | 77 ----------------- sdk/src/packet.rs | 4 +- 7 files changed, 176 insertions(+), 97 deletions(-) create mode 100644 core/src/find_packet_sender_stake_stage.rs delete mode 100644 core/src/transaction_weighting_stage.rs diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 1df141607a..459ddd4a65 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -1,5 +1,9 @@ use { - crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage}, + crate::{ + broadcast_stage::BroadcastStage, + find_packet_sender_stake_stage::FindPacketSenderStakeStage, + retransmit_stage::RetransmitStage, + }, itertools::Itertools, lru::LruCache, rand::{seq::SliceRandom, Rng, SeedableRng}, @@ -28,7 +32,7 @@ use { collections::HashMap, iter::repeat_with, marker::PhantomData, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -313,6 +317,19 @@ impl ClusterNodes { } } +impl ClusterNodes { + pub(crate) fn get_ip_to_stakes(&self) -> HashMap { + self.compat_index + .iter() + .filter_map(|(_, i)| { + let node = &self.nodes[*i]; + let contact_info = node.contact_info()?; + Some((contact_info.tvu.ip(), node.stake)) + }) + .collect() + } +} + pub fn new_cluster_nodes( cluster_info: &ClusterInfo, stakes: &HashMap, @@ -488,9 +505,20 @@ pub fn make_test_cluster( ClusterInfo, ) { let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7)); - let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None)) - 
.take(num_nodes) - .collect(); + let mut ip_addr_octet: usize = 0; + let mut nodes: Vec<_> = repeat_with(|| { + let mut contact_info = ContactInfo::new_rand(rng, None); + contact_info.tvu.set_ip(IpAddr::V4(Ipv4Addr::new( + 127, + 0, + 0, + (ip_addr_octet % 256) as u8, + ))); + ip_addr_octet += 1; + contact_info + }) + .take(num_nodes) + .collect(); nodes.shuffle(rng); let this_node = nodes[0].clone(); let mut stakes: HashMap = nodes @@ -685,4 +713,35 @@ mod tests { assert_eq!(*peer, peers[index]); } } + + #[test] + fn test_cluster_nodes_transaction_weight() { + solana_logger::setup(); + let mut rng = rand::thread_rng(); + let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 14, None); + let cluster_nodes = new_cluster_nodes::(&cluster_info, &stakes); + + // All nodes with contact-info should be in the index. + assert_eq!(cluster_nodes.compat_index.len(), nodes.len()); + // Staked nodes with no contact-info should be included. + assert!(cluster_nodes.nodes.len() > nodes.len()); + + let ip_to_stake = cluster_nodes.get_ip_to_stakes(); + + // Only staked nodes with contact_info should be in the ip_to_stake + let stacked_nodes_with_contact_info: HashMap<_, _> = stakes + .iter() + .filter_map(|(pubkey, stake)| { + let node = nodes.iter().find(|node| node.id == *pubkey)?; + Some((node.tvu.ip(), stake)) + }) + .collect(); + ip_to_stake.iter().for_each(|(ip, stake)| { + // Ignore zero-stake entries, because non-staked nodes default to a stake of 0. 
+ if *stake > 0 { + let expected_stake = stacked_nodes_with_contact_info.get(ip).unwrap(); + assert_eq!(stake, *expected_stake); + } + }); + } } diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs new file mode 100644 index 0000000000..cfa43de24e --- /dev/null +++ b/core/src/find_packet_sender_stake_stage.rs @@ -0,0 +1,97 @@ +use { + crate::cluster_nodes::ClusterNodesCache, + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + rayon::{prelude::*, ThreadPool}, + solana_gossip::cluster_info::ClusterInfo, + solana_perf::packet::PacketBatch, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::bank_forks::BankForks, + solana_streamer::streamer::{self, StreamerError}, + std::{ + cell::RefCell, + collections::HashMap, + net::IpAddr, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, + }, +}; + +const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; +const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); +const STAKES_REFRESH_PERIOD_IN_MS: u128 = 1000; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix)) + .build() + .unwrap())); + +pub struct FindPacketSenderStakeStage { + thread_hdl: JoinHandle<()>, +} + +impl FindPacketSenderStakeStage { + pub fn new( + packet_receiver: Receiver, + sender: Sender>, + bank_forks: Arc>, + cluster_info: Arc, + ) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); + let thread_hdl = Builder::new() + .name("sol-tx-sender_stake".to_string()) + .spawn(move || { + let mut last_stakes = Instant::now(); + let mut ip_to_stake: HashMap = HashMap::new(); + loop { + if last_stakes.elapsed().as_millis() > STAKES_REFRESH_PERIOD_IN_MS { + let (root_bank, working_bank) = { + let bank_forks = 
bank_forks.read().unwrap(); + (bank_forks.root_bank(), bank_forks.working_bank()) + }; + ip_to_stake = cluster_nodes_cache + .get(root_bank.slot(), &root_bank, &working_bank, &cluster_info) + .get_ip_to_stakes(); + last_stakes = Instant::now(); + } + match streamer::recv_packet_batches(&packet_receiver) { + Ok((mut batches, _num_packets, _recv_duration)) => { + Self::apply_sender_stakes(&mut batches, &ip_to_stake); + if let Err(e) = sender.send(batches) { + info!("Sender error: {:?}", e); + } + } + Err(e) => match e { + StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, + StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), + _ => error!("error: {:?}", e), + }, + } + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .flat_map(|batch| batch.packets.par_iter_mut()) + .for_each(|packet| { + packet.meta.sender_stake = + *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0); + }); + }) + }); + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index ae7029cfdf..e4032def45 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -24,6 +24,7 @@ pub mod cost_update_service; pub mod drop_bank_service; pub mod duplicate_repair_status; pub mod fetch_stage; +pub mod find_packet_sender_stake_stage; pub mod fork_choice; pub mod gen_keys; pub mod heaviest_subtree_fork_choice; @@ -61,7 +62,6 @@ pub mod system_monitor_service; mod tower1_7_14; pub mod tower_storage; pub mod tpu; -pub mod transaction_weighting_stage; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index cdbe0a96bc..0002fa84da 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -445,7 +445,7 @@ mod tests { for _ 
in 0..batches.len() { if let Some(batch) = batches.pop() { sent_len += batch.packets.len(); - packet_s.send(batch).unwrap(); + packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4c6489e3d2..aa995b0787 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -10,9 +10,9 @@ use { GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, + find_packet_sender_stake_stage::FindPacketSenderStakeStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, - transaction_weighting_stage::TransactionWeightStage, }, crossbeam_channel::{unbounded, Receiver}, solana_gossip::cluster_info::ClusterInfo, @@ -56,8 +56,8 @@ pub struct Tpu { cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, - transaction_weight_stage: TransactionWeightStage, - vote_transaction_weight_stage: TransactionWeightStage, + transaction_weight_stage: FindPacketSenderStakeStage, + vote_transaction_weight_stage: FindPacketSenderStakeStage, } impl Tpu { @@ -107,20 +107,20 @@ impl Tpu { tpu_coalesce_ms, ); - let (weighted_sender, weighted_receiver) = unbounded(); + let (weighted_packet_sender, weighted_packet_receiver) = unbounded(); - let transaction_weight_stage = TransactionWeightStage::new( + let transaction_weight_stage = FindPacketSenderStakeStage::new( packet_receiver, - weighted_sender, + weighted_packet_sender, bank_forks.clone(), cluster_info.clone(), ); - let (vote_weighted_sender, vote_weighted_receiver) = unbounded(); + let (vote_weighted_packet_sender, vote_weighted_packet_receiver) = unbounded(); - let vote_transaction_weight_stage = TransactionWeightStage::new( + let vote_transaction_weight_stage = FindPacketSenderStakeStage::new( vote_packet_receiver, - vote_weighted_sender, + vote_weighted_packet_sender, bank_forks.clone(), cluster_info.clone(), ); @@ -139,7 +139,7 @@ impl Tpu { let sigverify_stage = { let verifier = 
TransactionSigVerifier::default(); - SigVerifyStage::new(weighted_receiver, verified_sender, verifier) + SigVerifyStage::new(weighted_packet_receiver, verified_sender, verifier) }; let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); @@ -147,7 +147,7 @@ impl Tpu { let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); SigVerifyStage::new( - vote_weighted_receiver, + vote_weighted_packet_receiver, verified_tpu_vote_packets_sender, verifier, ) diff --git a/core/src/transaction_weighting_stage.rs b/core/src/transaction_weighting_stage.rs deleted file mode 100644 index 6dba1fc0db..0000000000 --- a/core/src/transaction_weighting_stage.rs +++ /dev/null @@ -1,77 +0,0 @@ -use { - crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, - rayon::prelude::*, - solana_gossip::cluster_info::ClusterInfo, - solana_perf::packet::PacketBatch, - solana_runtime::bank_forks::BankForks, - solana_streamer::streamer::{self, StreamerError}, - std::{ - collections::HashMap, - net::IpAddr, - sync::{Arc, RwLock}, - thread::{self, Builder, JoinHandle}, - time::Instant, - }, -}; - -pub struct TransactionWeightStage { - thread_hdl: JoinHandle<()>, -} - -impl TransactionWeightStage { - pub fn new( - packet_receiver: Receiver, - sender: Sender>, - bank_forks: Arc>, - cluster_info: Arc, - ) -> Self { - let thread_hdl = Builder::new() - .name("sol-tx-weight".to_string()) - .spawn(move || { - let mut last_stakes = Instant::now(); - let mut ip_to_stake: HashMap = HashMap::new(); - loop { - if last_stakes.elapsed().as_millis() > 1000 { - let root_bank = bank_forks.read().unwrap().root_bank(); - let staked_nodes = root_bank.staked_nodes(); - ip_to_stake = cluster_info - .tvu_peers() - .into_iter() - .filter_map(|node| { - let stake = staked_nodes.get(&node.id)?; - Some((node.tvu.ip(), *stake)) - }) - .collect(); - last_stakes = Instant::now(); - } - match streamer::recv_packet_batches(&packet_receiver) { - Ok((mut batches, 
_num_packets, _recv_duration)) => { - Self::apply_weights(&mut batches, &ip_to_stake); - if let Err(e) = sender.send(batches) { - info!("Sender error: {:?}", e); - } - } - Err(e) => match e { - StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, - StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), - _ => error!("error: {:?}", e), - }, - } - } - }) - .unwrap(); - Self { thread_hdl } - } - - fn apply_weights(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { - batches.into_par_iter().for_each(|batch| { - batch.packets.par_iter_mut().for_each(|packet| { - packet.meta.weight = *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0); - }); - }); - } - - pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index e79620dbe7..b73590da19 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -32,7 +32,7 @@ pub struct Meta { pub addr: IpAddr, pub port: u16, pub flags: PacketFlags, - pub weight: u64, + pub sender_stake: u64, } #[derive(Clone)] @@ -146,7 +146,7 @@ impl Default for Meta { addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), port: 0, flags: PacketFlags::empty(), - weight: 0, + sender_stake: 0, } } }