From 2ee19f536a073490f53aa3a693569589affc72ca Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Wed, 20 Jul 2022 15:07:35 -0600 Subject: [PATCH] Revert "Revert "core: disable quic servers on mainnet-beta" (#26216)" This reverts commit 4a7fb2a8088fd0bc1ba977ae5d894b014fb613fc. --- core/src/tpu.rs | 69 +++++++++++++++----------- core/src/validator.rs | 17 ++++++- local-cluster/src/validator_configs.rs | 1 + validator/src/main.rs | 7 +++ 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 47f27520f..7f0be6319 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -62,8 +62,8 @@ pub struct Tpu { banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, - tpu_quic_t: thread::JoinHandle<()>, - tpu_forwards_quic_t: thread::JoinHandle<()>, + tpu_quic_t: Option>, + tpu_forwards_quic_t: Option>, find_packet_sender_stake_stage: FindPacketSenderStakeStage, vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage, staked_nodes_updater_service: StakedNodesUpdaterService, @@ -96,6 +96,7 @@ impl Tpu { connection_cache: &Arc, keypair: &Keypair, log_messages_bytes_limit: Option, + enable_quic_servers: bool, ) -> Self { let TpuSockets { transactions: transactions_sockets, @@ -153,33 +154,37 @@ impl Tpu { let (verified_sender, verified_receiver) = unbounded(); let stats = Arc::new(StreamStats::default()); - let tpu_quic_t = spawn_server( - transactions_quic_sockets, - keypair, - cluster_info.my_contact_info().tpu.ip(), - packet_sender, - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes.clone(), - MAX_STAKED_CONNECTIONS, - MAX_UNSTAKED_CONNECTIONS, - stats.clone(), - ) - .unwrap(); + let tpu_quic_t = enable_quic_servers.then(|| { + spawn_server( + transactions_quic_sockets, + keypair, + cluster_info.my_contact_info().tpu.ip(), + packet_sender, + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes.clone(), + MAX_STAKED_CONNECTIONS, + MAX_UNSTAKED_CONNECTIONS, + stats.clone(), + ) + .unwrap() + }); - let tpu_forwards_quic_t = spawn_server( - transactions_forwards_quic_sockets, - keypair, - cluster_info.my_contact_info().tpu_forwards.ip(), - forwarded_packet_sender, - exit.clone(), - MAX_QUIC_CONNECTIONS_PER_PEER, - staked_nodes, - MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), - 0, // Prevent unstaked nodes from forwarding transactions - stats, - ) - .unwrap(); + let tpu_forwards_quic_t = enable_quic_servers.then(|| { + spawn_server( + transactions_forwards_quic_sockets, + keypair, + cluster_info.my_contact_info().tpu_forwards.ip(), + forwarded_packet_sender, + exit.clone(), + MAX_QUIC_CONNECTIONS_PER_PEER, + staked_nodes, + MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), + 0, // Prevent unstaked nodes from forwarding transactions + stats, + ) + .unwrap() + }); let sigverify_stage = { let verifier = TransactionSigVerifier::new(verified_sender); @@ -266,9 +271,13 @@ impl Tpu { self.find_packet_sender_stake_stage.join(), self.vote_find_packet_sender_stake_stage.join(), self.staked_nodes_updater_service.join(), - self.tpu_quic_t.join(), - self.tpu_forwards_quic_t.join(), ]; + if let Some(tpu_quic_t) = self.tpu_quic_t { + tpu_quic_t.join()?; + } + if let Some(tpu_forwards_quic_t) = self.tpu_forwards_quic_t { + tpu_forwards_quic_t.join()?; + } let broadcast_result = self.broadcast_stage.join(); for result in results { result?; diff --git a/core/src/validator.rs b/core/src/validator.rs index 21e78a294..cb7ec8416 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -90,7 +90,7 @@ use { clock::Slot, epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET, exit::Exit, - genesis_config::GenesisConfig, + genesis_config::{ClusterType, GenesisConfig}, hash::Hash, pubkey::Pubkey, shred_version::compute_shred_version, @@ -176,6 +176,7 @@ pub struct ValidatorConfig { pub wait_to_vote_slot: Option, pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, + pub enable_quic_servers: bool, } impl Default for ValidatorConfig { @@ -237,6 +238,7 @@ impl Default for ValidatorConfig { wait_to_vote_slot: None, ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), + enable_quic_servers: false, } } } @@ -990,6 +992,18 @@ impl Validator { &connection_cache, ); + let enable_quic_servers = if genesis_config.cluster_type == ClusterType::MainnetBeta { + config.enable_quic_servers + } else { + if config.enable_quic_servers { + warn!( + "ignoring --enable-quic-servers. QUIC is always enabled for cluster type: {:?}", + genesis_config.cluster_type + ); + } + true + }; + let tpu = Tpu::new( &cluster_info, &poh_recorder, @@ -1022,6 +1036,7 @@ impl Validator { &connection_cache, &identity_keypair, config.runtime_config.log_messages_bytes_limit, + enable_quic_servers, ); datapoint_info!( diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 76f56251b..cf4d8d775 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -63,6 +63,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wait_to_vote_slot: config.wait_to_vote_slot, ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), + enable_quic_servers: config.enable_quic_servers, } } diff --git a/validator/src/main.rs b/validator/src/main.rs index d28f1d525..f18812cd8 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1210,6 +1210,11 @@ pub fn main() { .takes_value(false) .help("Use QUIC to send transactions."), ) + .arg( + Arg::with_name("enable_quic_servers") + .hidden(true) + .long("enable-quic-servers") + ) .arg( Arg::with_name("tpu_connection_pool_size") .long("tpu-connection-pool-size") @@ -2299,6 +2304,7 @@ pub fn main() { let accounts_shrink_optimize_total_space = value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); let tpu_use_quic = matches.is_present("tpu_use_quic"); + let enable_quic_servers = matches.is_present("enable_quic_servers"); let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize); let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); @@ -2642,6 +2648,7 @@ pub fn main() { log_messages_bytes_limit: value_of(&matches, "log_messages_bytes_limit"), ..RuntimeConfig::default() }, + enable_quic_servers, ..ValidatorConfig::default() };