adds quic receiver to shred-fetch-stage (#31576)

Working towards migrating turbine to QUIC.
Author: behzad nouri
Date: 2023-06-12 13:16:27 +00:00 (committed by GitHub)
Parent: 1ff7b28abc
Commit: aed4ecb633
5 changed files with 82 additions and 11 deletions

core/src/shred_fetch_stage.rs

@@ -3,7 +3,7 @@
use {
crate::{cluster_nodes::check_feature_activation, serve_repair::ServeRepair},
crossbeam_channel::{unbounded, Sender},
- solana_gossip::cluster_info::ClusterInfo,
+ solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -11,7 +11,7 @@ use {
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
},
- solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
+ solana_streamer::streamer::{self, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
std::{
net::UdpSocket,
sync::{
@@ -23,6 +23,12 @@ use {
},
};
+ const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
+ const MAX_STAKED_QUIC_CONNECTIONS: usize = 2000;
+ const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 1000;
+ const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5);
+ const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10);
pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@@ -161,14 +167,17 @@ impl ShredFetchStage {
(streamers, modifier_hdl)
}
+ #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
sockets: Vec<Arc<UdpSocket>>,
+ quic_socket: UdpSocket,
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: Sender<PacketBatch>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
+ staked_nodes: Arc<RwLock<StakedNodes>>,
turbine_disabled: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> Self {
@@ -202,15 +211,15 @@ impl ShredFetchStage {
let (repair_receiver, repair_handler) = Self::packet_modifier(
vec![repair_socket.clone()],
- exit,
- sender,
+ exit.clone(),
+ sender.clone(),
recycler,
- bank_forks,
+ bank_forks.clone(),
shred_version,
"shred_fetch_repair",
PacketFlags::REPAIR,
- Some((repair_socket, cluster_info)),
- turbine_disabled,
+ Some((repair_socket, cluster_info.clone())),
+ turbine_disabled.clone(),
);
tvu_threads.extend(tvu_forwards_threads.into_iter());
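Review note: the .clone() calls added above are required, not cosmetic: exit, sender, bank_forks, cluster_info, and turbine_disabled are all consumed again further down by the new QUIC server and its packet-modifier thread, so the repair path can no longer take them by value.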
@@ -219,6 +228,48 @@ impl ShredFetchStage {
tvu_threads.push(fwd_thread_hdl);
tvu_threads.push(repair_handler);
+ let keypair = cluster_info.keypair().clone();
+ let ip_addr = cluster_info
+ .my_contact_info()
+ .tvu(Protocol::QUIC)
+ .expect("Operator must spin up node with valid (QUIC) TVU address")
+ .ip();
+ let (packet_sender, packet_receiver) = unbounded();
+ let (_endpoint, join_handle) = solana_streamer::quic::spawn_server(
+ "quic_streamer_tvu",
+ quic_socket,
+ &keypair,
+ ip_addr,
+ packet_sender,
+ exit,
+ MAX_QUIC_CONNECTIONS_PER_PEER,
+ staked_nodes,
+ MAX_STAKED_QUIC_CONNECTIONS,
+ MAX_UNSTAKED_QUIC_CONNECTIONS,
+ QUIC_WAIT_FOR_CHUNK_TIMEOUT,
+ QUIC_COALESCE_WAIT,
+ )
+ .unwrap();
+ tvu_threads.push(join_handle);
+ tvu_threads.push(
+ Builder::new()
+ .name("solTvuFetchPMod".to_string())
+ .spawn(move || {
+ Self::modify_packets(
+ packet_receiver,
+ sender,
+ &bank_forks,
+ shred_version,
+ "shred_fetch_quic",
+ PacketFlags::empty(),
+ None, // repair_context
+ turbine_disabled,
+ )
+ })
+ .unwrap(),
+ );
Self {
thread_hdls: tvu_threads,
}
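Review note: the net effect of this file's changes is that the QUIC receiver becomes one more producer on the same channel the UDP receivers already feed, so everything downstream of shred-fetch is untouched. A standalone sketch of that fan-in pattern (names are illustrative, not the validator's real types):

// Sketch: every receiver thread (UDP turbine, UDP repair, and now the QUIC
// packet-modifier thread) clones one Sender; the downstream stage drains a
// single Receiver.
use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (sender, receiver) = unbounded::<String>();
    let handles: Vec<_> = ["udp_turbine", "udp_repair", "quic_turbine"]
        .into_iter()
        .map(|source| {
            let sender = sender.clone();
            thread::spawn(move || sender.send(format!("batch from {source}")).unwrap())
        })
        .collect();
    drop(sender); // the channel closes once all producer clones are gone
    for batch in receiver {
        println!("downstream got: {batch}");
    }
    for handle in handles {
        handle.join().unwrap();
    }
}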

core/src/tvu.rs

@@ -50,6 +50,7 @@ use {
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
+ solana_streamer::streamer::StakedNodes,
std::{
collections::HashSet,
net::UdpSocket,
@@ -75,6 +76,7 @@ pub struct Tvu {
pub struct TvuSockets {
pub fetch: Vec<UdpSocket>,
+ pub(crate) fetch_quic: UdpSocket,
pub repair: UdpSocket,
pub retransmit: Vec<UdpSocket>,
pub forwards: Vec<UdpSocket>,
@@ -138,10 +140,12 @@ impl Tvu {
connection_cache: &Arc<ConnectionCache>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
banking_tracer: Arc<BankingTracer>,
+ staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
fetch: fetch_sockets,
+ fetch_quic: fetch_quic_socket,
retransmit: retransmit_sockets,
forwards: tvu_forward_sockets,
ancestor_hashes_requests: ancestor_hashes_socket,
@@ -156,12 +160,14 @@ impl Tvu {
tvu_forward_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = ShredFetchStage::new(
fetch_sockets,
+ fetch_quic_socket,
forward_sockets,
repair_socket.clone(),
fetch_sender,
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
+ staked_nodes,
turbine_disabled,
exit.clone(),
);
@@ -441,6 +447,7 @@ pub mod tests {
repair: target1.sockets.repair,
retransmit: target1.sockets.retransmit_sockets,
fetch: target1.sockets.tvu,
+ fetch_quic: target1.sockets.tvu_quic,
forwards: target1.sockets.tvu_forwards,
ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
}
@@ -483,6 +490,7 @@ pub mod tests {
&Arc::new(ConnectionCache::new("connection_cache_test")),
&ignored_prioritization_fee_cache,
BankingTracer::new_disabled(),
+ Arc::<RwLock<StakedNodes>>::default(),
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);
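Review note: callers with no stake information, like this test, can pass a default StakedNodes; the QUIC server then admits every peer under the unstaked connection limits. A minimal sketch, assuming only that StakedNodes implements Default (which the test line above already relies on):

use solana_streamer::streamer::StakedNodes;
use std::sync::{Arc, RwLock};

fn main() {
    // What the test above passes:
    let _staked_nodes = Arc::<RwLock<StakedNodes>>::default();
    // The same value, spelled out:
    let _explicit = Arc::new(RwLock::new(StakedNodes::default()));
}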

core/src/validator.rs

@@ -1117,6 +1117,7 @@ impl Validator {
repair: node.sockets.repair,
retransmit: node.sockets.retransmit_sockets,
fetch: node.sockets.tvu,
+ fetch_quic: node.sockets.tvu_quic,
forwards: node.sockets.tvu_forwards,
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
},
@@ -1158,6 +1159,7 @@ impl Validator {
&connection_cache,
&prioritization_fee_cache,
banking_tracer.clone(),
+ staked_nodes.clone(),
)?;
let tpu = Tpu::new(
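Review note: staked_nodes here is the same Arc<RwLock<StakedNodes>> the validator already hands to its TPU QUIC servers, so the clone above is a reference-count bump, not a copy, and TPU and TVU enforce stake-based limits against one map. A standalone sketch of that sharing semantics (plain std types, nothing validator-specific):

use std::sync::{Arc, RwLock};

fn main() {
    let shared = Arc::new(RwLock::new(0u64));
    let clone = shared.clone(); // pointer copy, not a deep copy
    *clone.write().unwrap() = 42;
    assert_eq!(*shared.read().unwrap(), 42); // both handles see one value
}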

gossip/src/cluster_info.rs

@@ -2833,6 +2833,7 @@ pub struct Sockets {
pub ip_echo: Option<TcpListener>,
pub tvu: Vec<UdpSocket>,
pub tvu_forwards: Vec<UdpSocket>,
+ pub tvu_quic: UdpSocket,
pub tpu: Vec<UdpSocket>,
pub tpu_forwards: Vec<UdpSocket>,
pub tpu_vote: Vec<UdpSocket>,
@@ -2867,7 +2868,8 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
- let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
+ let ((_tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
+ bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let tvu_forwards = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
@@ -2924,6 +2926,7 @@ impl Node {
ip_echo: Some(ip_echo),
tvu: vec![tvu],
tvu_forwards: vec![tvu_forwards],
+ tvu_quic,
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
tpu_vote: vec![tpu_vote],
@@ -2966,7 +2969,8 @@ impl Node {
) -> Self {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
- let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
+ let ((tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
+ bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
@@ -3015,6 +3019,7 @@ impl Node {
ip_echo: Some(ip_echo),
tvu: vec![tvu],
tvu_forwards: vec![tvu_forwards],
+ tvu_quic,
tpu: vec![tpu],
tpu_forwards: vec![tpu_forwards],
tpu_vote: vec![tpu_vote],
@@ -3042,7 +3047,10 @@ impl Node {
let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind");
+ let (_tvu_port_quic, tvu_quic) = Self::bind(
+ bind_ip_addr,
+ (tvu_port + QUIC_PORT_OFFSET, tvu_port + QUIC_PORT_OFFSET + 1),
+ );
let (tvu_forwards_port, tvu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu_forwards multi_bind");
@@ -3103,6 +3111,7 @@ impl Node {
gossip,
tvu: tvu_sockets,
tvu_forwards: tvu_forwards_sockets,
+ tvu_quic,
tpu: tpu_sockets,
tpu_forwards: tpu_forwards_sockets,
tpu_vote: tpu_vote_sockets,
@@ -3294,6 +3303,7 @@ mod tests {
ip_echo: None,
tvu: vec![],
tvu_forwards: vec![],
+ tvu_quic: UdpSocket::bind("0.0.0.0:0").unwrap(),
tpu: vec![],
tpu_forwards: vec![],
tpu_vote: vec![],
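Review note: throughout this file the QUIC socket is pinned at a fixed offset from its UDP sibling. A hedged usage sketch of bind_two_in_range_with_offset, with its shape inferred from the call sites above; the offset value 6 is an assumption standing in for the real QUIC_PORT_OFFSET constant:

// Inferred from the call sites in this diff: bind two UDP sockets whose
// ports differ by exactly `offset`, both inside `port_range`.
use solana_net_utils::bind_two_in_range_with_offset;
use std::net::{IpAddr, Ipv4Addr};

fn main() {
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    let port_range = (8000u16, 10_000u16);
    let quic_port_offset = 6; // assumed value of QUIC_PORT_OFFSET
    let ((tvu_port, _tvu), (tvu_quic_port, _tvu_quic)) =
        bind_two_in_range_with_offset(ip, port_range, quic_port_offset).unwrap();
    assert_eq!(tvu_quic_port, tvu_port + quic_port_offset);
}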

net-utils/src/lib.rs

@@ -29,7 +29,7 @@ pub struct UdpSocketPair {
pub type PortRange = (u16, u16);
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
- pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 13; // VALIDATOR_PORT_RANGE must be at least this wide
+ pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14; // VALIDATOR_PORT_RANGE must be at least this wide
pub(crate) const HEADER_LENGTH: usize = 4;
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
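
Review note: the width bump from 13 to 14 accounts for exactly one new socket, the TVU QUIC port, which must also fit inside VALIDATOR_PORT_RANGE. A hypothetical spelling of the invariant (not code from this diff):

// Hypothetical: a port range can host the validator only if it is at least
// MINIMUM_VALIDATOR_PORT_RANGE_WIDTH ports wide.
fn is_wide_enough(range: (u16, u16), min_width: u16) -> bool {
    range.1.saturating_sub(range.0) >= min_width
}

fn main() {
    assert!(is_wide_enough((8000, 10_000), 14));
}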