From 4b0514d9b1789d50d68017ddf838a170de0168b2 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 2 Jun 2023 09:25:23 -0700 Subject: [PATCH] Fixing send-transaction-service using quic, tpu address is wrong (#31899) * Fixing send-transaction-service using quic, tpu address is wrong * Use Protocol field instead of bool for passing protocol info * Address some code review comment from Behzad: get_leader_tpus per protocol --- rpc/src/cluster_tpu_info.rs | 61 +++++++++++++------ .../src/send_transaction_service.rs | 22 +++++-- send-transaction-service/src/tpu_info.rs | 6 +- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index 80abbe749..c7b6436c8 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -14,7 +14,7 @@ use { pub struct ClusterTpuInfo { cluster_info: Arc, poh_recorder: Arc>, - recent_peers: HashMap, + recent_peers: HashMap, // values are socket address for UDP and QUIC protocols } impl ClusterTpuInfo { @@ -33,11 +33,19 @@ impl TpuInfo for ClusterTpuInfo { .cluster_info .tpu_peers() .into_iter() - .filter_map(|node| Some((*node.pubkey(), node.tpu(Protocol::UDP).ok()?))) + .filter_map(|node| { + Some(( + *node.pubkey(), + ( + node.tpu(Protocol::UDP).ok()?, + node.tpu(Protocol::QUIC).ok()?, + ), + )) + }) .collect(); } - fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { + fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> { let recorder = self.poh_recorder.read().unwrap(); let leaders: Vec<_> = (0..max_count) .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) @@ -45,7 +53,10 @@ impl TpuInfo for ClusterTpuInfo { drop(recorder); let mut unique_leaders = vec![]; for leader in leaders.iter() { - if let Some(addr) = self.recent_peers.get(leader) { + if let Some(addr) = self.recent_peers.get(leader).map(|addr| match protocol { + Protocol::UDP => &addr.0, + Protocol::QUIC => &addr.1, + }) { if !unique_leaders.contains(&addr) { unique_leaders.push(addr); } @@ -71,6 +82,7 @@ mod test { }, solana_sdk::{ poh_config::PohConfig, + quic::QUIC_PORT_OFFSET, signature::{Keypair, Signer}, timing::timestamp, }, @@ -119,9 +131,18 @@ mod test { SocketAddrSpace::Unspecified, )); - let validator0_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 1111)); - let validator1_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 2222)); - let validator2_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 3333)); + let validator0_socket = ( + SocketAddr::from((Ipv4Addr::LOCALHOST, 1111)), + SocketAddr::from((Ipv4Addr::LOCALHOST, 1111 + QUIC_PORT_OFFSET)), + ); + let validator1_socket = ( + SocketAddr::from((Ipv4Addr::LOCALHOST, 2222)), + SocketAddr::from((Ipv4Addr::LOCALHOST, 2222 + QUIC_PORT_OFFSET)), + ); + let validator2_socket = ( + SocketAddr::from((Ipv4Addr::LOCALHOST, 3333)), + SocketAddr::from((Ipv4Addr::LOCALHOST, 3333 + QUIC_PORT_OFFSET)), + ); let recent_peers: HashMap<_, _> = vec![ ( validator_vote_keypairs0.node_keypair.pubkey(), @@ -149,8 +170,8 @@ mod test { let first_leader = solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); assert_eq!( - leader_info.get_leader_tpus(1), - vec![recent_peers.get(&first_leader).unwrap()] + leader_info.get_leader_tpus(1, Protocol::UDP), + vec![&recent_peers.get(&first_leader).unwrap().0] ); let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( @@ -159,11 +180,14 @@ mod test { ) .unwrap(); let mut expected_leader_sockets = vec![ - recent_peers.get(&first_leader).unwrap(), - recent_peers.get(&second_leader).unwrap(), + &recent_peers.get(&first_leader).unwrap().0, + &recent_peers.get(&second_leader).unwrap().0, ]; expected_leader_sockets.dedup(); - assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets); + assert_eq!( + leader_info.get_leader_tpus(2, Protocol::UDP), + expected_leader_sockets + ); let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS), @@ -171,15 +195,18 @@ mod test { ) .unwrap(); let mut expected_leader_sockets = vec![ - recent_peers.get(&first_leader).unwrap(), - recent_peers.get(&second_leader).unwrap(), - recent_peers.get(&third_leader).unwrap(), + &recent_peers.get(&first_leader).unwrap().0, + &recent_peers.get(&second_leader).unwrap().0, + &recent_peers.get(&third_leader).unwrap().0, ]; expected_leader_sockets.dedup(); - assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets); + assert_eq!( + leader_info.get_leader_tpus(3, Protocol::UDP), + expected_leader_sockets + ); for x in 4..8 { - assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len()); + assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()); } } Blockstore::destroy(&ledger_path).unwrap(); diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 629432ff8..85a57141b 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,7 +2,10 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::{ + connection_cache::{ConnectionCache, Protocol}, + tpu_connection::TpuConnection, + }, solana_measure::measure::Measure, solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -562,7 +565,12 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); + let addresses = Self::get_tpu_addresses( + tpu_address, + leader_info, + config, + connection_cache.protocol(), + ); let wire_transactions = transactions .iter() @@ -689,7 +697,12 @@ impl SendTransactionService { for chunk in iter { let mut leader_info_provider = leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); - let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); + let addresses = Self::get_tpu_addresses( + tpu_address, + leader_info, + config, + connection_cache.protocol(), + ); for address in &addresses { Self::send_transactions(address, chunk, connection_cache, stats); @@ -748,10 +761,11 @@ impl SendTransactionService { tpu_address: &'a SocketAddr, leader_info: Option<&'a T>, config: &'a Config, + protocol: Protocol, ) -> Vec<&'a SocketAddr> { let addresses = leader_info .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count)); + .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count, protocol)); addresses .map(|address_list| { if address_list.is_empty() { diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs index 4cbbcfdca..6ecd56ae4 100644 --- a/send-transaction-service/src/tpu_info.rs +++ b/send-transaction-service/src/tpu_info.rs @@ -1,8 +1,8 @@ -use std::net::SocketAddr; +use {solana_client::connection_cache::Protocol, std::net::SocketAddr}; pub trait TpuInfo { fn refresh_recent_peers(&mut self); - fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr>; + fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>; } #[derive(Clone)] @@ -10,7 +10,7 @@ pub struct NullTpuInfo; impl TpuInfo for NullTpuInfo { fn refresh_recent_peers(&mut self) {} - fn get_leader_tpus(&self, _max_count: u64) -> Vec<&SocketAddr> { + fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> { vec![] } }