diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 4acf3d319e..3bfa4bdfc6 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -20,10 +20,10 @@ use { }; // Should be non-zero -static MAX_CONNECTIONS: usize = 64; +static MAX_CONNECTIONS: usize = 1024; #[derive(Clone)] -enum Connection { +pub enum Connection { Udp(Arc), Quic(Arc), } @@ -117,14 +117,14 @@ impl ConnectionCacheStats { } } -struct ConnMap { +struct ConnectionMap { map: LruCache, stats: Arc, last_stats: AtomicInterval, use_quic: bool, } -impl ConnMap { +impl ConnectionMap { pub fn new() -> Self { Self { map: LruCache::new(MAX_CONNECTIONS), @@ -140,7 +140,7 @@ impl ConnMap { } lazy_static! { - static ref CONNECTION_MAP: Mutex = Mutex::new(ConnMap::new()); + static ref CONNECTION_MAP: Mutex = Mutex::new(ConnectionMap::new()); } pub fn set_use_quic(use_quic: bool) { @@ -346,6 +346,7 @@ mod tests { #[test] fn test_connection_cache() { + solana_logger::setup(); // Allow the test to run deterministically // with the same pseudorandom sequence between runs // and on different platforms - the cryptographic security diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 69efbbee07..5a0004f2e0 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -11,15 +11,20 @@ use { itertools::Itertools, lazy_static::lazy_static, log::*, - quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, + quinn::{ + ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError, + }, quinn_proto::ConnectionStats, solana_sdk::{ - quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, + quic::{ + QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS, QUIC_PORT_OFFSET, + }, transport::Result as TransportResult, }, std::{ net::{SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, + time::Duration, }, tokio::runtime::Runtime, }; @@ -163,7 +168,13 @@ impl QuicClient { let mut endpoint = RUNTIME.block_on(create_endpoint); - endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto))); + let mut config = ClientConfig::new(Arc::new(crypto)); + let transport_config = Arc::get_mut(&mut config.transport).unwrap(); + let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + transport_config.max_idle_timeout(Some(timeout)); + transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + + endpoint.set_default_client_config(config); Self { endpoint, diff --git a/core/src/lib.rs b/core/src/lib.rs index abc843bbd9..be682be770 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -73,6 +73,7 @@ pub mod verified_vote_packets; pub mod vote_simulator; pub mod vote_stake_tracker; pub mod voting_service; +pub mod warm_quic_cache_service; pub mod window_service; #[macro_use] diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 649b027019..dc6d382d94 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,6 +25,7 @@ use { sigverify_stage::SigVerifyStage, tower_storage::TowerStorage, voting_service::VotingService, + warm_quic_cache_service::WarmQuicCacheService, }, crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, @@ -78,6 +79,7 @@ pub struct Tvu { accounts_hash_verifier: AccountsHashVerifier, cost_update_service: CostUpdateService, voting_service: VotingService, + warm_quic_cache_service: WarmQuicCacheService, drop_bank_service: DropBankService, transaction_cost_metrics_service: TransactionCostMetricsService, } @@ -283,6 +285,9 @@ impl Tvu { bank_forks.clone(), ); + let warm_quic_cache_service = + WarmQuicCacheService::new(cluster_info.clone(), poh_recorder.clone(), exit.clone()); + let (cost_update_sender, cost_update_receiver) = unbounded(); let cost_update_service = CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver); @@ -356,6 +361,7 @@ impl Tvu { accounts_hash_verifier, cost_update_service, voting_service, + warm_quic_cache_service, drop_bank_service, transaction_cost_metrics_service, } @@ -390,6 +396,7 @@ impl Tvu { self.accounts_hash_verifier.join()?; self.cost_update_service.join()?; self.voting_service.join()?; + self.warm_quic_cache_service.join()?; self.drop_bank_service.join()?; self.transaction_cost_metrics_service.join()?; Ok(()) diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs new file mode 100644 index 0000000000..d7013710a6 --- /dev/null +++ b/core/src/warm_quic_cache_service.rs @@ -0,0 +1,70 @@ +// Connect to future leaders with some jitter so the quic connection is warm +// by the time we need it. + +use { + rand::{thread_rng, Rng}, + solana_client::connection_cache::send_wire_transaction, + solana_gossip::cluster_info::ClusterInfo, + solana_poh::poh_recorder::PohRecorder, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub struct WarmQuicCacheService { + thread_hdl: JoinHandle<()>, +} + +// ~50 seconds +const CACHE_OFFSET_SLOT: i64 = 100; +const CACHE_JITTER_SLOT: i64 = 20; + +impl WarmQuicCacheService { + pub fn new( + cluster_info: Arc, + poh_recorder: Arc>, + exit: Arc, + ) -> Self { + let thread_hdl = Builder::new() + .name("sol-warm-quic-service".to_string()) + .spawn(move || { + let slot_jitter = thread_rng().gen_range(-CACHE_JITTER_SLOT, CACHE_JITTER_SLOT); + let mut maybe_last_leader = None; + while !exit.load(Ordering::Relaxed) { + if let Some(leader_pubkey) = poh_recorder + .lock() + .unwrap() + .leader_after_n_slots((CACHE_OFFSET_SLOT + slot_jitter) as u64) + { + if maybe_last_leader + .map_or(true, |last_leader| last_leader != leader_pubkey) + { + maybe_last_leader = Some(leader_pubkey); + if let Some(addr) = cluster_info + .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) + { + if let Err(err) = send_wire_transaction(&[0u8], &addr) { + warn!( + "Failed to warmup QUIC connection to the leader {:?}, Error {:?}", + leader_pubkey, err + ); + } + } + } + } + sleep(Duration::from_millis(200)); + } + }) + .unwrap(); + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index 2fe4cf080c..acfb889408 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -3,3 +3,6 @@ pub const QUIC_PORT_OFFSET: u16 = 6; // that seems to maximize TPS on GCE (higher values don't seem to // give significant improvement or seem to impact stability) pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048; + +pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; +pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 8b5dec4712..3dd2e986fb 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -3,12 +3,12 @@ use { futures_util::stream::StreamExt, pem::Pem, pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, - quinn::{Endpoint, EndpointConfig, IncomingUniStreams, ServerConfig}, + quinn::{Endpoint, EndpointConfig, IdleTimeout, IncomingUniStreams, ServerConfig, VarInt}, rcgen::{CertificateParams, DistinguishedName, DnType, SanType}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, - quic::QUIC_MAX_CONCURRENT_STREAMS, + quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS}, signature::Keypair, timing, }, @@ -55,6 +55,8 @@ fn configure_server( config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); + let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + config.max_idle_timeout(Some(timeout)); // disable bidi & datagrams const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0; @@ -484,6 +486,7 @@ mod test { super::*, crossbeam_channel::unbounded, quinn::{ClientConfig, NewConnection}, + solana_sdk::quic::QUIC_KEEP_ALIVE_MS, std::{net::SocketAddr, time::Instant}, }; @@ -514,7 +517,14 @@ mod test { .with_safe_defaults() .with_custom_certificate_verifier(SkipServerVerification::new()) .with_no_client_auth(); - ClientConfig::new(Arc::new(crypto)) + let mut config = ClientConfig::new(Arc::new(crypto)); + + let transport_config = Arc::get_mut(&mut config.transport).unwrap(); + let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); + transport_config.max_idle_timeout(Some(timeout)); + transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + + config } #[test] @@ -540,6 +550,45 @@ mod test { .unwrap() } + #[test] + fn test_quic_timeout() { + solana_logger::setup(); + let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = unbounded(); + let keypair = Keypair::new(); + let ip = "127.0.0.1".parse().unwrap(); + let server_address = s.local_addr().unwrap(); + let t = spawn_server(s, &keypair, ip, sender, exit.clone(), 1).unwrap(); + + let runtime = rt(); + let _rt_guard = runtime.enter(); + let conn1 = make_client_endpoint(&runtime, &server_address); + let total = 30; + let handle = runtime.spawn(async move { + for i in 0..total { + let mut s1 = conn1.connection.open_uni().await.unwrap(); + s1.write_all(&[0u8]).await.unwrap(); + s1.finish().await.unwrap(); + info!("done {}", i); + std::thread::sleep(Duration::from_millis(1000)); + } + }); + let mut received = 0; + loop { + if let Ok(_x) = receiver.recv_timeout(Duration::from_millis(500)) { + received += 1; + info!("got {}", received); + } + if received >= total { + break; + } + } + runtime.block_on(handle).unwrap(); + exit.store(true, Ordering::Relaxed); + t.join().unwrap(); + } + #[test] fn test_quic_server_block_multiple_connections() { solana_logger::setup();