diff --git a/Cargo.lock b/Cargo.lock
index 51f09b7d0..9c9c5ec22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5514,6 +5514,7 @@ dependencies = [
  "solana-perf",
  "solana-poh",
  "solana-program-runtime",
+ "solana-quic-client",
  "solana-rayon-threadlimit",
  "solana-rpc",
  "solana-rpc-client-api",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 01f20e3d2..379581953 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
 solana-perf = { workspace = true }
 solana-poh = { workspace = true }
 solana-program-runtime = { workspace = true }
+solana-quic-client = { workspace = true }
 solana-rayon-threadlimit = { workspace = true }
 solana-rpc = { workspace = true }
 solana-rpc-client-api = { workspace = true }
diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs
index 04eb85c2d..cd9a0a3b0 100644
--- a/core/benches/cluster_info.rs
+++ b/core/benches/cluster_info.rs
@@ -9,6 +9,7 @@ use {
             broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
         },
         cluster_nodes::ClusterNodesCache,
+        validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
     },
     solana_gossip::{
         cluster_info::{ClusterInfo, Node},
@@ -18,16 +19,17 @@ use {
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
         shred::{Shred, ShredFlags},
     },
+    solana_quic_client::new_quic_connection_cache,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
         pubkey,
         signature::{Keypair, Signer},
         timing::{timestamp, AtomicInterval},
     },
-    solana_streamer::socket::SocketAddrSpace,
+    solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
     std::{
         collections::HashMap,
-        net::UdpSocket,
+        net::{IpAddr, Ipv4Addr, UdpSocket},
         sync::{Arc, RwLock},
         time::Duration,
     },
@@ -38,6 +40,14 @@
 fn broadcast_shreds_bench(bencher: &mut Bencher) {
     solana_logger::setup();
     let leader_keypair = Arc::new(Keypair::new());
+    let quic_connection_cache = new_quic_connection_cache(
+        "connection_cache_test",
+        &leader_keypair,
+        IpAddr::V4(Ipv4Addr::LOCALHOST),
+        &Arc::<RwLock<StakedNodes>>::default(),
+        TURBINE_QUIC_CONNECTION_POOL_SIZE,
+    )
+    .unwrap();
     let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
     let cluster_info = ClusterInfo::new(
         leader_info.info,
@@ -45,7 +55,6 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         SocketAddrSpace::Unspecified,
     );
     let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
-
     let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
     let bank = Bank::new_for_benches(&genesis_config);
     let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
@@ -74,6 +83,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
             &socket,
             &shreds,
             &cluster_nodes_cache,
+            &quic_connection_cache,
             &last_datapoint,
             &mut TransmitShredsStats::default(),
             &cluster_info,
diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index 9af88ae88..f06657518 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -6,7 +6,7 @@ extern crate test;
 use {
     crossbeam_channel::unbounded,
     log::*,
-    solana_core::retransmit_stage::retransmitter,
+    solana_core::{retransmit_stage::retransmitter, validator::TURBINE_QUIC_CONNECTION_POOL_SIZE},
     solana_entry::entry::Entry,
     solana_gossip::{
         cluster_info::{ClusterInfo, Node},
@@ -26,10 +26,10 @@ use {
         system_transaction,
         timing::timestamp,
     },
-    solana_streamer::socket::SocketAddrSpace,
+    solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
     std::{
         iter::repeat_with,
-        net::{Ipv4Addr, UdpSocket},
+        net::{IpAddr, Ipv4Addr, UdpSocket},
         sync::{
             atomic::{AtomicUsize, Ordering},
             Arc, RwLock,
@@ -100,6 +100,16 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         .collect();
     let keypair = Keypair::new();
+    let quic_connection_cache = Arc::new(
+        solana_quic_client::new_quic_connection_cache(
+            "connection_cache_test",
+            &keypair,
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
+            &Arc::<RwLock<StakedNodes>>::default(),
+            TURBINE_QUIC_CONNECTION_POOL_SIZE,
+        )
+        .unwrap(),
+    );
     let slot = 0;
     let parent = 0;
     let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
@@ -118,6 +128,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let retransmitter_handles = retransmitter(
         Arc::new(sockets),
+        quic_connection_cache,
         bank_forks,
         leader_schedule_cache,
         cluster_info,
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 3b6e4967c..859e8f168 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -9,11 +9,12 @@ use {
         standard_broadcast_run::StandardBroadcastRun,
     },
     crate::{
-        cluster_nodes::{ClusterNodes, ClusterNodesCache},
+        cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
         result::{Error, Result},
     },
     crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
-    itertools::Itertools,
+    itertools::{Either, Itertools},
+    solana_client::tpu_connection::TpuConnection,
     solana_gossip::{
         cluster_info::{ClusterInfo, ClusterInfoError},
         contact_info::Protocol,
@@ -22,6 +23,7 @@ use {
     solana_measure::measure::Measure,
     solana_metrics::{inc_new_counter_error, inc_new_counter_info},
     solana_poh::poh_recorder::WorkingBankEntry,
+    solana_quic_client::QuicConnectionCache,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
         clock::Slot,
@@ -87,6 +89,7 @@ impl BroadcastStageType {
         blockstore: Arc<Blockstore>,
         bank_forks: Arc<RwLock<BankForks>>,
         shred_version: u16,
+        quic_connection_cache: Arc<QuicConnectionCache>,
     ) -> BroadcastStage {
         match self {
             BroadcastStageType::Standard => BroadcastStage::new(
@@ -97,7 +100,7 @@
                 exit_sender,
                 blockstore,
                 bank_forks,
-                StandardBroadcastRun::new(shred_version),
+                StandardBroadcastRun::new(shred_version, quic_connection_cache),
             ),

             BroadcastStageType::FailEntryVerification => BroadcastStage::new(
@@ -108,7 +111,7 @@
                 exit_sender,
                 blockstore,
                 bank_forks,
-                FailEntryVerificationBroadcastRun::new(shred_version),
+                FailEntryVerificationBroadcastRun::new(shred_version, quic_connection_cache),
             ),

             BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
@@ -392,6 +395,7 @@ pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
     cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
+    quic_connection_cache: &QuicConnectionCache,
     last_datapoint_submit: &AtomicInterval,
     transmit_stats: &mut TransmitShredsStats,
     cluster_info: &ClusterInfo,
@@ -404,7 +408,7 @@ pub fn broadcast_shreds(
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.root_bank(), bank_forks.working_bank())
     };
-    let packets: Vec<_> = shreds
+    let (packets, quic_packets): (Vec<_>, Vec<_>) = shreds
         .iter()
         .group_by(|shred| shred.slot())
         .into_iter()
@@ -413,26 +417,40 @@
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
             update_peer_stats(&cluster_nodes, last_datapoint_submit);
             shreds.filter_map(move |shred| {
+                let key = shred.id();
+                let protocol = cluster_nodes::get_broadcast_protocol(&key);
                 cluster_nodes
-                    .get_broadcast_peer(&shred.id())?
-                    .tvu(Protocol::UDP)
+                    .get_broadcast_peer(&key)?
+                    .tvu(protocol)
                     .ok()
                     .filter(|addr| socket_addr_space.check(addr))
-                    .map(|addr| (shred.payload(), addr))
+                    .map(|addr| {
+                        (match protocol {
+                            Protocol::QUIC => Either::Right,
+                            Protocol::UDP => Either::Left,
+                        })((shred.payload(), addr))
+                    })
             })
         })
-        .collect();
+        .partition_map(std::convert::identity);
     shred_select.stop();
     transmit_stats.shred_select += shred_select.as_us();

     let mut send_mmsg_time = Measure::start("send_mmsg");
     if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) {
-        transmit_stats.dropped_packets += num_failed;
+        transmit_stats.dropped_packets_udp += num_failed;
         result = Err(Error::Io(ioerr));
     }
     send_mmsg_time.stop();
     transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
-    transmit_stats.total_packets += packets.len();
+    for (shred, addr) in &quic_packets {
+        let conn = quic_connection_cache.get_connection(addr);
+        if let Err(err) = conn.send_data(shred) {
+            transmit_stats.dropped_packets_quic += 1;
+            result = Err(Error::from(err));
+        }
+    }
+    transmit_stats.total_packets += packets.len() + quic_packets.len();
     result
 }
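The hunk above splits each slot's shreds into a UDP batch and a QUIC batch in a single pass over the iterator. For review, here is a minimal, self-contained sketch of the `Either`/`partition_map` idiom it relies on, using a toy `Protocol` enum and payloads rather than the real turbine types:

```rust
use itertools::{Either, Itertools};

#[derive(Clone, Copy)]
enum Protocol {
    Udp,
    Quic,
}

fn main() {
    let shreds = [(b"shred-a", Protocol::Udp), (b"shred-b", Protocol::Quic)];
    // Tag each item Left (UDP) or Right (QUIC); partition_map then builds
    // both output Vecs in one pass, as in broadcast_shreds above.
    let (udp, quic): (Vec<_>, Vec<_>) = shreds
        .iter()
        .map(|(payload, protocol)| match protocol {
            Protocol::Udp => Either::Left(payload),
            Protocol::Quic => Either::Right(payload),
        })
        .partition_map(std::convert::identity);
    assert_eq!((udp.len(), quic.len()), (1, 1));
}
```

The patch itself writes the arms as `Protocol::QUIC => Either::Right` (selecting the variant constructor, then applying it), which is equivalent to constructing the variants directly as done here.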
@@ -440,6 +458,7 @@
 pub mod test {
     use {
         super::*,
+        crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
         crossbeam_channel::unbounded,
         solana_entry::entry::create_ticks,
         solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -454,7 +473,9 @@ pub mod test {
             hash::Hash,
             signature::{Keypair, Signer},
         },
+        solana_streamer::streamer::StakedNodes,
         std::{
+            net::{IpAddr, Ipv4Addr},
             path::Path,
             sync::{atomic::AtomicBool, Arc},
             thread::sleep,
@@ -586,6 +607,16 @@ pub mod test {
     ) -> MockBroadcastStage {
         // Make the database ledger
         let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
+        let quic_connection_cache = Arc::new(
+            solana_quic_client::new_quic_connection_cache(
+                "connection_cache_test",
+                &leader_keypair,
+                IpAddr::V4(Ipv4Addr::LOCALHOST),
+                &Arc::<RwLock<StakedNodes>>::default(),
+                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+            )
+            .unwrap(),
+        );

         // Make the leader node and scheduler
         let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
@@ -619,7 +650,7 @@ pub mod test {
             exit_sender,
             blockstore.clone(),
             bank_forks,
-            StandardBroadcastRun::new(0),
+            StandardBroadcastRun::new(0, quic_connection_cache),
         );

         MockBroadcastStage {
diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs
index cfa49b01e..fe5149847 100644
--- a/core/src/broadcast_stage/broadcast_metrics.rs
+++ b/core/src/broadcast_stage/broadcast_metrics.rs
@@ -21,7 +21,8 @@ pub struct TransmitShredsStats {
     pub shred_select: u64,
     pub num_shreds: usize,
     pub total_packets: usize,
-    pub dropped_packets: usize,
+    pub(crate) dropped_packets_udp: usize,
+    pub(crate) dropped_packets_quic: usize,
 }

 impl BroadcastStats for TransmitShredsStats {
@@ -32,7 +33,8 @@
         self.num_shreds += new_stats.num_shreds;
         self.shred_select += new_stats.shred_select;
         self.total_packets += new_stats.total_packets;
-        self.dropped_packets += new_stats.dropped_packets;
+        self.dropped_packets_udp += new_stats.dropped_packets_udp;
+        self.dropped_packets_quic += new_stats.dropped_packets_quic;
     }
     fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
         if was_interrupted {
@@ -45,7 +47,12 @@
                 ("num_shreds", self.num_shreds as i64, i64),
                 ("shred_select", self.shred_select as i64, i64),
                 ("total_packets", self.total_packets as i64, i64),
-                ("dropped_packets", self.dropped_packets as i64, i64),
+                ("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
+                (
+                    "dropped_packets_quic",
+                    self.dropped_packets_quic as i64,
+                    i64
+                ),
             );
         } else {
             datapoint_info!(
@@ -64,7 +71,12 @@
                 ("num_shreds", self.num_shreds as i64, i64),
                 ("shred_select", self.shred_select as i64, i64),
                 ("total_packets", self.total_packets as i64, i64),
-                ("dropped_packets", self.dropped_packets as i64, i64),
+                ("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
+                (
+                    "dropped_packets_quic",
+                    self.dropped_packets_quic as i64,
+                    i64
+                ),
             );
         }
     }
@@ -210,7 +222,8 @@ mod test {
                 shred_select: 4,
                 num_shreds: 5,
                 total_packets: 6,
-                dropped_packets: 7,
+                dropped_packets_udp: 7,
+                dropped_packets_quic: 8,
             },
             &Some(BroadcastShredBatchInfo {
                 slot: 0,
@@ -230,7 +243,8 @@
         assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
         assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
         assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);

         slot_broadcast_stats.update(
             &TransmitShredsStats {
@@ -240,7 +254,8 @@
                 shred_select: 14,
                 num_shreds: 15,
                 total_packets: 16,
-                dropped_packets: 17,
+                dropped_packets_udp: 17,
+                dropped_packets_quic: 18,
             },
             &None,
         );
@@ -255,7 +270,8 @@
         assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
         assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
         assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);

         // If another batch is given, then total number of batches == num_expected_batches == 2,
         // so the batch should be purged from the HashMap
@@ -267,7 +283,8 @@
                 shred_select: 1,
                 num_shreds: 1,
                 total_packets: 1,
-                dropped_packets: 1,
+                dropped_packets_udp: 1,
+                dropped_packets_quic: 1,
             },
             &Some(BroadcastShredBatchInfo {
                 slot: 0,
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index e7b899ab0..4e97cff12 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -17,11 +17,12 @@ pub(super) struct FailEntryVerificationBroadcastRun {
     next_shred_index: u32,
     next_code_index: u32,
     cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
+    quic_connection_cache: Arc<QuicConnectionCache>,
     reed_solomon_cache: Arc<ReedSolomonCache>,
 }

 impl FailEntryVerificationBroadcastRun {
-    pub(super) fn new(shred_version: u16) -> Self {
+    pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
         let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
             CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
             CLUSTER_NODES_CACHE_TTL,
@@ -33,6 +34,7 @@ impl FailEntryVerificationBroadcastRun {
             next_shred_index: 0,
             next_code_index: 0,
             cluster_nodes_cache,
+            quic_connection_cache,
             reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
         }
     }
@@ -168,6 +170,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             sock,
             &shreds,
             &self.cluster_nodes_cache,
+            &self.quic_connection_cache,
             &AtomicInterval::default(),
             &mut TransmitShredsStats::default(),
             cluster_info,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 5c69145c9..2fba2fed3 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -33,6 +33,7 @@ pub struct StandardBroadcastRun {
     last_datapoint_submit: Arc<AtomicInterval>,
     num_batches: usize,
     cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
+    quic_connection_cache: Arc<QuicConnectionCache>,
     reed_solomon_cache: Arc<ReedSolomonCache>,
 }

@@ -42,7 +43,7 @@ enum BroadcastError {
 }

 impl StandardBroadcastRun {
-    pub(super) fn new(shred_version: u16) -> Self {
+    pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
         let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
             CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
             CLUSTER_NODES_CACHE_TTL,
@@ -58,6 +59,7 @@
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
             cluster_nodes_cache,
+            quic_connection_cache,
             reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
         }
     }
@@ -413,6 +415,7 @@
             sock,
             &shreds,
             &self.cluster_nodes_cache,
+            &self.quic_connection_cache,
             &self.last_datapoint_submit,
             &mut transmit_stats,
             cluster_info,
@@ -506,6 +509,7 @@ fn should_use_merkle_variant(slot: Slot, cluster_type: ClusterType, shred_versio
 mod test {
     use {
         super::*,
+        crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
         solana_entry::entry::create_ticks,
         solana_gossip::cluster_info::{ClusterInfo, Node},
         solana_ledger::{
@@ -517,8 +521,13 @@
             genesis_config::GenesisConfig,
             signature::{Keypair, Signer},
         },
-        solana_streamer::socket::SocketAddrSpace,
-        std::{ops::Deref, sync::Arc, time::Duration},
+        solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
+        std::{
+            net::{IpAddr, Ipv4Addr},
+            ops::Deref,
+            sync::Arc,
+            time::Duration,
+        },
     };

     #[allow(clippy::type_complexity)]
@@ -564,10 +573,24 @@
         )
     }

+    fn new_quic_connection_cache(keypair: &Keypair) -> Arc<QuicConnectionCache> {
+        Arc::new(
+            solana_quic_client::new_quic_connection_cache(
+                "connection_cache_test",
+                keypair,
+                IpAddr::V4(Ipv4Addr::LOCALHOST),
+                &Arc::<RwLock<StakedNodes>>::default(),
+                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+            )
+            .unwrap(),
+        )
+    }
+
     #[test]
     fn test_interrupted_slot_last_shred() {
         let keypair = Arc::new(Keypair::new());
-        let mut run = StandardBroadcastRun::new(0);
+        let quic_connection_cache = new_quic_connection_cache(&keypair);
+        let mut run = StandardBroadcastRun::new(0, quic_connection_cache);

         // Set up the slot to be interrupted
         let next_shred_index = 10;
@@ -609,6 +632,7 @@
         let num_shreds_per_slot = 2;
         let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
             setup(num_shreds_per_slot);
+        let quic_connection_cache = new_quic_connection_cache(&leader_keypair);

         // Insert 1 less than the number of ticks needed to finish the slot
         let ticks0 = create_ticks(genesis_config.ticks_per_slot - 1, 0, genesis_config.hash());
@@ -621,7 +645,7 @@
         };

         // Step 1: Make an incomplete transmission for slot 0
-        let mut standard_broadcast_run = StandardBroadcastRun::new(0);
+        let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
         standard_broadcast_run
             .test_process_receive_results(
                 &leader_keypair,
@@ -739,10 +763,11 @@
         let num_shreds_per_slot = 2;
         let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) =
             setup(num_shreds_per_slot);
+        let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
         let (bsend, brecv) = unbounded();
         let (ssend, _srecv) = unbounded();
         let mut last_tick_height = 0;
-        let mut standard_broadcast_run = StandardBroadcastRun::new(0);
+        let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
         let mut process_ticks = |num_ticks| {
             let ticks = create_ticks(num_ticks, 0, genesis_config.hash());
             last_tick_height += (ticks.len() - 1) as u64;
@@ -787,6 +812,7 @@
         let num_shreds_per_slot = 2;
         let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
             setup(num_shreds_per_slot);
+        let quic_connection_cache = new_quic_connection_cache(&leader_keypair);

         // Insert complete slot of ticks needed to finish the slot
         let ticks = create_ticks(genesis_config.ticks_per_slot, 0, genesis_config.hash());
@@ -798,7 +824,7 @@
             last_tick_height: ticks.len() as u64,
         };

-        let mut standard_broadcast_run = StandardBroadcastRun::new(0);
+        let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
         standard_broadcast_run
             .test_process_receive_results(
                 &leader_keypair,
@@ -815,9 +841,10 @@
     #[test]
     fn entries_to_shreds_max() {
         solana_logger::setup();
-        let mut bs = StandardBroadcastRun::new(0);
-        bs.current_slot_and_parent = Some((1, 0));
         let keypair = Keypair::new();
+        let quic_connection_cache = new_quic_connection_cache(&keypair);
+        let mut bs = StandardBroadcastRun::new(0, quic_connection_cache);
+        bs.current_slot_and_parent = Some((1, 0));
         let entries = create_ticks(10_000, 1, solana_sdk::hash::Hash::default());

         let ledger_path = get_tmp_ledger_path!();
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 28e9396f5..3cc36b307 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -176,10 +176,11 @@ impl ClusterNodes<RetransmitStage> {
             addrs,
             frwds,
         } = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?;
+        let protocol = get_broadcast_protocol(shred);
         if neighbors.is_empty() {
             let peers = children.into_iter().filter_map(|node| {
                 node.contact_info()?
-                    .tvu(Protocol::UDP)
+                    .tvu(protocol)
                     .ok()
                     .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
             });
@@ -209,7 +210,7 @@
             })
             .chain(children.into_iter().filter_map(|node| {
                 node.contact_info()?
-                    .tvu(Protocol::UDP)
+                    .tvu(protocol)
                     .ok()
                     .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
             }));
@@ -239,12 +240,13 @@
         let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
         let mut rng = ChaChaRng::from_seed(shred_seed);
         let drop_redundant_turbine_path = drop_redundant_turbine_path(shred.slot(), root_bank);
+        let protocol = get_broadcast_protocol(shred);
         let nodes: Vec<_> = weighted_shuffle
             .shuffle(&mut rng)
             .map(|index| &self.nodes[index])
             .inspect(|node| {
                 if let Some(node) = node.contact_info() {
-                    if let Ok(addr) = node.tvu(Protocol::UDP) {
+                    if let Ok(addr) = node.tvu(protocol) {
                         addrs.entry(addr).or_insert(*node.pubkey());
                     }
                     if !drop_redundant_turbine_path {
@@ -469,6 +471,11 @@ impl From<Pubkey> for NodeId {
     }
 }

+#[inline]
+pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol {
+    Protocol::UDP
+}
+
 pub fn make_test_cluster<R: Rng>(
     rng: &mut R,
     num_nodes: usize,
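The new `get_broadcast_protocol` helper pins turbine to UDP for now, but it gives `broadcast_shreds` and `retransmit_shred` one shared decision point for the transport. A hypothetical illustration, not part of this patch, of how that gate could later stage a QUIC rollout per shred (same signature, drop-in for the function above; `ShredId::slot()` is the existing accessor):

```rust
#[inline]
pub(crate) fn get_broadcast_protocol(shred: &ShredId) -> Protocol {
    // Example only: send even slots over QUIC and odd slots over UDP,
    // so the two transports can be compared side by side on live metrics.
    if shred.slot() % 2 == 0 {
        Protocol::QUIC
    } else {
        Protocol::UDP
    }
}
```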
Measure::start("retransmit_to"); - let num_nodes = match multi_target_send(socket, shred, &addrs) { - Ok(()) => addrs.len(), - Err(SendPktsError::IoError(ioerr, num_failed)) => { - stats - .num_addrs_failed - .fetch_add(num_failed, Ordering::Relaxed); - error!( - "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", - ioerr, - num_failed, - addrs.len(), - ); - addrs.len() - num_failed - } + let num_nodes = match cluster_nodes::get_broadcast_protocol(key) { + Protocol::QUIC => addrs + .iter() + .filter_map(|addr| { + quic_connection_cache + .get_connection(addr) + .send_data(shred) + .ok() + }) + .count(), + Protocol::UDP => match multi_target_send(socket, shred, &addrs) { + Ok(()) => addrs.len(), + Err(SendPktsError::IoError(ioerr, num_failed)) => { + error!( + "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", + ioerr, + num_failed, + addrs.len(), + ); + addrs.len() - num_failed + } + }, }; retransmit_time.stop(); + stats + .num_addrs_failed + .fetch_add(addrs.len() - num_nodes, Ordering::Relaxed); stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed); stats .retransmit_total @@ -360,6 +375,7 @@ fn retransmit_shred( /// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes. pub fn retransmitter( sockets: Arc>, + quic_connection_cache: Arc, bank_forks: Arc>, leader_schedule_cache: Arc, cluster_info: Arc, @@ -391,6 +407,7 @@ pub fn retransmitter( &cluster_info, &shreds_receiver, &sockets, + &quic_connection_cache, &mut stats, &cluster_nodes_cache, &mut shred_deduper, @@ -415,12 +432,14 @@ impl RetransmitStage { leader_schedule_cache: Arc, cluster_info: Arc, retransmit_sockets: Arc>, + quic_connection_cache: Arc, retransmit_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, ) -> Self { let retransmit_thread_handle = retransmitter( retransmit_sockets, + quic_connection_cache, bank_forks, leader_schedule_cache, cluster_info, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 3a317fc24..f0e01d5df 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -24,8 +24,8 @@ use { }; const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; -const MAX_STAKED_QUIC_CONNECTIONS: usize = 2000; -const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 1000; +const MAX_STAKED_QUIC_CONNECTIONS: usize = 4000; +const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 2000; const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5); const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 515c2c5e7..3847f144c 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -26,6 +26,7 @@ use { entry_notifier_service::EntryNotifierSender, }, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, + solana_quic_client::QuicConnectionCache, solana_rpc::{ optimistically_confirmed_bank_tracker::BankNotificationSender, rpc_subscriptions::RpcSubscriptions, @@ -101,6 +102,7 @@ impl Tpu { tpu_coalesce: Duration, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, connection_cache: &Arc, + turbine_quic_connection_cache: Arc, keypair: &Keypair, log_messages_bytes_limit: Option, staked_nodes: &Arc>, @@ -254,6 +256,7 @@ impl Tpu { blockstore.clone(), bank_forks, shred_version, + turbine_quic_connection_cache, ); Self { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3c9ccb8ec..55cac180b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -40,6 +40,7 @@ use { entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, 
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 3a317fc24..f0e01d5df 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -24,8 +24,8 @@ use {
 };

 const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
-const MAX_STAKED_QUIC_CONNECTIONS: usize = 2000;
-const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 1000;
+const MAX_STAKED_QUIC_CONNECTIONS: usize = 4000;
+const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 2000;
 const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5);
 const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10);
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index 515c2c5e7..3847f144c 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -26,6 +26,7 @@ use {
         entry_notifier_service::EntryNotifierSender,
     },
     solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
+    solana_quic_client::QuicConnectionCache,
     solana_rpc::{
         optimistically_confirmed_bank_tracker::BankNotificationSender,
         rpc_subscriptions::RpcSubscriptions,
@@ -101,6 +102,7 @@ impl Tpu {
         tpu_coalesce: Duration,
         cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
         connection_cache: &Arc<ConnectionCache>,
+        turbine_quic_connection_cache: Arc<QuicConnectionCache>,
         keypair: &Keypair,
         log_messages_bytes_limit: Option<usize>,
         staked_nodes: &Arc<RwLock<StakedNodes>>,
@@ -254,6 +256,7 @@
             blockstore.clone(),
             bank_forks,
             shred_version,
+            turbine_quic_connection_cache,
         );

         Self {
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 3c9ccb8ec..55cac180b 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -40,6 +40,7 @@ use {
         entry_notifier_service::EntryNotifierSender,
         leader_schedule_cache::LeaderScheduleCache,
     },
     solana_poh::poh_recorder::PohRecorder,
+    solana_quic_client::QuicConnectionCache,
     solana_rpc::{
         max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
         rpc_subscriptions::RpcSubscriptions,
@@ -141,6 +142,7 @@ impl Tvu {
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
         banking_tracer: Arc<BankingTracer>,
         staked_nodes: Arc<RwLock<StakedNodes>>,
+        quic_connection_cache: Arc<QuicConnectionCache>,
     ) -> Result<Self, String> {
         let TvuSockets {
             repair: repair_socket,
@@ -188,6 +190,7 @@
             leader_schedule_cache.clone(),
             cluster_info.clone(),
             Arc::new(retransmit_sockets),
+            quic_connection_cache,
             retransmit_receiver,
             max_slots.clone(),
             Some(rpc_subscriptions.clone()),
@@ -374,6 +377,7 @@
 pub mod tests {
     use {
         super::*,
+        crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
        serial_test::serial,
         solana_gossip::cluster_info::{ClusterInfo, Node},
         solana_ledger::{
@@ -387,7 +391,10 @@
         solana_runtime::bank::Bank,
         solana_sdk::signature::{Keypair, Signer},
         solana_streamer::socket::SocketAddrSpace,
-        std::sync::atomic::{AtomicU64, Ordering},
+        std::{
+            net::{IpAddr, Ipv4Addr},
+            sync::atomic::{AtomicU64, Ordering},
+        },
     };

     #[ignore]
@@ -404,12 +411,20 @@
         let bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));

-        //start cluster_info1
-        let cluster_info1 = ClusterInfo::new(
-            target1.info.clone(),
-            Arc::new(Keypair::new()),
-            SocketAddrSpace::Unspecified,
+        let keypair = Arc::new(Keypair::new());
+        let quic_connection_cache = Arc::new(
+            solana_quic_client::new_quic_connection_cache(
+                "connection_cache_test",
+                &keypair,
+                IpAddr::V4(Ipv4Addr::LOCALHOST),
+                &Arc::<RwLock<StakedNodes>>::default(),
+                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+            )
+            .unwrap(),
         );
+        //start cluster_info1
+        let cluster_info1 =
+            ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
         cluster_info1.insert_info(leader.info);
         let cref1 = Arc::new(cluster_info1);
@@ -491,6 +506,7 @@
             &ignored_prioritization_fee_cache,
             BankingTracer::new_disabled(),
             Arc::<RwLock<StakedNodes>>::default(),
+            quic_connection_cache,
         )
         .expect("assume success");
         exit.store(true, Ordering::Relaxed);
diff --git a/core/src/validator.rs b/core/src/validator.rs
index fe20bf730..1e5cbb8af 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -129,6 +129,8 @@ use {
 const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
 const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;

+pub const TURBINE_QUIC_CONNECTION_POOL_SIZE: usize = 4;
+
 #[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
 #[strum(serialize_all = "kebab-case")]
 pub enum BlockVerificationMethod {
@@ -1107,6 +1109,19 @@
         let entry_notification_sender = entry_notifier_service
             .as_ref()
             .map(|service| service.sender_cloned());
+        let turbine_quic_connection_cache = Arc::new(
+            solana_quic_client::new_quic_connection_cache(
+                "connection_cache_tvu_quic",
+                &identity_keypair,
+                node.info
+                    .tvu(Protocol::QUIC)
+                    .expect("Operator must spin up node with valid TVU address")
+                    .ip(),
+                &staked_nodes,
+                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+            )
+            .unwrap(),
+        );
         let (replay_vote_sender, replay_vote_receiver) = unbounded();
         let tvu = Tvu::new(
             vote_account,
@@ -1160,6 +1175,7 @@
             &prioritization_fee_cache,
             banking_tracer.clone(),
             staked_nodes.clone(),
+            turbine_quic_connection_cache.clone(),
         )?;

         let tpu = Tpu::new(
@@ -1192,6 +1208,7 @@
             config.tpu_coalesce,
             cluster_confirmed_slot_sender,
             &connection_cache,
+            turbine_quic_connection_cache,
             &identity_keypair,
             config.runtime_config.log_messages_bytes_limit,
             &staked_nodes,
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index d98491b1f..606ca28a3 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -4744,6 +4744,7 @@ dependencies = [
  "solana-perf",
  "solana-poh",
  "solana-program-runtime",
+ "solana-quic-client",
  "solana-rayon-threadlimit",
  "solana-rpc",
  "solana-rpc-client-api",
diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs
index 65f2444e6..5560f018e 100644
--- a/quic-client/src/lib.rs
+++ b/quic-client/src/lib.rs
@@ -18,12 +18,15 @@ use {
     rcgen::RcgenError,
     solana_connection_cache::{
         connection_cache::{
-            BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
+            BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
             ConnectionPoolError, Protocol,
         },
         connection_cache_stats::ConnectionCacheStats,
     },
-    solana_sdk::{pubkey::Pubkey, signature::Keypair},
+    solana_sdk::{
+        pubkey::Pubkey,
+        signature::{Keypair, Signer},
+    },
     solana_streamer::{
         nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
         streamer::StakedNodes,
@@ -214,6 +217,23 @@ impl QuicConnectionManager {
         Self { connection_config }
     }
 }
+
+pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
+
+pub fn new_quic_connection_cache(
+    name: &'static str,
+    keypair: &Keypair,
+    ipaddr: IpAddr,
+    staked_nodes: &Arc<RwLock<StakedNodes>>,
+    connection_pool_size: usize,
+) -> Result<QuicConnectionCache, ClientError> {
+    let mut config = QuicConfig::new()?;
+    config.update_client_certificate(keypair, ipaddr)?;
+    config.set_staked_nodes(staked_nodes, &keypair.pubkey());
+    let connection_manager = QuicConnectionManager::new_with_connection_config(config);
+    ConnectionCache::new(name, connection_manager, connection_pool_size)
+}
+
 #[cfg(test)]
 mod tests {
     use {
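For downstream users of the new `new_quic_connection_cache` API, a usage sketch mirroring the call sites added in this patch; the crate paths and function names are from the patch itself, while the metrics label and pool size below are arbitrary (error handling elided; see the validator wiring in core/src/validator.rs for the production values):

```rust
use {
    solana_quic_client::new_quic_connection_cache,
    solana_sdk::signature::Keypair,
    solana_streamer::streamer::StakedNodes,
    std::{
        net::{IpAddr, Ipv4Addr},
        sync::{Arc, RwLock},
    },
};

fn main() {
    let keypair = Keypair::new();
    // An empty StakedNodes default means the client is treated as unstaked.
    let staked_nodes = Arc::<RwLock<StakedNodes>>::default();
    let _cache = new_quic_connection_cache(
        "turbine_quic_demo", // metrics name
        &keypair,            // identity used for the client certificate
        IpAddr::V4(Ipv4Addr::LOCALHOST),
        &staked_nodes,
        4, // connection pool size, cf. TURBINE_QUIC_CONNECTION_POOL_SIZE
    )
    .unwrap();
    // Connections are then created lazily per peer address, e.g.:
    //     let conn = cache.get_connection(&addr);
    //     conn.send_data(&bytes)?;
}
```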