diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs
index 42aa196..f6afa03 100644
--- a/blocking_client/src/quiche_client_loop.rs
+++ b/blocking_client/src/quiche_client_loop.rs
@@ -249,7 +249,6 @@ pub fn create_quiche_client_thread(
                             // no more new messages
                         }
                         std::sync::mpsc::TryRecvError::Disconnected => {
-                            log::error!("message_queue disconnected");
                             let _ = connection.close(true, 0, b"no longer needed");
                         }
                     }
@@ -405,7 +404,10 @@ mod tests {
         let (server_send_queue, rx_sent_queue) = mpsc::channel::<ChannelMessage>();
         let _server_loop_jh = std::thread::spawn(move || {
             if let Err(e) = server_loop(
-                QuicParameters::default(),
+                QuicParameters {
+                    incremental_priority: true,
+                    ..Default::default()
+                },
                 socket_addr,
                 rx_sent_queue,
                 CompressionType::Lz4Fast(8),
diff --git a/common/src/config.rs b/common/src/config.rs
index 94b8a1a..c3bcc4b 100644
--- a/common/src/config.rs
+++ b/common/src/config.rs
@@ -7,8 +7,8 @@ use crate::{
     defaults::{
         DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO,
         DEFAULT_ENABLE_PACING, DEFAULT_INCREMENTAL_PRIORITY, DEFAULT_MAX_ACK_DELAY,
-        DEFAULT_MAX_MESSAGES_IN_QUEUE, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
-        DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR,
+        DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
+        DEFAULT_USE_CC_BBR,
     },
 };
 
@@ -69,7 +69,6 @@ pub struct QuicParameters {
     pub enable_pacing: bool,
     pub use_cc_bbr: bool,
     pub incremental_priority: bool,
-    pub max_messages_in_queue: u64,
     pub enable_gso: bool,
 }
 
@@ -85,7 +84,6 @@ impl Default for QuicParameters {
             enable_pacing: DEFAULT_ENABLE_PACING,
             use_cc_bbr: DEFAULT_USE_CC_BBR,
             incremental_priority: DEFAULT_INCREMENTAL_PRIORITY,
-            max_messages_in_queue: DEFAULT_MAX_MESSAGES_IN_QUEUE,
             enable_gso: DEFAULT_ENABLE_GSO,
         }
     }
diff --git a/common/src/defaults.rs b/common/src/defaults.rs
index f4d017f..3352b5a 100644
--- a/common/src/defaults.rs
+++ b/common/src/defaults.rs
@@ -1,5 +1,4 @@
 pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024;
-pub const DEFAULT_MAX_MESSAGES_IN_QUEUE: u64 = 1024 * 1024;
 pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS * 3 / 4;
 pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs
 pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs
index d292edf..212303c 100644
--- a/server/src/quiche_server_loop.rs
+++ b/server/src/quiche_server_loop.rs
@@ -2,7 +2,7 @@ use std::{
     collections::HashMap,
     net::SocketAddr,
     sync::{
-        atomic::{AtomicBool, AtomicU64, AtomicUsize},
+        atomic::{AtomicBool, AtomicU64},
         mpsc::{self, Sender},
         Arc, Mutex, RwLock,
     },
@@ -39,7 +39,6 @@ use crate::configure_server::configure_server;
 struct DispatchingData {
     pub sender: Sender<(Vec<u8>, u8)>,
     pub filters: Arc<RwLock<Vec<Filter>>>,
-    pub messages_in_queue: Arc<AtomicUsize>,
 }
 
 type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
@@ -52,7 +51,6 @@ pub fn server_loop(
     stop_laggy_client: bool,
 ) -> anyhow::Result<()> {
     let maximum_concurrent_streams_id = u64::MAX;
-    let max_messages_in_queue = quic_params.max_messages_in_queue;
     let mut config = configure_server(quic_params)?;
 
     let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
@@ -103,7 +101,6 @@
         message_send_queue,
         dispatching_connections.clone(),
         compression_type,
-        max_messages_in_queue,
     );
 
     let mut client_id_counter = 0;
@@ -225,7 +222,6 @@
                    let (client_sender, client_reciver) = mio_channel::channel();
                    let (client_message_sx, client_message_rx) =
                        mpsc::channel();
-                    let messages_in_queue = Arc::new(AtomicUsize::new(0));
 
                    let current_client_id = client_id_counter;
                    client_id_counter += 1;
@@ -240,7 +236,6 @@
                        filters.clone(),
                        maximum_concurrent_streams_id,
                        stop_laggy_client,
-                        messages_in_queue.clone(),
                        quic_params.incremental_priority,
                        rng.clone(),
                    );
@@ -250,7 +245,6 @@
                        DispatchingData {
                            sender: client_message_sx,
                            filters,
-                            messages_in_queue,
                        },
                    );
                    clients_lk.insert(scid, current_client_id);
@@ -312,7 +306,6 @@ fn create_client_task(
     filters: Arc<RwLock<Vec<Filter>>>,
     maximum_concurrent_streams_id: u64,
     stop_laggy_client: bool,
-    messages_in_queue: Arc<AtomicUsize>,
     incremental_priority: bool,
     rng: SystemRandom,
 ) {
@@ -348,7 +341,6 @@
         let number_of_readable_streams = number_of_readable_streams.clone();
         let number_of_writable_streams = number_of_writable_streams.clone();
         let messages_added = messages_added.clone();
-        let messages_in_queue = messages_in_queue.clone();
         let quit = quit.clone();
         std::thread::spawn(move || {
             while !quit.load(std::sync::atomic::Ordering::Relaxed) {
@@ -379,10 +371,6 @@
                     "messages_added : {}",
                     messages_added.swap(0, std::sync::atomic::Ordering::Relaxed)
                 );
-                log::debug!(
-                    "messages in queue to be sent : {}",
-                    messages_in_queue.load(std::sync::atomic::Ordering::Relaxed)
-                );
             }
         });
     }
@@ -459,8 +447,6 @@
            loop {
                let close = match message_channel.try_recv() {
                    Ok((message, priority)) => {
-                        messages_in_queue
-                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                        let stream_id = next_stream;
                        next_stream =
                            get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
@@ -615,10 +601,8 @@
 fn create_dispatching_thread(
     message_send_queue: mpsc::Receiver<ChannelMessage>,
     dispatching_connections: DispachingConnections,
     compression_type: CompressionType,
-    max_messages_in_queue: u64,
 ) {
     std::thread::spawn(move || {
-        let max_messages_in_queue = max_messages_in_queue as usize;
         while let Ok(message) = message_send_queue.recv() {
             let mut dispatching_connections_lk = dispatching_connections.lock().unwrap();
@@ -670,12 +654,7 @@
                bincode::serialize(&message).expect("Message should be serializable in binary");
            for id in dispatching_connections.iter() {
                let data = dispatching_connections_lk.get(id).unwrap();
-                let messages_in_queue = data
-                    .messages_in_queue
-                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-                if data.sender.send((binary.clone(), priority)).is_err()
-                    || messages_in_queue > max_messages_in_queue
-                {
+                if data.sender.send((binary.clone(), priority)).is_err() {
                    // client is closed
                    dispatching_connections_lk.remove(id);
                }
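
Note: with `max_messages_in_queue` and the per-connection `messages_in_queue` counter removed, the dispatching thread no longer disconnects a client just because its outgoing queue has grown too deep; a connection is now pruned only when `send` on its channel fails, i.e. when the per-client task has dropped the receiving end. Below is a minimal sketch of that pruning behavior; `ConnId`, `Payload`, and `dispatch` are illustrative stand-ins, not items from this crate:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};

// Illustrative stand-ins: connections keyed by id, each payload a
// (bincode-serialized message, priority) pair as in DispatchingData::sender.
type ConnId = u64;
type Payload = (Vec<u8>, u8);

// After this patch a connection is dropped only when its channel is
// disconnected (receiver gone); there is no queue-depth cutoff anymore.
fn dispatch(connections: &mut HashMap<ConnId, Sender<Payload>>, binary: &[u8], priority: u8) {
    connections.retain(|_, sender| sender.send((binary.to_vec(), priority)).is_ok());
}

fn main() {
    let mut connections: HashMap<ConnId, Sender<Payload>> = HashMap::new();
    let (tx, rx) = mpsc::channel();
    connections.insert(1, tx);

    dispatch(&mut connections, b"message", 0);
    assert_eq!(rx.try_recv().unwrap().0, b"message".to_vec());

    drop(rx); // simulate a client whose receiving end has shut down
    dispatch(&mut connections, b"message", 0);
    assert!(connections.is_empty()); // the dead connection was pruned
}
```

Since `std::sync::mpsc` channels are unbounded, this removes queue-depth backpressure on slow consumers; slow clients are now left to the existing `stop_laggy_client` handling in the per-client task.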