diff --git a/cli/src/cli.rs b/cli/src/cli.rs index e64398396c..22fbfd2c2f 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -30,6 +30,7 @@ use { stake::{instruction::LockupArgs, state::Lockup}, transaction::{TransactionError, VersionedTransaction}, }, + solana_tpu_client::connection_cache::DEFAULT_TPU_ENABLE_UDP, solana_vote_program::vote_state::VoteAuthorize, std::{collections::HashMap, error, io::stdout, str::FromStr, sync::Arc, time::Duration}, thiserror::Error, @@ -550,7 +551,7 @@ impl Default for CliConfig<'_> { u64::from_str(DEFAULT_CONFIRM_TX_TIMEOUT_SECONDS).unwrap(), ), address_labels: HashMap::new(), - use_quic: false, + use_quic: !DEFAULT_TPU_ENABLE_UDP, } } } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 93d8ed28ac..701e4f58e8 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -16,6 +16,7 @@ use { solana_streamer::streamer::{ self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats, }, + solana_tpu_client::connection_cache::DEFAULT_TPU_ENABLE_UDP, std::{ net::UdpSocket, sync::{ @@ -57,6 +58,7 @@ impl FetchStage { poh_recorder, coalesce_ms, None, + DEFAULT_TPU_ENABLE_UDP, ), receiver, vote_receiver, @@ -76,6 +78,7 @@ impl FetchStage { poh_recorder: &Arc>, coalesce_ms: u64, in_vote_only_mode: Option>, + tpu_enable_udp: bool, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); @@ -92,6 +95,7 @@ impl FetchStage { poh_recorder, coalesce_ms, in_vote_only_mode, + tpu_enable_udp, ) } @@ -150,42 +154,52 @@ impl FetchStage { poh_recorder: &Arc>, coalesce_ms: u64, in_vote_only_mode: Option>, + tpu_enable_udp: bool, ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); let tpu_stats = Arc::new(StreamerReceiveStats::new("tpu_receiver")); - let tpu_threads: Vec<_> = tpu_sockets - .into_iter() - .map(|socket| { - streamer::receiver( - socket, - exit.clone(), - sender.clone(), - recycler.clone(), - tpu_stats.clone(), - coalesce_ms, - true, - in_vote_only_mode.clone(), - ) - }) - .collect(); + + let tpu_threads: Vec<_> = if tpu_enable_udp { + tpu_sockets + .into_iter() + .map(|socket| { + streamer::receiver( + socket, + exit.clone(), + sender.clone(), + recycler.clone(), + tpu_stats.clone(), + coalesce_ms, + true, + in_vote_only_mode.clone(), + ) + }) + .collect() + } else { + Vec::default() + }; let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver")); - let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets - .into_iter() - .map(|socket| { - streamer::receiver( - socket, - exit.clone(), - forward_sender.clone(), - recycler.clone(), - tpu_forward_stats.clone(), - coalesce_ms, - true, - in_vote_only_mode.clone(), - ) - }) - .collect(); + let tpu_forwards_threads: Vec<_> = if tpu_enable_udp { + tpu_forwards_sockets + .into_iter() + .map(|socket| { + streamer::receiver( + socket, + exit.clone(), + forward_sender.clone(), + recycler.clone(), + tpu_forward_stats.clone(), + coalesce_ms, + true, + in_vote_only_mode.clone(), + ) + }) + .collect() + } else { + Vec::default() + }; let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver")); let tpu_vote_threads: Vec<_> = tpu_vote_sockets diff --git a/core/src/tpu.rs b/core/src/tpu.rs index e2378929cf..27c7a7c6b2 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -99,6 +99,7 @@ impl Tpu { log_messages_bytes_limit: Option, staked_nodes: &Arc>, shared_staked_nodes_overrides: Arc>>, + tpu_enable_udp: bool, ) -> Self { let TpuSockets { transactions: transactions_sockets, @@ -124,6 +125,7 @@ impl Tpu { poh_recorder, tpu_coalesce_ms, Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), + tpu_enable_udp, ); let staked_nodes_updater_service = StakedNodesUpdaterService::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index 78a27517c4..399d4d1356 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -381,6 +381,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, use_quic: bool, tpu_connection_pool_size: usize, + tpu_enable_udp: bool, ) -> Result { let id = identity_keypair.pubkey(); assert_eq!(id, node.info.id); @@ -1020,6 +1021,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, &staked_nodes, config.staked_nodes_overrides.clone(), + tpu_enable_udp, ); datapoint_info!( @@ -2171,7 +2173,7 @@ mod tests { solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader}, solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, solana_tpu_client::connection_cache::{ - DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, + DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, }, std::{fs::remove_dir_all, thread, time::Duration}, }; @@ -2208,6 +2210,7 @@ mod tests { SocketAddrSpace::Unspecified, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + DEFAULT_TPU_ENABLE_UDP, ) .expect("assume successful validator start"); assert_eq!( @@ -2292,6 +2295,7 @@ mod tests { SocketAddrSpace::Unspecified, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + DEFAULT_TPU_ENABLE_UDP, ) .expect("assume successful validator start") }) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 4cd28a6a2c..660e21776f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -43,7 +43,8 @@ use { solana_streamer::socket::SocketAddrSpace, solana_thin_client::thin_client::ThinClient, solana_tpu_client::connection_cache::{ - ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, + ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, + DEFAULT_TPU_USE_QUIC, }, solana_vote_program::{ vote_instruction, @@ -278,6 +279,7 @@ impl LocalCluster { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + DEFAULT_TPU_ENABLE_UDP, ) .expect("assume successful validator start"); @@ -477,6 +479,7 @@ impl LocalCluster { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + DEFAULT_TPU_ENABLE_UDP, ) .expect("assume successful validator start"); @@ -837,6 +840,7 @@ impl Cluster for LocalCluster { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + DEFAULT_TPU_ENABLE_UDP, ) .expect("assume successful validator start"); cluster_validator_info.validator = Some(restarted_node); diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index deb82f106f..fb6353f76b 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -64,6 +64,9 @@ while [[ -n $1 ]]; do elif [[ $1 = --tpu-disable-quic ]]; then args+=("$1") shift + elif [[ $1 = --tpu-enable-udp ]]; then + args+=("$1") + shift elif [[ $1 = --rpc-send-batch-ms ]]; then args+=("$1" "$2") shift 2 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 98ab0143a1..f339726b5b 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -150,6 +150,9 @@ while [[ -n $1 ]]; do elif [[ $1 = --tpu-disable-quic ]]; then args+=("$1") shift + elif [[ $1 = --tpu-enable-udp ]]; then + args+=("$1") + shift elif [[ $1 = --rpc-send-batch-ms ]]; then args+=("$1" "$2") shift 2 diff --git a/net/net.sh b/net/net.sh index c61799340b..0bb1ea0d83 100755 --- a/net/net.sh +++ b/net/net.sh @@ -111,6 +111,9 @@ Operate a configured testnet --tpu-disable-quic - Disable quic for tpu packet forwarding + --tpu-enable-udp + - Enable UDP for tpu transactions + sanity/start-specific options: -F - Discard validator nodes that didn't bootup successfully -o noInstallCheck - Skip solana-install sanity @@ -325,6 +328,7 @@ startBootstrapLeader() { \"$extraPrimordialStakes\" \ \"$TMPFS_ACCOUNTS\" \ \"$disableQuic\" \ + \"$enableUdp\" \ " ) >> "$logFile" 2>&1 || { @@ -398,6 +402,7 @@ startNode() { \"$extraPrimordialStakes\" \ \"$TMPFS_ACCOUNTS\" \ \"$disableQuic\" \ + \"$enableUdp\" \ " ) >> "$logFile" 2>&1 & declare pid=$! @@ -807,6 +812,7 @@ maybeFullRpc=false waitForNodeInit=true extraPrimordialStakes=0 disableQuic=false +enableUdp=false command=$1 [[ -n $command ]] || usage @@ -922,6 +928,9 @@ while [[ -n $1 ]]; do elif [[ $1 == --tpu-disable-quic ]]; then disableQuic=true shift 1 + elif [[ $1 == --tpu-enable-udp ]]; then + enableUdp=true + shift 1 elif [[ $1 == --async-node-init ]]; then waitForNodeInit=false shift 1 diff --git a/net/remote/remote-node.sh b/net/remote/remote-node.sh index b07429f24d..94518199a9 100755 --- a/net/remote/remote-node.sh +++ b/net/remote/remote-node.sh @@ -29,6 +29,7 @@ waitForNodeInit="${20}" extraPrimordialStakes="${21:=0}" tmpfsAccounts="${22:false}" disableQuic="${23}" +enableUdp="${24}" set +x @@ -290,6 +291,10 @@ EOF args+=(--tpu-disable-quic) fi + if $enableUdp; then + args+=(--tpu-enable-udp) + fi + if [[ $airdropsEnabled = true ]]; then cat >> ~/solana/on-reboot < faucet.log 2>&1 & @@ -422,6 +427,10 @@ EOF args+=(--tpu-disable-quic) fi + if $enableUdp; then + args+=(--tpu-enable-udp) + fi + cat >> ~/solana/on-reboot < validator.log.\$now 2>&1 & diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index a4e704232a..d5eff757eb 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -245,7 +245,7 @@ fn test_rpc_subscriptions() { let alice = Keypair::new(); let test_validator = - TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified); + TestValidator::with_no_fees_udp(alice.pubkey(), None, SocketAddrSpace::Unspecified); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); transactions_socket.connect(test_validator.tpu()).unwrap(); diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index a54436eba1..a683efd41c 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -44,7 +44,9 @@ use { signature::{read_keypair_file, write_keypair_file, Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC}, + solana_tpu_client::connection_cache::{ + DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, + }, std::{ collections::{HashMap, HashSet}, ffi::OsStr, @@ -123,6 +125,7 @@ pub struct TestValidatorGenesis { compute_unit_limit: Option, pub log_messages_bytes_limit: Option, pub transaction_account_lock_limit: Option, + pub tpu_enable_udp: bool, } impl Default for TestValidatorGenesis { @@ -154,6 +157,7 @@ impl Default for TestValidatorGenesis { compute_unit_limit: Option::::default(), log_messages_bytes_limit: Option::::default(), transaction_account_lock_limit: Option::::default(), + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, } } } @@ -181,6 +185,11 @@ impl TestValidatorGenesis { ledger_path.join("vote-account-keypair.json").exists() } + pub fn tpu_enable_udp(&mut self, tpu_enable_udp: bool) -> &mut Self { + self.tpu_enable_udp = tpu_enable_udp; + self + } + pub fn fee_rate_governor(&mut self, fee_rate_governor: FeeRateGovernor) -> &mut Self { self.fee_rate_governor = fee_rate_governor; self @@ -549,6 +558,25 @@ impl TestValidator { .expect("validator start failed") } + /// Create a test validator using udp for TPU. + pub fn with_no_fees_udp( + mint_address: Pubkey, + faucet_addr: Option, + socket_addr_space: SocketAddrSpace, + ) -> Self { + TestValidatorGenesis::default() + .tpu_enable_udp(true) + .fee_rate_governor(FeeRateGovernor::new(0, 0)) + .rent(Rent { + lamports_per_byte_year: 1, + exemption_threshold: 1.0, + ..Rent::default() + }) + .faucet_addr(faucet_addr) + .start_with_mint_address(mint_address, socket_addr_space) + .expect("validator start failed") + } + /// Create and start a `TestValidator` with custom transaction fees and minimal rent. /// Faucet optional. /// @@ -815,6 +843,7 @@ impl TestValidator { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, + config.tpu_enable_udp, )?); // Needed to avoid panics in `solana-responder-gossip` in tests that create a number of diff --git a/tpu-client/src/connection_cache.rs b/tpu-client/src/connection_cache.rs index 28d8f10e5e..502d0ede87 100644 --- a/tpu-client/src/connection_cache.rs +++ b/tpu-client/src/connection_cache.rs @@ -37,6 +37,8 @@ pub const DEFAULT_TPU_USE_QUIC: bool = true; /// Default TPU connection pool size per remote address pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4; +pub const DEFAULT_TPU_ENABLE_UDP: bool = false; + #[derive(Default)] pub struct ConnectionCacheStats { cache_hits: AtomicU64, diff --git a/validator/src/main.rs b/validator/src/main.rs index 364cb9538e..ba8c8fd4b3 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -70,7 +70,9 @@ use { self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::connection_cache::{ + DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, + }, solana_validator::{ admin_rpc_service, admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides}, @@ -1245,6 +1247,12 @@ pub fn main() { .takes_value(false) .help("Do not use QUIC to send transactions."), ) + .arg( + Arg::with_name("tpu_enable_udp") + .long("tpu-enable-udp") + .takes_value(false) + .help("Enable UDP for receiving/sending transactions."), + ) .arg( Arg::with_name("disable_quic_servers") .long("disable-quic-servers") @@ -2431,6 +2439,12 @@ pub fn main() { let accounts_shrink_optimize_total_space = value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); let tpu_use_quic = !matches.is_present("tpu_disable_quic"); + let tpu_enable_udp = if matches.is_present("tpu_enable_udp") { + true + } else { + DEFAULT_TPU_ENABLE_UDP + }; + let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize); let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); @@ -3221,6 +3235,7 @@ pub fn main() { socket_addr_space, tpu_use_quic, tpu_connection_pool_size, + tpu_enable_udp, ) .unwrap_or_else(|e| { error!("Failed to start validator: {:?}", e);