Fixing send-transaction-service using quic, tpu address is wrong (#31899)

* Fixing send-transaction-service using quic, tpu address is wrong

* Use Protocol field instead of bool for passing protocol info

* Address some code review comment from Behzad: get_leader_tpus per protocol
This commit is contained in:
Lijun Wang 2023-06-02 09:25:23 -07:00 committed by GitHub
parent 038ca31cfa
commit 4b0514d9b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 24 deletions

View File

@ -14,7 +14,7 @@ use {
pub struct ClusterTpuInfo { pub struct ClusterTpuInfo {
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<RwLock<PohRecorder>>, poh_recorder: Arc<RwLock<PohRecorder>>,
recent_peers: HashMap<Pubkey, SocketAddr>, recent_peers: HashMap<Pubkey, (SocketAddr, SocketAddr)>, // values are socket address for UDP and QUIC protocols
} }
impl ClusterTpuInfo { impl ClusterTpuInfo {
@ -33,11 +33,19 @@ impl TpuInfo for ClusterTpuInfo {
.cluster_info .cluster_info
.tpu_peers() .tpu_peers()
.into_iter() .into_iter()
.filter_map(|node| Some((*node.pubkey(), node.tpu(Protocol::UDP).ok()?))) .filter_map(|node| {
Some((
*node.pubkey(),
(
node.tpu(Protocol::UDP).ok()?,
node.tpu(Protocol::QUIC).ok()?,
),
))
})
.collect(); .collect();
} }
fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap(); let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count) let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
@ -45,7 +53,10 @@ impl TpuInfo for ClusterTpuInfo {
drop(recorder); drop(recorder);
let mut unique_leaders = vec![]; let mut unique_leaders = vec![];
for leader in leaders.iter() { for leader in leaders.iter() {
if let Some(addr) = self.recent_peers.get(leader) { if let Some(addr) = self.recent_peers.get(leader).map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
}) {
if !unique_leaders.contains(&addr) { if !unique_leaders.contains(&addr) {
unique_leaders.push(addr); unique_leaders.push(addr);
} }
@ -71,6 +82,7 @@ mod test {
}, },
solana_sdk::{ solana_sdk::{
poh_config::PohConfig, poh_config::PohConfig,
quic::QUIC_PORT_OFFSET,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
timing::timestamp, timing::timestamp,
}, },
@ -119,9 +131,18 @@ mod test {
SocketAddrSpace::Unspecified, SocketAddrSpace::Unspecified,
)); ));
let validator0_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 1111)); let validator0_socket = (
let validator1_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 2222)); SocketAddr::from((Ipv4Addr::LOCALHOST, 1111)),
let validator2_socket = SocketAddr::from((Ipv4Addr::LOCALHOST, 3333)); SocketAddr::from((Ipv4Addr::LOCALHOST, 1111 + QUIC_PORT_OFFSET)),
);
let validator1_socket = (
SocketAddr::from((Ipv4Addr::LOCALHOST, 2222)),
SocketAddr::from((Ipv4Addr::LOCALHOST, 2222 + QUIC_PORT_OFFSET)),
);
let validator2_socket = (
SocketAddr::from((Ipv4Addr::LOCALHOST, 3333)),
SocketAddr::from((Ipv4Addr::LOCALHOST, 3333 + QUIC_PORT_OFFSET)),
);
let recent_peers: HashMap<_, _> = vec![ let recent_peers: HashMap<_, _> = vec![
( (
validator_vote_keypairs0.node_keypair.pubkey(), validator_vote_keypairs0.node_keypair.pubkey(),
@ -149,8 +170,8 @@ mod test {
let first_leader = let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!( assert_eq!(
leader_info.get_leader_tpus(1), leader_info.get_leader_tpus(1, Protocol::UDP),
vec![recent_peers.get(&first_leader).unwrap()] vec![&recent_peers.get(&first_leader).unwrap().0]
); );
let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@ -159,11 +180,14 @@ mod test {
) )
.unwrap(); .unwrap();
let mut expected_leader_sockets = vec![ let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(), &recent_peers.get(&first_leader).unwrap().0,
recent_peers.get(&second_leader).unwrap(), &recent_peers.get(&second_leader).unwrap().0,
]; ];
expected_leader_sockets.dedup(); expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets); assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS), slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS),
@ -171,15 +195,18 @@ mod test {
) )
.unwrap(); .unwrap();
let mut expected_leader_sockets = vec![ let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(), &recent_peers.get(&first_leader).unwrap().0,
recent_peers.get(&second_leader).unwrap(), &recent_peers.get(&second_leader).unwrap().0,
recent_peers.get(&third_leader).unwrap(), &recent_peers.get(&third_leader).unwrap().0,
]; ];
expected_leader_sockets.dedup(); expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets); assert_eq!(
leader_info.get_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
for x in 4..8 { for x in 4..8 {
assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len()); assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
} }
} }
Blockstore::destroy(&ledger_path).unwrap(); Blockstore::destroy(&ledger_path).unwrap();

View File

@ -2,7 +2,10 @@ use {
crate::tpu_info::TpuInfo, crate::tpu_info::TpuInfo,
crossbeam_channel::{Receiver, RecvTimeoutError}, crossbeam_channel::{Receiver, RecvTimeoutError},
log::*, log::*,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_client::{
connection_cache::{ConnectionCache, Protocol},
tpu_connection::TpuConnection,
},
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::datapoint_warn, solana_metrics::datapoint_warn,
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
@ -562,7 +565,12 @@ impl SendTransactionService {
stats: &SendTransactionServiceStats, stats: &SendTransactionServiceStats,
) { ) {
// Processing the transactions in batch // Processing the transactions in batch
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); let addresses = Self::get_tpu_addresses(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
let wire_transactions = transactions let wire_transactions = transactions
.iter() .iter()
@ -689,7 +697,12 @@ impl SendTransactionService {
for chunk in iter { for chunk in iter {
let mut leader_info_provider = leader_info_provider.lock().unwrap(); let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info(); let leader_info = leader_info_provider.get_leader_info();
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); let addresses = Self::get_tpu_addresses(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
for address in &addresses { for address in &addresses {
Self::send_transactions(address, chunk, connection_cache, stats); Self::send_transactions(address, chunk, connection_cache, stats);
@ -748,10 +761,11 @@ impl SendTransactionService {
tpu_address: &'a SocketAddr, tpu_address: &'a SocketAddr,
leader_info: Option<&'a T>, leader_info: Option<&'a T>,
config: &'a Config, config: &'a Config,
protocol: Protocol,
) -> Vec<&'a SocketAddr> { ) -> Vec<&'a SocketAddr> {
let addresses = leader_info let addresses = leader_info
.as_ref() .as_ref()
.map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count)); .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count, protocol));
addresses addresses
.map(|address_list| { .map(|address_list| {
if address_list.is_empty() { if address_list.is_empty() {

View File

@ -1,8 +1,8 @@
use std::net::SocketAddr; use {solana_client::connection_cache::Protocol, std::net::SocketAddr};
pub trait TpuInfo { pub trait TpuInfo {
fn refresh_recent_peers(&mut self); fn refresh_recent_peers(&mut self);
fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr>; fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
} }
#[derive(Clone)] #[derive(Clone)]
@ -10,7 +10,7 @@ pub struct NullTpuInfo;
impl TpuInfo for NullTpuInfo { impl TpuInfo for NullTpuInfo {
fn refresh_recent_peers(&mut self) {} fn refresh_recent_peers(&mut self) {}
fn get_leader_tpus(&self, _max_count: u64) -> Vec<&SocketAddr> { fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
vec![] vec![]
} }
} }