From 9b5a5fae7e2a17fb152f6d768f60885226cbdc5c Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 2 Aug 2024 07:01:14 +0200 Subject: [PATCH 1/9] removing maximum message in queue --- blocking_client/src/quiche_client_loop.rs | 6 ++++-- common/src/config.rs | 6 ++---- common/src/defaults.rs | 1 - server/src/quiche_server_loop.rs | 25 ++--------------------- 4 files changed, 8 insertions(+), 30 deletions(-) diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index 42aa196..f6afa03 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -249,7 +249,6 @@ pub fn create_quiche_client_thread( // no more new messages } std::sync::mpsc::TryRecvError::Disconnected => { - log::error!("message_queue disconnected"); let _ = connection.close(true, 0, b"no longer needed"); } } @@ -405,7 +404,10 @@ mod tests { let (server_send_queue, rx_sent_queue) = mpsc::channel::(); let _server_loop_jh = std::thread::spawn(move || { if let Err(e) = server_loop( - QuicParameters::default(), + QuicParameters { + incremental_priority: true, + ..Default::default() + }, socket_addr, rx_sent_queue, CompressionType::Lz4Fast(8), diff --git a/common/src/config.rs b/common/src/config.rs index 94b8a1a..c3bcc4b 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -7,8 +7,8 @@ use crate::{ defaults::{ DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_ENABLE_PACING, DEFAULT_INCREMENTAL_PRIORITY, DEFAULT_MAX_ACK_DELAY, - DEFAULT_MAX_MESSAGES_IN_QUEUE, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, - DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR, + DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, + DEFAULT_USE_CC_BBR, }, }; @@ -69,7 +69,6 @@ pub struct QuicParameters { pub enable_pacing: bool, pub use_cc_bbr: bool, pub incremental_priority: bool, - pub max_messages_in_queue: u64, pub enable_gso: bool, } @@ -85,7 +84,6 @@ impl Default for QuicParameters { enable_pacing: DEFAULT_ENABLE_PACING, use_cc_bbr: DEFAULT_USE_CC_BBR, incremental_priority: DEFAULT_INCREMENTAL_PRIORITY, - max_messages_in_queue: DEFAULT_MAX_MESSAGES_IN_QUEUE, enable_gso: DEFAULT_ENABLE_GSO, } } diff --git a/common/src/defaults.rs b/common/src/defaults.rs index f4d017f..3352b5a 100644 --- a/common/src/defaults.rs +++ b/common/src/defaults.rs @@ -1,5 +1,4 @@ pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024; -pub const DEFAULT_MAX_MESSAGES_IN_QUEUE: u64 = 1024 * 1024; pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS * 3 / 4; pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10; diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index d292edf..212303c 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize}, + atomic::{AtomicBool, AtomicU64}, mpsc::{self, Sender}, Arc, Mutex, RwLock, }, @@ -39,7 +39,6 @@ use crate::configure_server::configure_server; struct DispatchingData { pub sender: Sender<(Vec, u8)>, pub filters: Arc>>, - pub messages_in_queue: Arc, } type DispachingConnections = Arc, DispatchingData>>>; @@ -52,7 +51,6 @@ pub fn server_loop( stop_laggy_client: bool, ) -> anyhow::Result<()> { let maximum_concurrent_streams_id = u64::MAX; - let max_messages_in_queue = quic_params.max_messages_in_queue; let mut config = 
configure_server(quic_params)?; let mut socket = mio::net::UdpSocket::bind(socket_addr)?; @@ -103,7 +101,6 @@ pub fn server_loop( message_send_queue, dispatching_connections.clone(), compression_type, - max_messages_in_queue, ); let mut client_id_counter = 0; @@ -225,7 +222,6 @@ pub fn server_loop( let (client_sender, client_reciver) = mio_channel::channel(); let (client_message_sx, client_message_rx) = mpsc::channel(); - let messages_in_queue = Arc::new(AtomicUsize::new(0)); let current_client_id = client_id_counter; client_id_counter += 1; @@ -240,7 +236,6 @@ pub fn server_loop( filters.clone(), maximum_concurrent_streams_id, stop_laggy_client, - messages_in_queue.clone(), quic_params.incremental_priority, rng.clone(), ); @@ -250,7 +245,6 @@ pub fn server_loop( DispatchingData { sender: client_message_sx, filters, - messages_in_queue, }, ); clients_lk.insert(scid, current_client_id); @@ -312,7 +306,6 @@ fn create_client_task( filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, - messages_in_queue: Arc, incremental_priority: bool, rng: SystemRandom, ) { @@ -348,7 +341,6 @@ fn create_client_task( let number_of_readable_streams = number_of_readable_streams.clone(); let number_of_writable_streams = number_of_writable_streams.clone(); let messages_added = messages_added.clone(); - let messages_in_queue = messages_in_queue.clone(); let quit = quit.clone(); std::thread::spawn(move || { while !quit.load(std::sync::atomic::Ordering::Relaxed) { @@ -379,10 +371,6 @@ fn create_client_task( "messages_added : {}", messages_added.swap(0, std::sync::atomic::Ordering::Relaxed) ); - log::debug!( - "messages in queue to be sent : {}", - messages_in_queue.load(std::sync::atomic::Ordering::Relaxed) - ); } }); } @@ -459,8 +447,6 @@ fn create_client_task( loop { let close = match message_channel.try_recv() { Ok((message, priority)) => { - messages_in_queue - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); let stream_id = next_stream; next_stream = get_next_unidi(stream_id, true, maximum_concurrent_streams_id); @@ -615,10 +601,8 @@ fn create_dispatching_thread( message_send_queue: mpsc::Receiver, dispatching_connections: DispachingConnections, compression_type: CompressionType, - max_messages_in_queue: u64, ) { std::thread::spawn(move || { - let max_messages_in_queue = max_messages_in_queue as usize; while let Ok(message) = message_send_queue.recv() { let mut dispatching_connections_lk = dispatching_connections.lock().unwrap(); @@ -670,12 +654,7 @@ fn create_dispatching_thread( bincode::serialize(&message).expect("Message should be serializable in binary"); for id in dispatching_connections.iter() { let data = dispatching_connections_lk.get(id).unwrap(); - let messages_in_queue = data - .messages_in_queue - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if data.sender.send((binary.clone(), priority)).is_err() - || messages_in_queue > max_messages_in_queue - { + if data.sender.send((binary.clone(), priority)).is_err() { // client is closed dispatching_connections_lk.remove(id); } From c927b713c69b5e92be36a9097b8c6bffeaef5ed4 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 2 Aug 2024 16:09:35 +0200 Subject: [PATCH 2/9] moving from mio::udp socket to std::net::udp socket --- blocking_client/src/quiche_client_loop.rs | 2 +- server/src/quiche_server_loop.rs | 395 +++++++++++----------- 2 files changed, 193 insertions(+), 204 deletions(-) diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index f6afa03..5c9490b 100644 --- 
a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -76,7 +76,7 @@ pub fn client_loop( let mut buf = [0; 65535]; 'client: loop { - poll.poll(&mut events, Some(Duration::from_micros(100)))?; + poll.poll(&mut events, Some(Duration::from_millis(100)))?; 'read: loop { match socket.recv_from(&mut buf) { diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index 212303c..6a740cf 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, net::SocketAddr, + net::UdpSocket, sync::{ atomic::{AtomicBool, AtomicU64}, mpsc::{self, Sender}, @@ -11,7 +12,6 @@ use std::{ use anyhow::bail; use itertools::Itertools; -use mio::Token; use quiche::ConnectionId; use ring::rand::SystemRandom; @@ -53,13 +53,7 @@ pub fn server_loop( let maximum_concurrent_streams_id = u64::MAX; let mut config = configure_server(quic_params)?; - let mut socket = mio::net::UdpSocket::bind(socket_addr)?; - let mut poll = mio::Poll::new()?; - let mut events = mio::Events::with_capacity(1024); - - poll.registry() - .register(&mut socket, mio::Token(0), mio::Interest::READABLE)?; - + let socket = Arc::new(UdpSocket::bind(socket_addr)?); let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; @@ -73,11 +67,6 @@ pub fn server_loop( let clients_by_id: Arc, u64>>> = Arc::new(Mutex::new(HashMap::new())); - let (write_sender, mut write_reciver) = mio_channel::channel::<(quiche::SendInfo, Vec)>(); - - poll.registry() - .register(&mut write_reciver, mio::Token(1), mio::Interest::READABLE)?; - let enable_pacing = if quic_params.enable_pacing { set_txtime_sockopt(&socket).is_ok() } else { @@ -105,209 +94,193 @@ pub fn server_loop( let mut client_id_counter = 0; loop { - poll.poll(&mut events, Some(Duration::from_millis(10)))?; - let do_read = events.is_empty() || events.iter().any(|x| x.token() == Token(0)); - if do_read { - 'read: loop { - let (len, from) = match socket.recv_from(&mut buf) { - Ok(v) => v, - Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - log::trace!("recv() would block"); - break 'read; - } - bail!("recv() failed: {:?}", e); - } - }; - - let pkt_buf = &mut buf[..len]; - - // Parse the QUIC packet's header. - let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { - Ok(v) => v, - - Err(e) => { - log::error!("Parsing packet header failed: {:?}", e); - continue 'read; - } - }; - - let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); - let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; - let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); - let mut clients_lk = clients_by_id.lock().unwrap(); - if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) { - if hdr.ty != quiche::Type::Initial { - log::error!("Packet is not Initial"); - continue 'read; - } - - if !quiche::version_is_supported(hdr.version) { - log::warn!("Doing version negotiation"); - let len = - quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); - - let out = &out[..len]; - - if let Err(e) = socket.send_to(out, from) { - if e.kind() == std::io::ErrorKind::WouldBlock { - break; - } - panic!("send() failed: {:?}", e); - } - continue 'read; - } - - let mut scid = [0; quiche::MAX_CONN_ID_LEN]; - scid.copy_from_slice(&conn_id); - - let scid = quiche::ConnectionId::from_ref(&scid); - - // Token is always present in Initial packets. 
- let token = hdr.token.as_ref().unwrap(); - - // Do stateless retry if the client didn't send a token. - if token.is_empty() { - log::debug!("Doing stateless retry"); - - let new_token = mint_token(&hdr, &from); - - let len = quiche::retry( - &hdr.scid, - &hdr.dcid, - &scid, - &new_token, - hdr.version, - &mut out, - ) - .unwrap(); - - if let Err(e) = socket.send_to(&out[..len], from) { - log::error!("Error sending retry messages : {e:?}"); - } - continue 'read; - } - - let odcid = validate_token(&from, token); - - if odcid.is_none() { - log::error!("Invalid address validation token"); - continue 'read; - } - - if scid.len() != hdr.dcid.len() { - log::error!("Invalid destination connection ID"); - continue 'read; - } - - let scid = hdr.dcid.clone(); - - log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); - - let mut conn = - quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?; - - let recv_info = quiche::RecvInfo { - to: socket.local_addr().unwrap(), - from, - }; - // Process potentially coalesced packets. - match conn.recv(pkt_buf, recv_info) { - Ok(v) => v, - Err(e) => { - log::error!("{} recv failed: {:?}", conn.trace_id(), e); - continue 'read; - } - }; - - let (client_sender, client_reciver) = mio_channel::channel(); - let (client_message_sx, client_message_rx) = mpsc::channel(); - let current_client_id = client_id_counter; - client_id_counter += 1; - - let filters = Arc::new(RwLock::new(Vec::new())); - create_client_task( - conn, - current_client_id, - clients_by_id.clone(), - client_reciver, - write_sender.clone(), - client_message_rx, - filters.clone(), - maximum_concurrent_streams_id, - stop_laggy_client, - quic_params.incremental_priority, - rng.clone(), - ); - let mut lk = dispatching_connections.lock().unwrap(); - lk.insert( - scid.clone(), - DispatchingData { - sender: client_message_sx, - filters, - }, - ); - clients_lk.insert(scid, current_client_id); - client_messsage_channel_by_id.insert(current_client_id, client_sender); - } else { - // get the existing client - let client_id = match clients_lk.get(&hdr.dcid) { - Some(v) => *v, - None => *clients_lk - .get(&conn_id) - .expect("The client should exist in the map"), - }; - - let recv_info = quiche::RecvInfo { - to: socket.local_addr().unwrap(), - from, - }; - match client_messsage_channel_by_id.get_mut(&client_id) { - Some(channel) => { - if channel.send((recv_info, pkt_buf.to_vec())).is_err() { - // client is closed - clients_lk.remove(&hdr.dcid); - clients_lk.remove(&conn_id); - client_messsage_channel_by_id.remove(&client_id); - } - } - None => { - log::error!("channel with client id {client_id} not found"); - } - } - }; + let (len, from) = match socket.recv_from(&mut buf) { + Ok(v) => v, + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + break; + } + bail!("recv() failed: {:?}", e); } - } + }; - while let Ok((send_info, buffer)) = write_reciver.try_recv() { - let send_result = if enable_pacing { - send_with_pacing(&socket, &buffer, &send_info, enable_gso) - } else { - socket.send_to(&buffer, send_info.to) + let pkt_buf = &mut buf[..len]; + + // Parse the QUIC packet's header. 
+ let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { + Ok(v) => v, + + Err(e) => { + log::error!("Parsing packet header failed: {:?}", e); + continue; + } + }; + + let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); + let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; + let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); + let mut clients_lk = clients_by_id.lock().unwrap(); + if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) { + drop(clients_lk); + if hdr.ty != quiche::Type::Initial { + log::error!("Packet is not Initial"); + continue; + } + + if !quiche::version_is_supported(hdr.version) { + log::warn!("Doing version negotiation"); + let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); + + let out = &out[..len]; + + if let Err(e) = socket.send_to(out, from) { + if e.kind() == std::io::ErrorKind::WouldBlock { + break; + } + panic!("send() failed: {:?}", e); + } + continue; + } + + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + scid.copy_from_slice(&conn_id); + + let scid = quiche::ConnectionId::from_ref(&scid); + + // Token is always present in Initial packets. + let token = hdr.token.as_ref().unwrap(); + + // Do stateless retry if the client didn't send a token. + if token.is_empty() { + log::debug!("Doing stateless retry"); + + let new_token = mint_token(&hdr, &from); + + let len = quiche::retry( + &hdr.scid, + &hdr.dcid, + &scid, + &new_token, + hdr.version, + &mut out, + ) + .unwrap(); + + if let Err(e) = socket.send_to(&out[..len], from) { + log::error!("Error sending retry messages : {e:?}"); + } + continue; + } + + let odcid = validate_token(&from, token); + + if odcid.is_none() { + log::error!("Invalid address validation token"); + continue; + } + + if scid.len() != hdr.dcid.len() { + log::error!("Invalid destination connection ID"); + continue; + } + + let scid = hdr.dcid.clone(); + + log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); + + let mut conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?; + + let recv_info = quiche::RecvInfo { + to: socket.local_addr().unwrap(), + from, }; - match send_result { - Ok(_written) => {} + // Process potentially coalesced packets. 
+ match conn.recv(pkt_buf, recv_info) { + Ok(v) => v, Err(e) => { - log::error!("sending failed with error : {e:?}"); + log::error!("{} recv failed: {:?}", conn.trace_id(), e); + continue; + } + }; + + let (client_sender, client_reciver) = mio_channel::channel(); + let (client_message_sx, client_message_rx) = mpsc::channel(); + let current_client_id = client_id_counter; + client_id_counter += 1; + + let filters = Arc::new(RwLock::new(Vec::new())); + create_client_task( + socket.clone(), + conn, + current_client_id, + clients_by_id.clone(), + client_reciver, + client_message_rx, + filters.clone(), + maximum_concurrent_streams_id, + stop_laggy_client, + quic_params.incremental_priority, + rng.clone(), + enable_pacing, + enable_gso, + ); + let mut lk = dispatching_connections.lock().unwrap(); + lk.insert( + scid.clone(), + DispatchingData { + sender: client_message_sx, + filters, + }, + ); + let mut clients_lk = clients_by_id.lock().unwrap(); + clients_lk.insert(scid, current_client_id); + client_messsage_channel_by_id.insert(current_client_id, client_sender); + } else { + // get the existing client + let client_id = match clients_lk.get(&hdr.dcid) { + Some(v) => *v, + None => *clients_lk + .get(&conn_id) + .expect("The client should exist in the map"), + }; + + let recv_info = quiche::RecvInfo { + to: socket.local_addr().unwrap(), + from, + }; + match client_messsage_channel_by_id.get_mut(&client_id) { + Some(channel) => { + if channel.send((recv_info, pkt_buf.to_vec())).is_err() { + // client is closed + clients_lk.remove(&hdr.dcid); + clients_lk.remove(&conn_id); + client_messsage_channel_by_id.remove(&client_id); + } + } + None => { + log::error!("channel with client id {client_id} not found"); } } - } + }; } + Ok(()) } #[allow(clippy::too_many_arguments)] fn create_client_task( + socket: Arc, connection: quiche::Connection, client_id: u64, client_id_by_scid: Arc, u64>>>, mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec)>, - sender: mio_channel::Sender<(quiche::SendInfo, Vec)>, message_channel: mpsc::Receiver<(Vec, u8)>, filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, incremental_priority: bool, rng: SystemRandom, + enable_pacing: bool, + enable_gso: bool, ) { std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); @@ -332,7 +305,11 @@ fn create_client_task( let number_of_writable_streams = Arc::new(AtomicU64::new(0)); let messages_added = Arc::new(AtomicU64::new(0)); let quit = Arc::new(AtomicBool::new(false)); - let max_burst_size = MAX_DATAGRAM_SIZE * 10; + let max_burst_size = if enable_gso { + MAX_DATAGRAM_SIZE * 10 + } else { + MAX_DATAGRAM_SIZE + }; { let number_of_loops = number_of_loops.clone(); @@ -575,9 +552,22 @@ fn create_client_task( } if total_length > 0 && send_message_to.is_some() { - sender - .send((send_message_to.unwrap(), out[..total_length].to_vec())) - .unwrap(); + let send_result = if enable_pacing { + send_with_pacing( + &socket, + &out[..total_length], + &send_message_to.unwrap(), + enable_gso, + ) + } else { + socket.send(&out[..total_length]) + }; + match send_result { + Ok(_written) => {} + Err(e) => { + log::error!("sending failed with error : {e:?}"); + } + } total_length = 0; } else { break; @@ -664,7 +654,7 @@ fn create_dispatching_thread( }); } -fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { +fn set_txtime_sockopt(sock: &UdpSocket) -> std::io::Result<()> { use nix::sys::socket::setsockopt; use nix::sys::socket::sockopt::TxTime; use std::os::unix::io::AsRawFd; @@ 
-674,7 +664,6 @@ fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { flags: 0, }; - // mio::net::UdpSocket doesn't implement AsFd (yet?). let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; setsockopt(&fd, TxTime, &config)?; @@ -696,7 +685,7 @@ fn std_time_to_u64(time: &std::time::Instant) -> u64 { const GSO_SEGMENT_SIZE: u16 = MAX_DATAGRAM_SIZE as u16; fn send_with_pacing( - socket: &mio::net::UdpSocket, + socket: &UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, enable_gso: bool, @@ -727,7 +716,7 @@ fn send_with_pacing( } } -pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool { +pub fn detect_gso(socket: &UdpSocket, segment_size: usize) -> bool { use nix::sys::socket::setsockopt; use nix::sys::socket::sockopt::UdpGsoSegment; use std::os::unix::io::AsRawFd; From 8eed730f315e3a6d0909a31da717b52b4b12d5e3 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Sun, 25 Aug 2024 03:43:22 +0200 Subject: [PATCH 3/9] fixing unit tests in main branch --- blocking_client/src/quiche_client_loop.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index 5c9490b..d5924b2 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -76,7 +76,7 @@ pub fn client_loop( let mut buf = [0; 65535]; 'client: loop { - poll.poll(&mut events, Some(Duration::from_millis(100)))?; + poll.poll(&mut events, Some(Duration::from_millis(10)))?; 'read: loop { match socket.recv_from(&mut buf) { @@ -151,7 +151,7 @@ pub fn create_quiche_client_thread( let rng = SystemRandom::new(); 'client: loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) + poll.poll(&mut events, Some(Duration::from_millis(10))) .unwrap(); if events.is_empty() { connection.on_timeout(); From a05f82dcfac033b3796c52babe01b545a2a0ded9 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Sun, 25 Aug 2024 10:57:58 +0200 Subject: [PATCH 4/9] avoiding timers and using one channel for packets and messages --- common/src/defaults.rs | 2 +- server/src/configure_server.rs | 1 + server/src/quiche_server_loop.rs | 318 +++++++++++++++---------------- 3 files changed, 155 insertions(+), 166 deletions(-) diff --git a/common/src/defaults.rs b/common/src/defaults.rs index 3352b5a..cbfb063 100644 --- a/common/src/defaults.rs +++ b/common/src/defaults.rs @@ -6,7 +6,7 @@ pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10; pub const DEFAULT_MAX_ACK_DELAY: u64 = 25; pub const DEFAULT_ACK_EXPONENT: u64 = 3; pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; -pub const MAX_DATAGRAM_SIZE: usize = 1200; +pub const MAX_DATAGRAM_SIZE: usize = 1350; pub const MAX_PAYLOAD_BUFFER: usize = 10 * MAX_DATAGRAM_SIZE; pub const DEFAULT_ENABLE_PACING: bool = true; pub const DEFAULT_USE_CC_BBR: bool = false; diff --git a/server/src/configure_server.rs b/server/src/configure_server.rs index 39d5cd0..8bafc9a 100644 --- a/server/src/configure_server.rs +++ b/server/src/configure_server.rs @@ -48,6 +48,7 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result), + ClientMessage(Vec, u8), +} + struct DispatchingData { - pub sender: Sender<(Vec, u8)>, + pub sender: Sender, pub filters: Arc>>, } @@ -60,10 +65,8 @@ pub fn server_loop( let local_addr = socket.local_addr()?; let rng = SystemRandom::new(); let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - let mut client_messsage_channel_by_id: HashMap< - u64, - 
mio_channel::Sender<(quiche::RecvInfo, Vec)>, - > = HashMap::new(); + let mut client_messsage_channel_by_id: HashMap> = + HashMap::new(); let clients_by_id: Arc, u64>>> = Arc::new(Mutex::new(HashMap::new())); @@ -203,7 +206,6 @@ pub fn server_loop( } }; - let (client_sender, client_reciver) = mio_channel::channel(); let (client_message_sx, client_message_rx) = mpsc::channel(); let current_client_id = client_id_counter; client_id_counter += 1; @@ -214,7 +216,6 @@ pub fn server_loop( conn, current_client_id, clients_by_id.clone(), - client_reciver, client_message_rx, filters.clone(), maximum_concurrent_streams_id, @@ -228,13 +229,13 @@ pub fn server_loop( lk.insert( scid.clone(), DispatchingData { - sender: client_message_sx, + sender: client_message_sx.clone(), filters, }, ); let mut clients_lk = clients_by_id.lock().unwrap(); clients_lk.insert(scid, current_client_id); - client_messsage_channel_by_id.insert(current_client_id, client_sender); + client_messsage_channel_by_id.insert(current_client_id, client_message_sx); } else { // get the existing client let client_id = match clients_lk.get(&hdr.dcid) { @@ -250,7 +251,10 @@ pub fn server_loop( }; match client_messsage_channel_by_id.get_mut(&client_id) { Some(channel) => { - if channel.send((recv_info, pkt_buf.to_vec())).is_err() { + if channel + .send(InternalMessage::Packet(recv_info, pkt_buf.to_vec())) + .is_err() + { // client is closed clients_lk.remove(&hdr.dcid); clients_lk.remove(&conn_id); @@ -272,8 +276,7 @@ fn create_client_task( connection: quiche::Connection, client_id: u64, client_id_by_scid: Arc, u64>>>, - mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec)>, - message_channel: mpsc::Receiver<(Vec, u8)>, + receiver: mpsc::Receiver, filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, @@ -290,13 +293,7 @@ fn create_client_task( let mut instance = Instant::now(); let mut closed = false; let mut out = [0; 65535]; - - let mut poll = mio::Poll::new().unwrap(); - let mut events = mio::Events::with_capacity(1024); - - poll.registry() - .register(&mut receiver, mio::Token(0), mio::Interest::READABLE) - .unwrap(); + let mut datagram_size = MAX_DATAGRAM_SIZE; let number_of_loops = Arc::new(AtomicU64::new(0)); let number_of_meesages_from_network = Arc::new(AtomicU64::new(0)); @@ -305,11 +302,6 @@ fn create_client_task( let number_of_writable_streams = Arc::new(AtomicU64::new(0)); let messages_added = Arc::new(AtomicU64::new(0)); let quit = Arc::new(AtomicBool::new(false)); - let max_burst_size = if enable_gso { - MAX_DATAGRAM_SIZE * 10 - } else { - MAX_DATAGRAM_SIZE - }; { let number_of_loops = number_of_loops.clone(); @@ -351,27 +343,92 @@ fn create_client_task( } }); } + + let mut continue_write = false; loop { number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - poll.poll(&mut events, Some(Duration::from_millis(1))) - .unwrap(); + let mut timeout = if continue_write { + Duration::from_secs(0) + } else { + Duration::from_millis(1) + }; - if !events.is_empty() { - while let Ok((info, mut buf)) = receiver.try_recv() { - let buf = buf.as_mut_slice(); - match connection.recv(buf, info) { - Ok(_) => { - number_of_meesages_from_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => { - log::error!("{} recv failed: {:?}", connection.trace_id(), e); + let mut did_read = false; + + while let Ok(internal_message) = receiver.recv_timeout(timeout) { + did_read = true; + timeout = Duration::from_secs(0); + + match internal_message { + InternalMessage::Packet(info, mut buf) => 
{ + // handle packet from udp socket + let buf = buf.as_mut_slice(); + match connection.recv(buf, info) { + Ok(_) => { + number_of_meesages_from_network + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => { + log::error!("{} recv failed: {:?}", connection.trace_id(), e); + break; + } + }; + } + InternalMessage::ClientMessage(message, priority) => { + // handle message from client + let stream_id = next_stream; + next_stream = + get_next_unidi(stream_id, true, maximum_concurrent_streams_id); + + let close = if let Err(e) = + connection.stream_priority(stream_id, priority, incremental_priority) + { + if !closed { + log::error!( + "Unable to set priority for the stream {}, error {}", + stream_id, + e + ); + } + true + } else { + messages_added.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + match send_message( + &mut connection, + &mut partial_responses, + stream_id, + message, + ) { + Ok(_) => { + // do nothing + false + } + Err(e) => { + // done writing / queue is full + log::error!("got error sending message client : {}", e); + true + } + } + }; + + if close && !closed && stop_laggy_client { + if let Err(e) = connection.close(true, 1, b"laggy client") { + if e != quiche::Error::Done { + log::error!("error closing client : {}", e); + } + } else { + log::info!("Stopping laggy client : {}", connection.trace_id(),); + } + closed = true; break; } - }; + } } - continue; } + if !did_read && !continue_write { + connection.on_timeout(); + } + continue_write = false; if connection.is_in_early_data() || connection.is_established() { // Process all readable streams. @@ -402,13 +459,12 @@ fn create_client_task( if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { - let mut is_writable = true; + datagram_size = connection.max_send_udp_payload_size(); for stream_id in connection.writable() { if let Err(e) = handle_writable(&mut connection, &mut partial_responses, stream_id) { if e == quiche::Error::Done { - is_writable = false; break; } if !closed { @@ -419,81 +475,10 @@ fn create_client_task( } } } - - if is_writable { - loop { - let close = match message_channel.try_recv() { - Ok((message, priority)) => { - let stream_id = next_stream; - next_stream = - get_next_unidi(stream_id, true, maximum_concurrent_streams_id); - - if let Err(e) = connection.stream_priority( - stream_id, - priority, - incremental_priority, - ) { - if !closed { - log::error!( - "Unable to set priority for the stream {}, error {}", - stream_id, - e - ); - } - true - } else { - messages_added - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - match send_message( - &mut connection, - &mut partial_responses, - stream_id, - message, - ) { - Ok(_) => false, - Err(quiche::Error::Done) => { - // done writing / queue is full - break; - } - Err(e) => { - log::error!("error sending message : {e:?}"); - true - } - } - } - } - Err(e) => { - match e { - mpsc::TryRecvError::Empty => { - break; - } - mpsc::TryRecvError::Disconnected => { - // too many message the connection is lagging - log::error!("channel disconnected by dispatcher"); - true - } - } - } - }; - - if close && !closed && stop_laggy_client { - if let Err(e) = connection.close(true, 1, b"laggy client") { - if e != quiche::Error::Done { - log::error!("error closing client : {}", e); - } - } else { - log::info!("Stopping laggy client : {}", connection.trace_id(),); - } - closed = true; - break; - } - } - } } if instance.elapsed() > Duration::from_secs(1) { instance = Instant::now(); - connection.on_timeout(); 
handle_path_events(&mut connection); // See whether source Connection IDs have been retired. @@ -516,61 +501,61 @@ fn create_client_task( let mut send_message_to = None; let mut total_length = 0; - let mut done_writing = false; - 'writing_loop: while !done_writing { - while total_length < max_burst_size { - match connection.send(&mut out[total_length..max_burst_size]) { - Ok((len, send_info)) => { - number_of_meesages_to_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - send_message_to.get_or_insert(send_info); - total_length += len; - if len < MAX_DATAGRAM_SIZE { - break; - } - } - Err(quiche::Error::BufferTooShort) => { - // retry later - log::trace!("{} buffer to short", connection.trace_id()); - break; - } - Err(quiche::Error::Done) => { - done_writing = true; - break; - } - Err(e) => { - log::error!( - "{} send failed: {:?}, closing connection", - connection.trace_id(), - e - ); - connection.close(false, 0x1, b"fail").ok(); - break 'writing_loop; - } - }; - } + let max_burst_size = if enable_gso { + std::cmp::min(datagram_size * 10, out.len()) + } else { + datagram_size + }; - if total_length > 0 && send_message_to.is_some() { - let send_result = if enable_pacing { - send_with_pacing( - &socket, - &out[..total_length], - &send_message_to.unwrap(), - enable_gso, - ) - } else { - socket.send(&out[..total_length]) - }; - match send_result { - Ok(_written) => {} - Err(e) => { - log::error!("sending failed with error : {e:?}"); + while total_length < max_burst_size { + match connection.send(&mut out[total_length..max_burst_size]) { + Ok((len, send_info)) => { + number_of_meesages_to_network + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + send_message_to.get_or_insert(send_info); + total_length += len; + if len < datagram_size { + break; } } - total_length = 0; + Err(quiche::Error::BufferTooShort) => { + // retry later + log::trace!("{} buffer to short", connection.trace_id()); + break; + } + Err(quiche::Error::Done) => { + break; + } + Err(e) => { + log::error!( + "{} send failed: {:?}, closing connection", + connection.trace_id(), + e + ); + connection.close(false, 0x1, b"fail").ok(); + } + }; + } + + if total_length > 0 && send_message_to.is_some() { + continue_write = true; + let send_result = if enable_pacing { + send_with_pacing( + &socket, + &out[..total_length], + &send_message_to.unwrap(), + enable_gso, + datagram_size as u16, + ) } else { - break; + socket.send(&out[..total_length]) + }; + match send_result { + Ok(_written) => {} + Err(e) => { + log::error!("sending failed with error : {e:?}"); + } } } @@ -644,7 +629,11 @@ fn create_dispatching_thread( bincode::serialize(&message).expect("Message should be serializable in binary"); for id in dispatching_connections.iter() { let data = dispatching_connections_lk.get(id).unwrap(); - if data.sender.send((binary.clone(), priority)).is_err() { + if data + .sender + .send(InternalMessage::ClientMessage(binary.clone(), priority)) + .is_err() + { // client is closed dispatching_connections_lk.remove(id); } @@ -682,13 +671,12 @@ fn std_time_to_u64(time: &std::time::Instant) -> u64 { sec * NANOS_PER_SEC + nsec as u64 } -const GSO_SEGMENT_SIZE: u16 = MAX_DATAGRAM_SIZE as u16; - fn send_with_pacing( socket: &UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, enable_gso: bool, + segment_size: u16, ) -> std::io::Result { use nix::sys::socket::sendmsg; use nix::sys::socket::ControlMessage; @@ -707,7 +695,7 @@ fn send_with_pacing( let mut cmgs = vec![cmsg_txtime]; if enable_gso { - 
cmgs.push(ControlMessage::UdpGsoSegments(&GSO_SEGMENT_SIZE)); + cmgs.push(ControlMessage::UdpGsoSegments(&segment_size)); } match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) { From c4041fd9852f16afb7190eddc8641f2364f266bc Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Mon, 26 Aug 2024 12:46:34 +0200 Subject: [PATCH 5/9] adding more logs to debug the issues --- examples/tester-client/src/main.rs | 20 ++++++++++---------- server/src/quiche_server_loop.rs | 13 +++++++++++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 9ec02e5..9ef1730 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -428,23 +428,23 @@ pub async fn main() { transaction_notifications_stats.add_value(&transaction_notifications); block_notifications_stats.add_value(&block_notifications); - println!("------------------------------------------"); - println!( + log::info!("------------------------------------------"); + log::info!( " DateTime : {:?}", instant.duration_since(start_instance).as_secs() ); - println!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000); - println!( + log::info!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000); + log::info!( " Accounts transfered size (uncompressed) : {} Mbs", total_accounts_size / 1_000_000 ); - println!(" Accounts Notified : {}", account_notification); - println!(" Slots Notified : {}", slot_notifications); - println!(" Blockmeta notified : {}", blockmeta_notifications); - println!(" Transactions notified : {}", transaction_notifications); - println!(" Blocks notified : {}", block_notifications); + log::info!(" Accounts Notified : {}", account_notification); + log::info!(" Slots Notified : {}", slot_notifications); + log::info!(" Blockmeta notified : {}", blockmeta_notifications); + log::info!(" Transactions notified : {}", transaction_notifications); + log::info!(" Blocks notified : {}", block_notifications); - println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed)); + log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed)); if counter % 10 == 0 { println!("------------------STATS------------------------"); diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index 3b6dc93..e065ed1 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -346,6 +346,7 @@ fn create_client_task( let mut continue_write = false; loop { + log::debug!("start"); number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let mut timeout = if continue_write { Duration::from_secs(0) @@ -361,6 +362,7 @@ fn create_client_task( match internal_message { InternalMessage::Packet(info, mut buf) => { + log::debug!("packet"); // handle packet from udp socket let buf = buf.as_mut_slice(); match 
connection.recv(buf, info) { @@ -375,6 +377,7 @@ fn create_client_task( }; } InternalMessage::ClientMessage(message, priority) => { + log::debug!("client"); // handle message from client let stream_id = next_stream; next_stream = @@ -425,6 +428,7 @@ fn create_client_task( } } } + log::debug!("readble"); if !did_read && !continue_write { connection.on_timeout(); } @@ -455,7 +459,7 @@ fn create_client_task( } } } - + log::debug!("writable"); if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { @@ -478,6 +482,7 @@ fn create_client_task( } if instance.elapsed() > Duration::from_secs(1) { + log::debug!("other tasks"); instance = Instant::now(); handle_path_events(&mut connection); @@ -508,6 +513,7 @@ fn create_client_task( datagram_size }; + log::debug!("creating packets"); while total_length < max_burst_size { match connection.send(&mut out[total_length..max_burst_size]) { Ok((len, send_info)) => { @@ -539,6 +545,7 @@ fn create_client_task( } if total_length > 0 && send_message_to.is_some() { + log::debug!("sending :{total_length:?}"); continue_write = true; let send_result = if enable_pacing { send_with_pacing( @@ -552,7 +559,9 @@ fn create_client_task( socket.send(&out[..total_length]) }; match send_result { - Ok(_written) => {} + Ok(_written) => { + log::debug!("finished sending"); + } Err(e) => { log::error!("sending failed with error : {e:?}"); } From 210540409a4755ec1d5c016bbcfc741cd1d22e37 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 27 Aug 2024 14:40:27 +0200 Subject: [PATCH 6/9] moving some logs to trace and adding logs for draining --- server/src/quiche_server_loop.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index e065ed1..bc52d0b 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -294,6 +294,7 @@ fn create_client_task( let mut closed = false; let mut out = [0; 65535]; let mut datagram_size = MAX_DATAGRAM_SIZE; + let mut logged_is_draining = false; let number_of_loops = Arc::new(AtomicU64::new(0)); let number_of_meesages_from_network = Arc::new(AtomicU64::new(0)); @@ -346,7 +347,7 @@ fn create_client_task( let mut continue_write = false; loop { - log::debug!("start"); + log::trace!("start"); number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let mut timeout = if continue_write { Duration::from_secs(0) @@ -362,7 +363,7 @@ fn create_client_task( match internal_message { InternalMessage::Packet(info, mut buf) => { - log::debug!("packet"); + log::trace!("packet"); // handle packet from udp socket let buf = buf.as_mut_slice(); match connection.recv(buf, info) { @@ -377,7 +378,7 @@ fn create_client_task( }; } InternalMessage::ClientMessage(message, priority) => { - log::debug!("client"); + log::trace!("client"); // handle message from client let stream_id = next_stream; next_stream = @@ -428,7 +429,7 @@ fn create_client_task( } } } - log::debug!("readble"); + log::trace!("readble"); if !did_read && !continue_write { connection.on_timeout(); } @@ -459,7 +460,7 @@ fn create_client_task( } } } - log::debug!("writable"); + log::trace!("writable"); if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { @@ -482,7 +483,7 @@ fn create_client_task( } if instance.elapsed() > Duration::from_secs(1) { - log::debug!("other tasks"); + log::trace!("other tasks"); instance = Instant::now(); handle_path_events(&mut connection); @@ -513,7 
+514,7 @@ fn create_client_task( datagram_size }; - log::debug!("creating packets"); + log::trace!("creating packets"); while total_length < max_burst_size { match connection.send(&mut out[total_length..max_burst_size]) { Ok((len, send_info)) => { @@ -568,7 +569,20 @@ fn create_client_task( } } + if !logged_is_draining && connection.is_draining() { + log::warn!("connection is draining"); + logged_is_draining = true; + } + if connection.is_closed() { + if let Some(e) = connection.peer_error() { + log::error!("peer error : {e:?} "); + } + + if let Some(e) = connection.local_error() { + log::error!("local error : {e:?} "); + } + log::info!( "{} connection closed {:?}", connection.trace_id(), From 8dcedb586e80d771465ab0c2ccc44dd1fad7bcb3 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Thu, 29 Aug 2024 11:01:11 +0200 Subject: [PATCH 7/9] updating timeout for mpsc channel --- server/src/quiche_server_loop.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index bc52d0b..f3d0f9f 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -345,14 +345,14 @@ fn create_client_task( }); } - let mut continue_write = false; + let mut continue_write = true; loop { - log::trace!("start"); + log::debug!("start"); number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let mut timeout = if continue_write { Duration::from_secs(0) } else { - Duration::from_millis(1) + connection.timeout().unwrap_or(Duration::from_secs(1)) }; let mut did_read = false; @@ -363,7 +363,7 @@ fn create_client_task( match internal_message { InternalMessage::Packet(info, mut buf) => { - log::trace!("packet"); + log::debug!("packet"); // handle packet from udp socket let buf = buf.as_mut_slice(); match connection.recv(buf, info) { @@ -378,7 +378,7 @@ fn create_client_task( }; } InternalMessage::ClientMessage(message, priority) => { - log::trace!("client"); + log::debug!("client"); // handle message from client let stream_id = next_stream; next_stream = @@ -429,7 +429,7 @@ fn create_client_task( } } } - log::trace!("readble"); + log::debug!("readble"); if !did_read && !continue_write { connection.on_timeout(); } @@ -460,7 +460,7 @@ fn create_client_task( } } } - log::trace!("writable"); + log::debug!("writable"); if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { @@ -483,7 +483,7 @@ fn create_client_task( } if instance.elapsed() > Duration::from_secs(1) { - log::trace!("other tasks"); + log::debug!("other tasks"); instance = Instant::now(); handle_path_events(&mut connection); @@ -514,7 +514,7 @@ fn create_client_task( datagram_size }; - log::trace!("creating packets"); + log::debug!("creating packets"); while total_length < max_burst_size { match connection.send(&mut out[total_length..max_burst_size]) { Ok((len, send_info)) => { From 150f0012572f7beace559aa11bd20fd9fa385273 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 30 Aug 2024 11:04:15 +0200 Subject: [PATCH 8/9] updating client code to set default MTPU discovery config --- client/src/non_blocking/client.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index 08daf9c..a2d0bbc 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -51,7 +51,6 @@ pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> En 
transport_config.max_concurrent_uni_streams(VarInt::from( connection_parameters.max_number_of_streams as u32, )); - transport_config.mtu_discovery_config(None); transport_config.crypto_buffer_size(64 * 1024); transport_config @@ -128,10 +127,12 @@ impl Client { let message = recv_message(recv_stream, timeout).await; match message { Ok(message) => { - let _ = sender.send(message); + if let Err(e) = sender.send(message) { + log::error!("Message sent error : {:?}", e) + } } Err(e) => { - log::trace!("Error getting message {}", e); + log::debug!("Error getting message {:?}", e); } } }); @@ -140,10 +141,11 @@ impl Client { ConnectionError::ConnectionClosed(_) | ConnectionError::ApplicationClosed(_) | ConnectionError::LocallyClosed => { + log::debug!("Got {:?} while listing to the connection", e); break; } _ => { - log::error!("Got {} while listing to the connection", e); + log::error!("Got {:?} while listing to the connection", e); break; } }, From c9afd85b10cdcdd735724daa478dd1c2fba58dfa Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 11 Sep 2024 10:42:19 +0200 Subject: [PATCH 9/9] Adding more logs and testing --- Cargo.lock | 7 +-- Cargo.toml | 2 +- client/src/non_blocking/client.rs | 2 +- common/src/defaults.rs | 2 +- quiche/Cargo.toml | 1 - server/Cargo.toml | 3 -- server/src/configure_server.rs | 2 +- server/src/quiche_server_loop.rs | 71 +++---------------------------- 8 files changed, 12 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c52036a..ef5746c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,7 +3014,6 @@ dependencies = [ "bincode", "itertools", "log", - "mio", "quic-geyser-common", "quic-geyser-server", "quiche", @@ -3034,8 +3033,6 @@ dependencies = [ "itertools", "libc", "log", - "mio", - "mio_channel", "nix", "quic-geyser-common", "quic-geyser-quiche-utils", @@ -3111,9 +3108,9 @@ dependencies = [ [[package]] name = "quiche" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7986f11a8f2c63ade53d9ce49ce46d912c8d4770defbb117e16fafae76d8b5e" +checksum = "28e5a763fecb47867bd3720f69ec87031ff42fda1dc88be2cb5fbb3a558fa5e4" dependencies = [ "boring", "cmake", diff --git a/Cargo.toml b/Cargo.toml index c1bdbf0..48116b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ lz4 = "1.24.0" mio = "0.8.11" mio_channel = "0.1.3" -quiche = "=0.21.0" +quiche = "=0.22.0" boring = "4.6.0" ring = "0.17.8" diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index a2d0bbc..7060cdb 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -132,7 +132,7 @@ impl Client { } } Err(e) => { - log::debug!("Error getting message {:?}", e); + log::trace!("Error getting message {:?}", e); } } }); diff --git a/common/src/defaults.rs b/common/src/defaults.rs index cbfb063..da33b73 100644 --- a/common/src/defaults.rs +++ b/common/src/defaults.rs @@ -10,5 +10,5 @@ pub const MAX_DATAGRAM_SIZE: usize = 1350; pub const MAX_PAYLOAD_BUFFER: usize = 10 * MAX_DATAGRAM_SIZE; pub const DEFAULT_ENABLE_PACING: bool = true; pub const DEFAULT_USE_CC_BBR: bool = false; -pub const DEFAULT_INCREMENTAL_PRIORITY: bool = true; +pub const DEFAULT_INCREMENTAL_PRIORITY: bool = false; pub const DEFAULT_ENABLE_GSO: bool = true; diff --git a/quiche/Cargo.toml b/quiche/Cargo.toml index 9430c74..524b6e1 100644 --- a/quiche/Cargo.toml +++ b/quiche/Cargo.toml @@ -15,7 +15,6 @@ quic-geyser-common = { workspace = true } bincode = { workspace = true } ring = {workspace = true} 
quiche = { workspace = true } -mio = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/server/Cargo.toml b/server/Cargo.toml index 35c2b62..b01ed38 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,9 +19,6 @@ quic-geyser-quiche-utils = { workspace = true } rcgen = { workspace = true } boring = { workspace = true } -mio = { workspace = true } -mio_channel = { workspace = true } - libc = "0.2" nix = { version = "0.27", features = ["net", "socket", "uio"] } diff --git a/server/src/configure_server.rs b/server/src/configure_server.rs index 8bafc9a..be7f64f 100644 --- a/server/src/configure_server.rs +++ b/server/src/configure_server.rs @@ -42,7 +42,7 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result { - log::debug!("packet"); // handle packet from udp socket let buf = buf.as_mut_slice(); match connection.recv(buf, info) { - Ok(_) => { - number_of_meesages_from_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } + Ok(_) => {} Err(e) => { log::error!("{} recv failed: {:?}", connection.trace_id(), e); break; @@ -378,7 +325,6 @@ fn create_client_task( }; } InternalMessage::ClientMessage(message, priority) => { - log::debug!("client"); // handle message from client let stream_id = next_stream; next_stream = @@ -396,7 +342,6 @@ fn create_client_task( } true } else { - messages_added.fetch_add(1, std::sync::atomic::Ordering::Relaxed); match send_message( &mut connection, &mut partial_responses, @@ -429,7 +374,7 @@ fn create_client_task( } } } - log::debug!("readble"); + if !did_read && !continue_write { connection.on_timeout(); } @@ -438,7 +383,6 @@ fn create_client_task( if connection.is_in_early_data() || connection.is_established() { // Process all readable streams. for stream in connection.readable() { - number_of_readable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let message = recv_message(&mut connection, &mut read_streams, stream); match message { Ok(Some(message)) => match message { @@ -460,7 +404,7 @@ fn create_client_task( } } } - log::debug!("writable"); + if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { @@ -514,12 +458,9 @@ fn create_client_task( datagram_size }; - log::debug!("creating packets"); while total_length < max_burst_size { match connection.send(&mut out[total_length..max_burst_size]) { Ok((len, send_info)) => { - number_of_meesages_to_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); send_message_to.get_or_insert(send_info); total_length += len; if len < datagram_size { @@ -679,7 +620,7 @@ fn set_txtime_sockopt(sock: &UdpSocket) -> std::io::Result<()> { let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; setsockopt(&fd, TxTime, &config)?; - + setsockopt(&fd, ReusePort, &true)?; Ok(()) }
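
A note on the event-loop shape the series converges on: patches 4 and 7 together drop the mio poll/timer machinery in favor of a single mpsc channel per client that carries both raw UDP packets and outgoing client messages, with the blocking timeout taken from the QUIC connection itself so quiche's loss-detection and idle timers still fire. A minimal sketch of that loop, assuming the InternalMessage enum from patch 4 (the drive_connection name and the elided send path are hypothetical):

    use std::{sync::mpsc, time::Duration};

    // Mirrors the enum introduced in patch 4.
    enum InternalMessage {
        Packet(quiche::RecvInfo, Vec<u8>),
        ClientMessage(Vec<u8>, u8),
    }

    fn drive_connection(
        conn: &mut quiche::Connection,
        rx: &mpsc::Receiver<InternalMessage>,
    ) {
        loop {
            // Patch 7: block for at most the connection's own timeout so
            // quiche's timers fire without a separate timer thread.
            let timeout = conn.timeout().unwrap_or(Duration::from_secs(1));
            match rx.recv_timeout(timeout) {
                Ok(InternalMessage::Packet(info, mut buf)) => {
                    // Feed a raw datagram from the shared socket into quiche.
                    if let Err(e) = conn.recv(buf.as_mut_slice(), info) {
                        log::error!("{} recv failed: {:?}", conn.trace_id(), e);
                    }
                }
                Ok(InternalMessage::ClientMessage(_payload, _priority)) => {
                    // Pick the next unidirectional stream, set its priority,
                    // and queue the payload (send_message() in the patches).
                }
                Err(mpsc::RecvTimeoutError::Timeout) => conn.on_timeout(),
                Err(mpsc::RecvTimeoutError::Disconnected) => break,
            }
            // ... then drain conn.send() into the UDP socket, as in patch 2 ...
            if conn.is_closed() {
                break;
            }
        }
    }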
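
The send path after patch 4 sizes each burst from connection.max_send_udp_payload_size() and hands the kernel one sendmsg() per burst: an SCM_TXTIME control message carries the pacing release time and, when GSO is enabled, UDP_SEGMENT tells the kernel where to split. A condensed sketch of that path, assuming Linux and nix 0.27 with the "time" feature enabled on top of the features the patches use (the now_nanos helper approximates the patches' std_time_to_u64):

    use std::io::IoSlice;
    use std::net::UdpSocket;
    use std::os::unix::io::AsRawFd;
    use std::time::Instant;

    use nix::sys::socket::{sendmsg, ControlMessage, MsgFlags, SockaddrStorage};
    use nix::time::{clock_gettime, ClockId};

    // Absolute CLOCK_MONOTONIC nanoseconds; SO_TXTIME release times are
    // interpreted on this clock when configured as in set_txtime_sockopt.
    fn now_nanos() -> u64 {
        let ts = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap();
        ts.tv_sec() as u64 * 1_000_000_000 + ts.tv_nsec() as u64
    }

    fn paced_send(
        socket: &UdpSocket,
        buf: &[u8],
        send_info: &quiche::SendInfo,
        enable_gso: bool,
        segment_size: u16, // connection.max_send_udp_payload_size() after patch 4
    ) -> std::io::Result<usize> {
        let iov = [IoSlice::new(buf)];
        let dst = SockaddrStorage::from(send_info.to);
        // Translate quiche's release Instant into an absolute timestamp.
        let delay = send_info.at.saturating_duration_since(Instant::now());
        let send_time = now_nanos() + delay.as_nanos() as u64;
        let cmsg_txtime = ControlMessage::TxTime(&send_time);
        let mut cmsgs = vec![cmsg_txtime];
        if enable_gso {
            // Kernel splits the burst into segment_size-byte datagrams.
            cmsgs.push(ControlMessage::UdpGsoSegments(&segment_size));
        }
        sendmsg(
            socket.as_raw_fd(),
            &iov,
            &cmsgs,
            MsgFlags::empty(),
            Some(&dst),
        )
        .map_err(std::io::Error::from)
    }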
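
On the client side, patch 8's one-line removal of transport_config.mtu_discovery_config(None) re-enables quinn's default MTU discovery, which pairs with MAX_DATAGRAM_SIZE going from 1200 to 1350 on the server in patch 4. A hedged sketch of spelling that default out explicitly, assuming the quinn version this client builds against:

    // Equivalent to what patch 8 leaves in place implicitly: MTU
    // discovery enabled with quinn's default parameters.
    fn transport_with_mtu_discovery() -> quinn::TransportConfig {
        let mut transport_config = quinn::TransportConfig::default();
        transport_config
            .mtu_discovery_config(Some(quinn::MtuDiscoveryConfig::default()));
        transport_config
    }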