diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index e515d8653..3a317fc24 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -3,7 +3,7 @@ use { crate::{cluster_nodes::check_feature_activation, serve_repair::ServeRepair}, crossbeam_channel::{unbounded, Sender}, - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_ledger::shred::{should_discard_shred, ShredFetchStats}, solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags}, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -11,7 +11,7 @@ use { clock::{Slot, DEFAULT_MS_PER_SLOT}, feature_set, }, - solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, + solana_streamer::streamer::{self, PacketBatchReceiver, StakedNodes, StreamerReceiveStats}, std::{ net::UdpSocket, sync::{ @@ -23,6 +23,12 @@ use { }, }; +const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; +const MAX_STAKED_QUIC_CONNECTIONS: usize = 2000; +const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 1000; +const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5); +const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10); + pub(crate) struct ShredFetchStage { thread_hdls: Vec>, } @@ -161,14 +167,17 @@ impl ShredFetchStage { (streamers, modifier_hdl) } + #[allow(clippy::too_many_arguments)] pub(crate) fn new( sockets: Vec>, + quic_socket: UdpSocket, forward_sockets: Vec>, repair_socket: Arc, sender: Sender, shred_version: u16, bank_forks: Arc>, cluster_info: Arc, + staked_nodes: Arc>, turbine_disabled: Arc, exit: Arc, ) -> Self { @@ -202,15 +211,15 @@ impl ShredFetchStage { let (repair_receiver, repair_handler) = Self::packet_modifier( vec![repair_socket.clone()], - exit, - sender, + exit.clone(), + sender.clone(), recycler, - bank_forks, + bank_forks.clone(), shred_version, "shred_fetch_repair", PacketFlags::REPAIR, - Some((repair_socket, cluster_info)), - turbine_disabled, + Some((repair_socket, cluster_info.clone())), + turbine_disabled.clone(), ); tvu_threads.extend(tvu_forwards_threads.into_iter()); @@ -219,6 +228,48 @@ impl ShredFetchStage { tvu_threads.push(fwd_thread_hdl); tvu_threads.push(repair_handler); + let keypair = cluster_info.keypair().clone(); + let ip_addr = cluster_info + .my_contact_info() + .tvu(Protocol::QUIC) + .expect("Operator must spin up node with valid (QUIC) TVU address") + .ip(); + let (packet_sender, packet_receiver) = unbounded(); + let (_endpoint, join_handle) = solana_streamer::quic::spawn_server( + "quic_streamer_tvu", + quic_socket, + &keypair, + ip_addr, + packet_sender, + exit, + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes, + MAX_STAKED_QUIC_CONNECTIONS, + MAX_UNSTAKED_QUIC_CONNECTIONS, + QUIC_WAIT_FOR_CHUNK_TIMEOUT, + QUIC_COALESCE_WAIT, + ) + .unwrap(); + tvu_threads.push(join_handle); + + tvu_threads.push( + Builder::new() + .name("solTvuFetchPMod".to_string()) + .spawn(move || { + Self::modify_packets( + packet_receiver, + sender, + &bank_forks, + shred_version, + "shred_fetch_quic", + PacketFlags::empty(), + None, // repair_context + turbine_disabled, + ) + }) + .unwrap(), + ); + Self { thread_hdls: tvu_threads, } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6575f4b4e..3c9ccb8ec 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -50,6 +50,7 @@ use { vote_sender_types::ReplayVoteSender, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, + solana_streamer::streamer::StakedNodes, std::{ collections::HashSet, net::UdpSocket, @@ -75,6 +76,7 @@ pub struct Tvu { pub struct TvuSockets { pub fetch: Vec, + pub(crate) fetch_quic: UdpSocket, pub repair: UdpSocket, pub retransmit: Vec, pub forwards: Vec, @@ -138,10 +140,12 @@ impl Tvu { connection_cache: &Arc, prioritization_fee_cache: &Arc, banking_tracer: Arc, + staked_nodes: Arc>, ) -> Result { let TvuSockets { repair: repair_socket, fetch: fetch_sockets, + fetch_quic: fetch_quic_socket, retransmit: retransmit_sockets, forwards: tvu_forward_sockets, ancestor_hashes_requests: ancestor_hashes_socket, @@ -156,12 +160,14 @@ impl Tvu { tvu_forward_sockets.into_iter().map(Arc::new).collect(); let fetch_stage = ShredFetchStage::new( fetch_sockets, + fetch_quic_socket, forward_sockets, repair_socket.clone(), fetch_sender, tvu_config.shred_version, bank_forks.clone(), cluster_info.clone(), + staked_nodes, turbine_disabled, exit.clone(), ); @@ -441,6 +447,7 @@ pub mod tests { repair: target1.sockets.repair, retransmit: target1.sockets.retransmit_sockets, fetch: target1.sockets.tvu, + fetch_quic: target1.sockets.tvu_quic, forwards: target1.sockets.tvu_forwards, ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests, } @@ -483,6 +490,7 @@ pub mod tests { &Arc::new(ConnectionCache::new("connection_cache_test")), &ignored_prioritization_fee_cache, BankingTracer::new_disabled(), + Arc::>::default(), ) .expect("assume success"); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index cb82e1c4a..fe20bf730 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1117,6 +1117,7 @@ impl Validator { repair: node.sockets.repair, retransmit: node.sockets.retransmit_sockets, fetch: node.sockets.tvu, + fetch_quic: node.sockets.tvu_quic, forwards: node.sockets.tvu_forwards, ancestor_hashes_requests: node.sockets.ancestor_hashes_requests, }, @@ -1158,6 +1159,7 @@ impl Validator { &connection_cache, &prioritization_fee_cache, banking_tracer.clone(), + staked_nodes.clone(), )?; let tpu = Tpu::new( diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index f086fe239..f8c2e1c76 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2833,6 +2833,7 @@ pub struct Sockets { pub ip_echo: Option, pub tvu: Vec, pub tvu_forwards: Vec, + pub tvu_quic: UdpSocket, pub tpu: Vec, pub tpu_forwards: Vec, pub tpu_vote: Vec, @@ -2867,7 +2868,8 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); - let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let ((_tvu_port, tvu), (_tvu_quic_port, tvu_quic)) = + bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); let tvu_forwards = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); @@ -2924,6 +2926,7 @@ impl Node { ip_echo: Some(ip_echo), tvu: vec![tvu], tvu_forwards: vec![tvu_forwards], + tvu_quic, tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], tpu_vote: vec![tpu_vote], @@ -2966,7 +2969,8 @@ impl Node { ) -> Self { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); + let ((tvu_port, tvu), (_tvu_quic_port, tvu_quic)) = + bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); @@ -3015,6 +3019,7 @@ impl Node { ip_echo: Some(ip_echo), tvu: vec![tvu], tvu_forwards: vec![tvu_forwards], + tvu_quic, tpu: vec![tpu], tpu_forwards: vec![tpu_forwards], tpu_vote: vec![tpu_vote], @@ -3042,7 +3047,10 @@ impl Node { let (tvu_port, tvu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind"); - + let (_tvu_port_quic, tvu_quic) = Self::bind( + bind_ip_addr, + (tvu_port + QUIC_PORT_OFFSET, tvu_port + QUIC_PORT_OFFSET + 1), + ); let (tvu_forwards_port, tvu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu_forwards multi_bind"); @@ -3103,6 +3111,7 @@ impl Node { gossip, tvu: tvu_sockets, tvu_forwards: tvu_forwards_sockets, + tvu_quic, tpu: tpu_sockets, tpu_forwards: tpu_forwards_sockets, tpu_vote: tpu_vote_sockets, @@ -3294,6 +3303,7 @@ mod tests { ip_echo: None, tvu: vec![], tvu_forwards: vec![], + tvu_quic: UdpSocket::bind("0.0.0.0:0").unwrap(), tpu: vec![], tpu_forwards: vec![], tpu_vote: vec![], diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 57d9d14cc..095ddc724 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -29,7 +29,7 @@ pub struct UdpSocketPair { pub type PortRange = (u16, u16); pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); -pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 13; // VALIDATOR_PORT_RANGE must be at least this wide +pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14; // VALIDATOR_PORT_RANGE must be at least this wide pub(crate) const HEADER_LENGTH: usize = 4; pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;