From 36e97654e3ea12beb37c8a7459b16e15cc94a3f0 Mon Sep 17 00:00:00 2001
From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com>
Date: Tue, 19 Mar 2024 03:05:00 +0800
Subject: [PATCH] Make the quic server connection table use an async lock,
 reducing thrashing (#293)

Make the quic server connection table use an async lock, reducing lock
contention
---
 streamer/src/nonblocking/quic.rs | 34 ++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index 225412dd0..c49690062 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -33,13 +33,26 @@ use {
     std::{
         iter::repeat_with,
         net::{IpAddr, SocketAddr, UdpSocket},
+        // CAUTION: be careful not to introduce any awaits while holding an RwLock.
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
-            Arc, Mutex, MutexGuard, RwLock,
+            Arc, RwLock,
         },
         time::{Duration, Instant},
     },
-    tokio::{task::JoinHandle, time::timeout},
+    tokio::{
+        // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
+        // This is done so that sync code can also access the stake table.
+        // Make sure we don't hold a sync lock across an await - including the await to
+        // lock an async Mutex. This does not happen now and should not happen as long as we
+        // don't hold an async Mutex and sync RwLock at the same time (currently true)
+        // but if we do, the scope of the RwLock must always be a subset of the async Mutex
+        // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
+        // introduce any other awaits while holding the RwLock.
+        sync::{Mutex, MutexGuard},
+        task::JoinHandle,
+        time::timeout,
+    },
 };

 const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);

@@ -383,7 +396,7 @@ fn handle_and_cache_new_connection(
     }
 }

-fn prune_unstaked_connections_and_add_new_connection(
+async fn prune_unstaked_connections_and_add_new_connection(
     connection: Connection,
     connection_table: Arc<Mutex<ConnectionTable>>,
     max_connections: usize,
@@ -394,7 +407,7 @@
     let stats = params.stats.clone();
     if max_connections > 0 {
         let connection_table_clone = connection_table.clone();
-        let mut connection_table = connection_table.lock().unwrap();
+        let mut connection_table = connection_table.lock().await;
         prune_unstaked_connection_table(&mut connection_table, max_connections, stats);
         handle_and_cache_new_connection(
             connection,
@@ -504,7 +517,8 @@ async fn setup_connection(
         match params.peer_type {
             ConnectionPeerType::Staked(stake) => {
-                let mut connection_table_l = staked_connection_table.lock().unwrap();
+                let mut connection_table_l = staked_connection_table.lock().await;
+
                 if connection_table_l.total_size >= max_staked_connections {
                     let num_pruned =
                         connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
@@ -535,7 +549,9 @@
                     &params,
                     wait_for_chunk_timeout,
                     stream_load_ema.clone(),
-                ) {
+                )
+                .await
+                {
                     stats
                         .connection_added_from_staked_peer
                         .fetch_add(1, Ordering::Relaxed);
@@ -557,7 +573,9 @@
                     &params,
                     wait_for_chunk_timeout,
                     stream_load_ema.clone(),
-                ) {
+                )
+                .await
+                {
                     stats
                         .connection_added_from_unstaked_peer
                         .fetch_add(1, Ordering::Relaxed);
@@ -800,7 +818,7 @@ async fn handle_connection(
         }
     }

-    let removed_connection_count = connection_table.lock().unwrap().remove_connection(
+    let removed_connection_count = connection_table.lock().await.remove_connection(
         ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
         remote_addr.port(),
         stable_id,
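
Note for reviewers: the sketch below is illustrative commentary, not part of the
patch. It shows the pattern the diff adopts: keeping the connection table behind
tokio::sync::Mutex so that a task finding the lock contended is parked at
`.lock().await` and the worker thread moves on to other tasks, rather than
blocking the thread the way `std::sync::Mutex::lock()` does inside an async
task. `ConnectionTable` here is a simplified stand-in for the type in quic.rs,
the task count is arbitrary, and it assumes
`tokio = { version = "1", features = ["full"] }`.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Simplified stand-in for the ConnectionTable in quic.rs: maps a peer id to a
// stable connection id and tracks the total number of entries.
#[derive(Default)]
struct ConnectionTable {
    entries: HashMap<u64, u64>,
    total_size: usize,
}

// Because the table is behind tokio's async Mutex, lock() returns a future:
// a contended task yields to the scheduler instead of blocking its thread.
async fn add_connection(table: &Arc<Mutex<ConnectionTable>>, peer: u64, stable_id: u64) {
    let mut table = table.lock().await; // yields while contended
    table.entries.insert(peer, stable_id);
    table.total_size = table.entries.len();
} // guard dropped here; a tokio MutexGuard may also be held across .await points

#[tokio::main]
async fn main() {
    let table = Arc::new(Mutex::new(ConnectionTable::default()));

    // Many concurrent tasks hammering one table, as QUIC connection setup and
    // teardown do in the server under load.
    let handles: Vec<_> = (0..100)
        .map(|peer| {
            let table = table.clone();
            tokio::spawn(async move { add_connection(&table, peer, peer * 7).await })
        })
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
    println!("total connections: {}", table.lock().await.total_size);
}

The caveats added in the use block remain necessary because the stake table
stays behind a std RwLock for the benefit of sync callers; hence the fixed
lock order (async Mutex before RwLock) and the warning against awaiting while
the RwLock is held.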