Implement QUIC connection warmup service for future leaders (#24054)

* Increase connection timeouts

* Bump quic connection cache to 1024

* Use constant for quic connection timeout and add warm cache service

* Fixes to QUIC warmup service

* fix check failure

* fixes after rebase

* fix timeout test

Co-authored-by: Pankaj Garg <pankaj@solana.com>
This commit is contained in:
sakridge 2022-04-15 21:09:24 +02:00 committed by GitHub
parent 43d3f049e9
commit 1b7d1f78de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 153 additions and 11 deletions

View File

@ -20,10 +20,10 @@ use {
}; };
// Should be non-zero // Should be non-zero
static MAX_CONNECTIONS: usize = 64; static MAX_CONNECTIONS: usize = 1024;
#[derive(Clone)] #[derive(Clone)]
enum Connection { pub enum Connection {
Udp(Arc<UdpTpuConnection>), Udp(Arc<UdpTpuConnection>),
Quic(Arc<QuicTpuConnection>), Quic(Arc<QuicTpuConnection>),
} }
@ -117,14 +117,14 @@ impl ConnectionCacheStats {
} }
} }
struct ConnMap { struct ConnectionMap {
map: LruCache<SocketAddr, Connection>, map: LruCache<SocketAddr, Connection>,
stats: Arc<ConnectionCacheStats>, stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval, last_stats: AtomicInterval,
use_quic: bool, use_quic: bool,
} }
impl ConnMap { impl ConnectionMap {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
map: LruCache::new(MAX_CONNECTIONS), map: LruCache::new(MAX_CONNECTIONS),
@ -140,7 +140,7 @@ impl ConnMap {
} }
lazy_static! { lazy_static! {
static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new()); static ref CONNECTION_MAP: Mutex<ConnectionMap> = Mutex::new(ConnectionMap::new());
} }
pub fn set_use_quic(use_quic: bool) { pub fn set_use_quic(use_quic: bool) {
@ -346,6 +346,7 @@ mod tests {
#[test] #[test]
fn test_connection_cache() { fn test_connection_cache() {
solana_logger::setup();
// Allow the test to run deterministically // Allow the test to run deterministically
// with the same pseudorandom sequence between runs // with the same pseudorandom sequence between runs
// and on different platforms - the cryptographic security // and on different platforms - the cryptographic security

View File

@ -11,15 +11,20 @@ use {
itertools::Itertools, itertools::Itertools,
lazy_static::lazy_static, lazy_static::lazy_static,
log::*, log::*,
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError,
},
quinn_proto::ConnectionStats, quinn_proto::ConnectionStats,
solana_sdk::{ solana_sdk::{
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS, QUIC_PORT_OFFSET,
},
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
std::{ std::{
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc},
time::Duration,
}, },
tokio::runtime::Runtime, tokio::runtime::Runtime,
}; };
@ -163,7 +168,13 @@ impl QuicClient {
let mut endpoint = RUNTIME.block_on(create_endpoint); let mut endpoint = RUNTIME.block_on(create_endpoint);
endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto))); let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport).unwrap();
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
endpoint.set_default_client_config(config);
Self { Self {
endpoint, endpoint,

View File

@ -73,6 +73,7 @@ pub mod verified_vote_packets;
pub mod vote_simulator; pub mod vote_simulator;
pub mod vote_stake_tracker; pub mod vote_stake_tracker;
pub mod voting_service; pub mod voting_service;
pub mod warm_quic_cache_service;
pub mod window_service; pub mod window_service;
#[macro_use] #[macro_use]

View File

@ -25,6 +25,7 @@ use {
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
tower_storage::TowerStorage, tower_storage::TowerStorage,
voting_service::VotingService, voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
}, },
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError}, crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError},
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
@ -78,6 +79,7 @@ pub struct Tvu {
accounts_hash_verifier: AccountsHashVerifier, accounts_hash_verifier: AccountsHashVerifier,
cost_update_service: CostUpdateService, cost_update_service: CostUpdateService,
voting_service: VotingService, voting_service: VotingService,
warm_quic_cache_service: WarmQuicCacheService,
drop_bank_service: DropBankService, drop_bank_service: DropBankService,
transaction_cost_metrics_service: TransactionCostMetricsService, transaction_cost_metrics_service: TransactionCostMetricsService,
} }
@ -283,6 +285,9 @@ impl Tvu {
bank_forks.clone(), bank_forks.clone(),
); );
let warm_quic_cache_service =
WarmQuicCacheService::new(cluster_info.clone(), poh_recorder.clone(), exit.clone());
let (cost_update_sender, cost_update_receiver) = unbounded(); let (cost_update_sender, cost_update_receiver) = unbounded();
let cost_update_service = let cost_update_service =
CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver); CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver);
@ -356,6 +361,7 @@ impl Tvu {
accounts_hash_verifier, accounts_hash_verifier,
cost_update_service, cost_update_service,
voting_service, voting_service,
warm_quic_cache_service,
drop_bank_service, drop_bank_service,
transaction_cost_metrics_service, transaction_cost_metrics_service,
} }
@ -390,6 +396,7 @@ impl Tvu {
self.accounts_hash_verifier.join()?; self.accounts_hash_verifier.join()?;
self.cost_update_service.join()?; self.cost_update_service.join()?;
self.voting_service.join()?; self.voting_service.join()?;
self.warm_quic_cache_service.join()?;
self.drop_bank_service.join()?; self.drop_bank_service.join()?;
self.transaction_cost_metrics_service.join()?; self.transaction_cost_metrics_service.join()?;
Ok(()) Ok(())

View File

@ -0,0 +1,70 @@
// Connect to future leaders with some jitter so the quic connection is warm
// by the time we need it.
use {
rand::{thread_rng, Rng},
solana_client::connection_cache::send_wire_transaction,
solana_gossip::cluster_info::ClusterInfo,
solana_poh::poh_recorder::PohRecorder,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
/// Service that connects to future leaders ahead of time so the connection is
/// already warm when it is needed.
///
/// Holds the join handle of the worker thread spawned by
/// `WarmQuicCacheService::new`; `WarmQuicCacheService::join` waits on it.
pub struct WarmQuicCacheService {
// Handle of the spawned "sol-warm-quic-service" thread.
thread_hdl: JoinHandle<()>,
}
// Base slot count passed to `leader_after_n_slots` when looking up the leader to warm up.
// Original note: ~50 seconds (presumably derived from the cluster's slot duration — TODO confirm).
const CACHE_OFFSET_SLOT: i64 = 100;
// Bound on the random jitter added to CACHE_OFFSET_SLOT; the jitter is drawn once per
// service thread via `gen_range(-CACHE_JITTER_SLOT, CACHE_JITTER_SLOT)`.
const CACHE_JITTER_SLOT: i64 = 20;
impl WarmQuicCacheService {
    /// Spawns the `sol-warm-quic-service` thread and returns a handle to it.
    ///
    /// Every 200 ms, until `exit` is set, the thread asks `poh_recorder` for the
    /// leader `CACHE_OFFSET_SLOT + jitter` slots out. Whenever that leader differs
    /// from the last one seen, it looks up the leader's TPU address in
    /// `cluster_info` and sends a one-byte payload through `send_wire_transaction`
    /// to warm up the connection. A failed send is logged as a warning and
    /// otherwise ignored.
    ///
    /// # Panics
    ///
    /// Panics if the OS thread cannot be spawned. The worker thread itself panics
    /// if the `poh_recorder` mutex is poisoned.
    pub fn new(
        cluster_info: Arc<ClusterInfo>,
        poh_recorder: Arc<Mutex<PohRecorder>>,
        exit: Arc<AtomicBool>,
    ) -> Self {
        let thread_hdl = Builder::new()
            .name("sol-warm-quic-service".to_string())
            .spawn(move || {
                // The jitter is drawn once for the lifetime of the thread.
                let slot_jitter = thread_rng().gen_range(-CACHE_JITTER_SLOT, CACHE_JITTER_SLOT);
                let mut maybe_last_leader = None;
                while !exit.load(Ordering::Relaxed) {
                    // Bind the result in its own statement so the mutex guard is
                    // dropped at the end of this `let`. Using the lock expression
                    // directly as the `if let` scrutinee would keep the PohRecorder
                    // lock held for the whole body, including the network send below.
                    let maybe_leader = poh_recorder
                        .lock()
                        .unwrap()
                        .leader_after_n_slots((CACHE_OFFSET_SLOT + slot_jitter) as u64);
                    if let Some(leader_pubkey) = maybe_leader {
                        // Only act when the upcoming leader changed since the last check.
                        if maybe_last_leader
                            .map_or(true, |last_leader| last_leader != leader_pubkey)
                        {
                            maybe_last_leader = Some(leader_pubkey);
                            if let Some(addr) = cluster_info
                                .lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
                            {
                                // The payload content is irrelevant; the send is only
                                // used to get a connection established.
                                if let Err(err) = send_wire_transaction(&[0u8], &addr) {
                                    warn!(
                                        "Failed to warmup QUIC connection to the leader {:?}, Error {:?}",
                                        leader_pubkey, err
                                    );
                                }
                            }
                        }
                    }
                    sleep(Duration::from_millis(200));
                }
            })
            .unwrap();
        Self { thread_hdl }
    }

    /// Blocks until the worker thread has finished, returning its
    /// `thread::Result` (an `Err` if the thread panicked).
    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}

View File

@ -3,3 +3,6 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// that seems to maximize TPS on GCE (higher values don't seem to // that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability) // give significant improvement or seem to impact stability)
pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048; pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048;
pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;

View File

@ -3,12 +3,12 @@ use {
futures_util::stream::StreamExt, futures_util::stream::StreamExt,
pem::Pem, pem::Pem,
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
quinn::{Endpoint, EndpointConfig, IncomingUniStreams, ServerConfig}, quinn::{Endpoint, EndpointConfig, IdleTimeout, IncomingUniStreams, ServerConfig, VarInt},
rcgen::{CertificateParams, DistinguishedName, DnType, SanType}, rcgen::{CertificateParams, DistinguishedName, DnType, SanType},
solana_perf::packet::PacketBatch, solana_perf::packet::PacketBatch,
solana_sdk::{ solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE}, packet::{Packet, PACKET_DATA_SIZE},
quic::QUIC_MAX_CONCURRENT_STREAMS, quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
signature::Keypair, signature::Keypair,
timing, timing,
}, },
@ -55,6 +55,8 @@ fn configure_server(
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
config.max_idle_timeout(Some(timeout));
// disable bidi & datagrams // disable bidi & datagrams
const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0; const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
@ -484,6 +486,7 @@ mod test {
super::*, super::*,
crossbeam_channel::unbounded, crossbeam_channel::unbounded,
quinn::{ClientConfig, NewConnection}, quinn::{ClientConfig, NewConnection},
solana_sdk::quic::QUIC_KEEP_ALIVE_MS,
std::{net::SocketAddr, time::Instant}, std::{net::SocketAddr, time::Instant},
}; };
@ -514,7 +517,14 @@ mod test {
.with_safe_defaults() .with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new()) .with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth(); .with_no_client_auth();
ClientConfig::new(Arc::new(crypto)) let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport).unwrap();
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
config
} }
#[test] #[test]
@ -540,6 +550,45 @@ mod test {
.unwrap() .unwrap()
} }
#[test]
fn test_quic_timeout() {
    // Keeps one client connection open for 30 one-byte writes spaced one second
    // apart and checks that the server receives every one of them.
    solana_logger::setup();
    let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
    let exit = Arc::new(AtomicBool::new(false));
    let (sender, receiver) = unbounded();
    let keypair = Keypair::new();
    let ip = "127.0.0.1".parse().unwrap();
    let server_address = socket.local_addr().unwrap();
    let server_thread = spawn_server(socket, &keypair, ip, sender, exit.clone(), 1).unwrap();

    let runtime = rt();
    let _rt_guard = runtime.enter();
    let client = make_client_endpoint(&runtime, &server_address);
    let total = 30;
    let writer = runtime.spawn(async move {
        for i in 0..total {
            // One fresh unidirectional stream per payload, finished before pausing.
            let mut stream = client.connection.open_uni().await.unwrap();
            stream.write_all(&[0u8]).await.unwrap();
            stream.finish().await.unwrap();
            info!("done {}", i);
            // NOTE(review): this is a blocking sleep inside an async task, so it
            // stalls the runtime worker it runs on; kept as in the original.
            std::thread::sleep(Duration::from_millis(1000));
        }
    });

    // Poll the server's output channel until every payload has been observed.
    let mut received = 0;
    while received < total {
        if receiver.recv_timeout(Duration::from_millis(500)).is_ok() {
            received += 1;
            info!("got {}", received);
        }
    }

    runtime.block_on(writer).unwrap();
    exit.store(true, Ordering::Relaxed);
    server_thread.join().unwrap();
}
#[test] #[test]
fn test_quic_server_block_multiple_connections() { fn test_quic_server_block_multiple_connections() {
solana_logger::setup(); solana_logger::setup();