diff --git a/Cargo.lock b/Cargo.lock index c52036a..ef5746c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,7 +3014,6 @@ dependencies = [ "bincode", "itertools", "log", - "mio", "quic-geyser-common", "quic-geyser-server", "quiche", @@ -3034,8 +3033,6 @@ dependencies = [ "itertools", "libc", "log", - "mio", - "mio_channel", "nix", "quic-geyser-common", "quic-geyser-quiche-utils", @@ -3111,9 +3108,9 @@ dependencies = [ [[package]] name = "quiche" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7986f11a8f2c63ade53d9ce49ce46d912c8d4770defbb117e16fafae76d8b5e" +checksum = "28e5a763fecb47867bd3720f69ec87031ff42fda1dc88be2cb5fbb3a558fa5e4" dependencies = [ "boring", "cmake", diff --git a/Cargo.toml b/Cargo.toml index c1bdbf0..48116b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ lz4 = "1.24.0" mio = "0.8.11" mio_channel = "0.1.3" -quiche = "=0.21.0" +quiche = "=0.22.0" boring = "4.6.0" ring = "0.17.8" diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index 42aa196..d5924b2 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -76,7 +76,7 @@ pub fn client_loop( let mut buf = [0; 65535]; 'client: loop { - poll.poll(&mut events, Some(Duration::from_micros(100)))?; + poll.poll(&mut events, Some(Duration::from_millis(10)))?; 'read: loop { match socket.recv_from(&mut buf) { @@ -151,7 +151,7 @@ pub fn create_quiche_client_thread( let rng = SystemRandom::new(); 'client: loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) + poll.poll(&mut events, Some(Duration::from_millis(10))) .unwrap(); if events.is_empty() { connection.on_timeout(); @@ -249,7 +249,6 @@ pub fn create_quiche_client_thread( // no more new messages } std::sync::mpsc::TryRecvError::Disconnected => { - log::error!("message_queue disconnected"); let _ = connection.close(true, 0, b"no longer needed"); } } @@ -405,7 +404,10 @@ mod tests { let (server_send_queue, rx_sent_queue) = mpsc::channel::(); let _server_loop_jh = std::thread::spawn(move || { if let Err(e) = server_loop( - QuicParameters::default(), + QuicParameters { + incremental_priority: true, + ..Default::default() + }, socket_addr, rx_sent_queue, CompressionType::Lz4Fast(8), diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index 08daf9c..7060cdb 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -51,7 +51,6 @@ pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> En transport_config.max_concurrent_uni_streams(VarInt::from( connection_parameters.max_number_of_streams as u32, )); - transport_config.mtu_discovery_config(None); transport_config.crypto_buffer_size(64 * 1024); transport_config @@ -128,10 +127,12 @@ impl Client { let message = recv_message(recv_stream, timeout).await; match message { Ok(message) => { - let _ = sender.send(message); + if let Err(e) = sender.send(message) { + log::error!("Message sent error : {:?}", e) + } } Err(e) => { - log::trace!("Error getting message {}", e); + log::trace!("Error getting message {:?}", e); } } }); @@ -140,10 +141,11 @@ impl Client { ConnectionError::ConnectionClosed(_) | ConnectionError::ApplicationClosed(_) | ConnectionError::LocallyClosed => { + log::debug!("Got {:?} while listing to the connection", e); break; } _ => { - log::error!("Got {} while listing to the connection", e); + log::error!("Got {:?} while listing to the connection", e); break; } }, diff --git a/common/src/config.rs b/common/src/config.rs index 94b8a1a..c3bcc4b 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -7,8 +7,8 @@ use crate::{ defaults::{ DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_ENABLE_PACING, DEFAULT_INCREMENTAL_PRIORITY, DEFAULT_MAX_ACK_DELAY, - DEFAULT_MAX_MESSAGES_IN_QUEUE, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, - DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR, + DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, + DEFAULT_USE_CC_BBR, }, }; @@ -69,7 +69,6 @@ pub struct QuicParameters { pub enable_pacing: bool, pub use_cc_bbr: bool, pub incremental_priority: bool, - pub max_messages_in_queue: u64, pub enable_gso: bool, } @@ -85,7 +84,6 @@ impl Default for QuicParameters { enable_pacing: DEFAULT_ENABLE_PACING, use_cc_bbr: DEFAULT_USE_CC_BBR, incremental_priority: DEFAULT_INCREMENTAL_PRIORITY, - max_messages_in_queue: DEFAULT_MAX_MESSAGES_IN_QUEUE, enable_gso: DEFAULT_ENABLE_GSO, } } diff --git a/common/src/defaults.rs b/common/src/defaults.rs index f4d017f..da33b73 100644 --- a/common/src/defaults.rs +++ b/common/src/defaults.rs @@ -1,5 +1,4 @@ pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024; -pub const DEFAULT_MAX_MESSAGES_IN_QUEUE: u64 = 1024 * 1024; pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS * 3 / 4; pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10; @@ -7,9 +6,9 @@ pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10; pub const DEFAULT_MAX_ACK_DELAY: u64 = 25; pub const DEFAULT_ACK_EXPONENT: u64 = 3; pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; -pub const MAX_DATAGRAM_SIZE: usize = 1200; +pub const MAX_DATAGRAM_SIZE: usize = 1350; pub const MAX_PAYLOAD_BUFFER: usize = 10 * MAX_DATAGRAM_SIZE; pub const DEFAULT_ENABLE_PACING: bool = true; pub const DEFAULT_USE_CC_BBR: bool = false; -pub const DEFAULT_INCREMENTAL_PRIORITY: bool = true; +pub const DEFAULT_INCREMENTAL_PRIORITY: bool = false; pub const DEFAULT_ENABLE_GSO: bool = true; diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 9ec02e5..9ef1730 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -428,23 +428,23 @@ pub async fn main() { transaction_notifications_stats.add_value(&transaction_notifications); block_notifications_stats.add_value(&block_notifications); - println!("------------------------------------------"); - println!( + log::info!("------------------------------------------"); + log::info!( " DateTime : {:?}", instant.duration_since(start_instance).as_secs() ); - println!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000); - println!( + log::info!(" Bytes Transfered : {} Mbs/s", bytes_transfered / 1_000_000); + log::info!( " Accounts transfered size (uncompressed) : {} Mbs", total_accounts_size / 1_000_000 ); - println!(" Accounts Notified : {}", account_notification); - println!(" Slots Notified : {}", slot_notifications); - println!(" Blockmeta notified : {}", blockmeta_notifications); - println!(" Transactions notified : {}", transaction_notifications); - println!(" Blocks notified : {}", block_notifications); + log::info!(" Accounts Notified : {}", account_notification); + log::info!(" Slots Notified : {}", slot_notifications); + log::info!(" Blockmeta notified : {}", blockmeta_notifications); + log::info!(" Transactions notified : {}", transaction_notifications); + log::info!(" Blocks notified : {}", block_notifications); - println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed)); + log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed)); if counter % 10 == 0 { println!("------------------STATS------------------------"); diff --git a/quiche/Cargo.toml b/quiche/Cargo.toml index 9430c74..524b6e1 100644 --- a/quiche/Cargo.toml +++ b/quiche/Cargo.toml @@ -15,7 +15,6 @@ quic-geyser-common = { workspace = true } bincode = { workspace = true } ring = {workspace = true} quiche = { workspace = true } -mio = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/server/Cargo.toml b/server/Cargo.toml index 35c2b62..b01ed38 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,9 +19,6 @@ quic-geyser-quiche-utils = { workspace = true } rcgen = { workspace = true } boring = { workspace = true } -mio = { workspace = true } -mio_channel = { workspace = true } - libc = "0.2" nix = { version = "0.27", features = ["net", "socket", "uio"] } diff --git a/server/src/configure_server.rs b/server/src/configure_server.rs index 39d5cd0..be7f64f 100644 --- a/server/src/configure_server.rs +++ b/server/src/configure_server.rs @@ -42,12 +42,13 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result), + ClientMessage(Vec, u8), +} + struct DispatchingData { - pub sender: Sender<(Vec, u8)>, + pub sender: Sender, pub filters: Arc>>, - pub messages_in_queue: Arc, } type DispachingConnections = Arc, DispatchingData>>>; @@ -52,34 +57,20 @@ pub fn server_loop( stop_laggy_client: bool, ) -> anyhow::Result<()> { let maximum_concurrent_streams_id = u64::MAX; - let max_messages_in_queue = quic_params.max_messages_in_queue; let mut config = configure_server(quic_params)?; - let mut socket = mio::net::UdpSocket::bind(socket_addr)?; - let mut poll = mio::Poll::new()?; - let mut events = mio::Events::with_capacity(1024); - - poll.registry() - .register(&mut socket, mio::Token(0), mio::Interest::READABLE)?; - + let socket = Arc::new(UdpSocket::bind(socket_addr)?); let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; let local_addr = socket.local_addr()?; let rng = SystemRandom::new(); let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - let mut client_messsage_channel_by_id: HashMap< - u64, - mio_channel::Sender<(quiche::RecvInfo, Vec)>, - > = HashMap::new(); + let mut client_messsage_channel_by_id: HashMap> = + HashMap::new(); let clients_by_id: Arc, u64>>> = Arc::new(Mutex::new(HashMap::new())); - let (write_sender, mut write_reciver) = mio_channel::channel::<(quiche::SendInfo, Vec)>(); - - poll.registry() - .register(&mut write_reciver, mio::Token(1), mio::Interest::READABLE)?; - let enable_pacing = if quic_params.enable_pacing { set_txtime_sockopt(&socket).is_ok() } else { @@ -103,218 +94,197 @@ pub fn server_loop( message_send_queue, dispatching_connections.clone(), compression_type, - max_messages_in_queue, ); let mut client_id_counter = 0; loop { - poll.poll(&mut events, Some(Duration::from_millis(10)))?; - let do_read = events.is_empty() || events.iter().any(|x| x.token() == Token(0)); - if do_read { - 'read: loop { - let (len, from) = match socket.recv_from(&mut buf) { - Ok(v) => v, - Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - log::trace!("recv() would block"); - break 'read; - } - bail!("recv() failed: {:?}", e); - } - }; - - let pkt_buf = &mut buf[..len]; - - // Parse the QUIC packet's header. - let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { - Ok(v) => v, - - Err(e) => { - log::error!("Parsing packet header failed: {:?}", e); - continue 'read; - } - }; - - let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); - let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; - let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); - let mut clients_lk = clients_by_id.lock().unwrap(); - if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) { - if hdr.ty != quiche::Type::Initial { - log::error!("Packet is not Initial"); - continue 'read; - } - - if !quiche::version_is_supported(hdr.version) { - log::warn!("Doing version negotiation"); - let len = - quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); - - let out = &out[..len]; - - if let Err(e) = socket.send_to(out, from) { - if e.kind() == std::io::ErrorKind::WouldBlock { - break; - } - panic!("send() failed: {:?}", e); - } - continue 'read; - } - - let mut scid = [0; quiche::MAX_CONN_ID_LEN]; - scid.copy_from_slice(&conn_id); - - let scid = quiche::ConnectionId::from_ref(&scid); - - // Token is always present in Initial packets. - let token = hdr.token.as_ref().unwrap(); - - // Do stateless retry if the client didn't send a token. - if token.is_empty() { - log::debug!("Doing stateless retry"); - - let new_token = mint_token(&hdr, &from); - - let len = quiche::retry( - &hdr.scid, - &hdr.dcid, - &scid, - &new_token, - hdr.version, - &mut out, - ) - .unwrap(); - - if let Err(e) = socket.send_to(&out[..len], from) { - log::error!("Error sending retry messages : {e:?}"); - } - continue 'read; - } - - let odcid = validate_token(&from, token); - - if odcid.is_none() { - log::error!("Invalid address validation token"); - continue 'read; - } - - if scid.len() != hdr.dcid.len() { - log::error!("Invalid destination connection ID"); - continue 'read; - } - - let scid = hdr.dcid.clone(); - - log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); - - let mut conn = - quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?; - - let recv_info = quiche::RecvInfo { - to: socket.local_addr().unwrap(), - from, - }; - // Process potentially coalesced packets. - match conn.recv(pkt_buf, recv_info) { - Ok(v) => v, - Err(e) => { - log::error!("{} recv failed: {:?}", conn.trace_id(), e); - continue 'read; - } - }; - - let (client_sender, client_reciver) = mio_channel::channel(); - let (client_message_sx, client_message_rx) = mpsc::channel(); - let messages_in_queue = Arc::new(AtomicUsize::new(0)); - let current_client_id = client_id_counter; - client_id_counter += 1; - - let filters = Arc::new(RwLock::new(Vec::new())); - create_client_task( - conn, - current_client_id, - clients_by_id.clone(), - client_reciver, - write_sender.clone(), - client_message_rx, - filters.clone(), - maximum_concurrent_streams_id, - stop_laggy_client, - messages_in_queue.clone(), - quic_params.incremental_priority, - rng.clone(), - ); - let mut lk = dispatching_connections.lock().unwrap(); - lk.insert( - scid.clone(), - DispatchingData { - sender: client_message_sx, - filters, - messages_in_queue, - }, - ); - clients_lk.insert(scid, current_client_id); - client_messsage_channel_by_id.insert(current_client_id, client_sender); - } else { - // get the existing client - let client_id = match clients_lk.get(&hdr.dcid) { - Some(v) => *v, - None => *clients_lk - .get(&conn_id) - .expect("The client should exist in the map"), - }; - - let recv_info = quiche::RecvInfo { - to: socket.local_addr().unwrap(), - from, - }; - match client_messsage_channel_by_id.get_mut(&client_id) { - Some(channel) => { - if channel.send((recv_info, pkt_buf.to_vec())).is_err() { - // client is closed - clients_lk.remove(&hdr.dcid); - clients_lk.remove(&conn_id); - client_messsage_channel_by_id.remove(&client_id); - } - } - None => { - log::error!("channel with client id {client_id} not found"); - } - } - }; + let (len, from) = match socket.recv_from(&mut buf) { + Ok(v) => v, + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + break; + } + bail!("recv() failed: {:?}", e); } - } + }; - while let Ok((send_info, buffer)) = write_reciver.try_recv() { - let send_result = if enable_pacing { - send_with_pacing(&socket, &buffer, &send_info, enable_gso) - } else { - socket.send_to(&buffer, send_info.to) + let pkt_buf = &mut buf[..len]; + + // Parse the QUIC packet's header. + let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { + Ok(v) => v, + + Err(e) => { + log::error!("Parsing packet header failed: {:?}", e); + continue; + } + }; + + let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); + let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; + let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); + let mut clients_lk = clients_by_id.lock().unwrap(); + if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) { + drop(clients_lk); + if hdr.ty != quiche::Type::Initial { + log::error!("Packet is not Initial"); + continue; + } + + if !quiche::version_is_supported(hdr.version) { + log::warn!("Doing version negotiation"); + let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); + + let out = &out[..len]; + + if let Err(e) = socket.send_to(out, from) { + if e.kind() == std::io::ErrorKind::WouldBlock { + break; + } + panic!("send() failed: {:?}", e); + } + continue; + } + + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + scid.copy_from_slice(&conn_id); + + let scid = quiche::ConnectionId::from_ref(&scid); + + // Token is always present in Initial packets. + let token = hdr.token.as_ref().unwrap(); + + // Do stateless retry if the client didn't send a token. + if token.is_empty() { + log::debug!("Doing stateless retry"); + + let new_token = mint_token(&hdr, &from); + + let len = quiche::retry( + &hdr.scid, + &hdr.dcid, + &scid, + &new_token, + hdr.version, + &mut out, + ) + .unwrap(); + + if let Err(e) = socket.send_to(&out[..len], from) { + log::error!("Error sending retry messages : {e:?}"); + } + continue; + } + + let odcid = validate_token(&from, token); + + if odcid.is_none() { + log::error!("Invalid address validation token"); + continue; + } + + if scid.len() != hdr.dcid.len() { + log::error!("Invalid destination connection ID"); + continue; + } + + let scid = hdr.dcid.clone(); + + log::info!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); + + let mut conn = quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config)?; + + let recv_info = quiche::RecvInfo { + to: socket.local_addr().unwrap(), + from, }; - match send_result { - Ok(_written) => {} + // Process potentially coalesced packets. + match conn.recv(pkt_buf, recv_info) { + Ok(v) => v, Err(e) => { - log::error!("sending failed with error : {e:?}"); + log::error!("{} recv failed: {:?}", conn.trace_id(), e); + continue; + } + }; + + let (client_message_sx, client_message_rx) = mpsc::channel(); + let current_client_id = client_id_counter; + client_id_counter += 1; + + let filters = Arc::new(RwLock::new(Vec::new())); + create_client_task( + socket.clone(), + conn, + current_client_id, + clients_by_id.clone(), + client_message_rx, + filters.clone(), + maximum_concurrent_streams_id, + stop_laggy_client, + quic_params.incremental_priority, + rng.clone(), + enable_pacing, + enable_gso, + ); + let mut lk = dispatching_connections.lock().unwrap(); + lk.insert( + scid.clone(), + DispatchingData { + sender: client_message_sx.clone(), + filters, + }, + ); + let mut clients_lk = clients_by_id.lock().unwrap(); + clients_lk.insert(scid, current_client_id); + client_messsage_channel_by_id.insert(current_client_id, client_message_sx); + } else { + // get the existing client + let client_id = match clients_lk.get(&hdr.dcid) { + Some(v) => *v, + None => *clients_lk + .get(&conn_id) + .expect("The client should exist in the map"), + }; + + let recv_info = quiche::RecvInfo { + to: socket.local_addr().unwrap(), + from, + }; + match client_messsage_channel_by_id.get_mut(&client_id) { + Some(channel) => { + if channel + .send(InternalMessage::Packet(recv_info, pkt_buf.to_vec())) + .is_err() + { + // client is closed + clients_lk.remove(&hdr.dcid); + clients_lk.remove(&conn_id); + client_messsage_channel_by_id.remove(&client_id); + } + } + None => { + log::error!("channel with client id {client_id} not found"); } } - } + }; } + Ok(()) } #[allow(clippy::too_many_arguments)] fn create_client_task( + socket: Arc, connection: quiche::Connection, client_id: u64, client_id_by_scid: Arc, u64>>>, - mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec)>, - sender: mio_channel::Sender<(quiche::SendInfo, Vec)>, - message_channel: mpsc::Receiver<(Vec, u8)>, + receiver: mpsc::Receiver, filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, - messages_in_queue: Arc, incremental_priority: bool, rng: SystemRandom, + enable_pacing: bool, + enable_gso: bool, ) { std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); @@ -324,94 +294,95 @@ fn create_client_task( let mut instance = Instant::now(); let mut closed = false; let mut out = [0; 65535]; - - let mut poll = mio::Poll::new().unwrap(); - let mut events = mio::Events::with_capacity(1024); - - poll.registry() - .register(&mut receiver, mio::Token(0), mio::Interest::READABLE) - .unwrap(); - - let number_of_loops = Arc::new(AtomicU64::new(0)); - let number_of_meesages_from_network = Arc::new(AtomicU64::new(0)); - let number_of_meesages_to_network = Arc::new(AtomicU64::new(0)); - let number_of_readable_streams = Arc::new(AtomicU64::new(0)); - let number_of_writable_streams = Arc::new(AtomicU64::new(0)); - let messages_added = Arc::new(AtomicU64::new(0)); + let mut datagram_size = MAX_DATAGRAM_SIZE; + let mut logged_is_draining = false; let quit = Arc::new(AtomicBool::new(false)); - let max_burst_size = MAX_DATAGRAM_SIZE * 10; - { - let number_of_loops = number_of_loops.clone(); - let number_of_meesages_from_network = number_of_meesages_from_network.clone(); - let number_of_meesages_to_network = number_of_meesages_to_network.clone(); - let number_of_readable_streams = number_of_readable_streams.clone(); - let number_of_writable_streams = number_of_writable_streams.clone(); - let messages_added = messages_added.clone(); - let messages_in_queue = messages_in_queue.clone(); - let quit = quit.clone(); - std::thread::spawn(move || { - while !quit.load(std::sync::atomic::Ordering::Relaxed) { - std::thread::sleep(Duration::from_secs(1)); - log::debug!("---------------------------------"); - log::debug!( - "number of loop : {}", - number_of_loops.swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "number of packets read : {}", - number_of_meesages_from_network - .swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "number of packets write : {}", - number_of_meesages_to_network.swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "number_of_readable_streams : {}", - number_of_readable_streams.swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "number_of_writable_streams : {}", - number_of_writable_streams.swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "messages_added : {}", - messages_added.swap(0, std::sync::atomic::Ordering::Relaxed) - ); - log::debug!( - "messages in queue to be sent : {}", - messages_in_queue.load(std::sync::atomic::Ordering::Relaxed) - ); - } - }); - } + let mut continue_write = true; loop { - number_of_loops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - poll.poll(&mut events, Some(Duration::from_millis(1))) - .unwrap(); + let mut timeout = if continue_write { + Duration::from_secs(0) + } else { + connection.timeout().unwrap_or(Duration::from_secs(1)) + }; - if !events.is_empty() { - while let Ok((info, mut buf)) = receiver.try_recv() { - let buf = buf.as_mut_slice(); - match connection.recv(buf, info) { - Ok(_) => { - number_of_meesages_from_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Err(e) => { - log::error!("{} recv failed: {:?}", connection.trace_id(), e); + let mut did_read = false; + + while let Ok(internal_message) = receiver.recv_timeout(timeout) { + did_read = true; + timeout = Duration::from_secs(0); + + match internal_message { + InternalMessage::Packet(info, mut buf) => { + // handle packet from udp socket + let buf = buf.as_mut_slice(); + match connection.recv(buf, info) { + Ok(_) => {} + Err(e) => { + log::error!("{} recv failed: {:?}", connection.trace_id(), e); + break; + } + }; + } + InternalMessage::ClientMessage(message, priority) => { + // handle message from client + let stream_id = next_stream; + next_stream = + get_next_unidi(stream_id, true, maximum_concurrent_streams_id); + + let close = if let Err(e) = + connection.stream_priority(stream_id, priority, incremental_priority) + { + if !closed { + log::error!( + "Unable to set priority for the stream {}, error {}", + stream_id, + e + ); + } + true + } else { + match send_message( + &mut connection, + &mut partial_responses, + stream_id, + message, + ) { + Ok(_) => { + // do nothing + false + } + Err(e) => { + // done writing / queue is full + log::error!("got error sending message client : {}", e); + true + } + } + }; + + if close && !closed && stop_laggy_client { + if let Err(e) = connection.close(true, 1, b"laggy client") { + if e != quiche::Error::Done { + log::error!("error closing client : {}", e); + } + } else { + log::info!("Stopping laggy client : {}", connection.trace_id(),); + } + closed = true; break; } - }; + } } - continue; } + if !did_read && !continue_write { + connection.on_timeout(); + } + continue_write = false; + if connection.is_in_early_data() || connection.is_established() { // Process all readable streams. for stream in connection.readable() { - number_of_readable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let message = recv_message(&mut connection, &mut read_streams, stream); match message { Ok(Some(message)) => match message { @@ -437,13 +408,12 @@ fn create_client_task( if !connection.is_closed() && (connection.is_established() || connection.is_in_early_data()) { - let mut is_writable = true; + datagram_size = connection.max_send_udp_payload_size(); for stream_id in connection.writable() { if let Err(e) = handle_writable(&mut connection, &mut partial_responses, stream_id) { if e == quiche::Error::Done { - is_writable = false; break; } if !closed { @@ -454,83 +424,11 @@ fn create_client_task( } } } - - if is_writable { - loop { - let close = match message_channel.try_recv() { - Ok((message, priority)) => { - messages_in_queue - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); - let stream_id = next_stream; - next_stream = - get_next_unidi(stream_id, true, maximum_concurrent_streams_id); - - if let Err(e) = connection.stream_priority( - stream_id, - priority, - incremental_priority, - ) { - if !closed { - log::error!( - "Unable to set priority for the stream {}, error {}", - stream_id, - e - ); - } - true - } else { - messages_added - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - match send_message( - &mut connection, - &mut partial_responses, - stream_id, - message, - ) { - Ok(_) => false, - Err(quiche::Error::Done) => { - // done writing / queue is full - break; - } - Err(e) => { - log::error!("error sending message : {e:?}"); - true - } - } - } - } - Err(e) => { - match e { - mpsc::TryRecvError::Empty => { - break; - } - mpsc::TryRecvError::Disconnected => { - // too many message the connection is lagging - log::error!("channel disconnected by dispatcher"); - true - } - } - } - }; - - if close && !closed && stop_laggy_client { - if let Err(e) = connection.close(true, 1, b"laggy client") { - if e != quiche::Error::Done { - log::error!("error closing client : {}", e); - } - } else { - log::info!("Stopping laggy client : {}", connection.trace_id(),); - } - closed = true; - break; - } - } - } } if instance.elapsed() > Duration::from_secs(1) { + log::debug!("other tasks"); instance = Instant::now(); - connection.on_timeout(); handle_path_events(&mut connection); // See whether source Connection IDs have been retired. @@ -553,52 +451,79 @@ fn create_client_task( let mut send_message_to = None; let mut total_length = 0; - let mut done_writing = false; - 'writing_loop: while !done_writing { - while total_length < max_burst_size { - match connection.send(&mut out[total_length..max_burst_size]) { - Ok((len, send_info)) => { - number_of_meesages_to_network - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - send_message_to.get_or_insert(send_info); - total_length += len; - if len < MAX_DATAGRAM_SIZE { - break; - } - } - Err(quiche::Error::BufferTooShort) => { - // retry later - log::trace!("{} buffer to short", connection.trace_id()); + let max_burst_size = if enable_gso { + std::cmp::min(datagram_size * 10, out.len()) + } else { + datagram_size + }; + + while total_length < max_burst_size { + match connection.send(&mut out[total_length..max_burst_size]) { + Ok((len, send_info)) => { + send_message_to.get_or_insert(send_info); + total_length += len; + if len < datagram_size { break; } - Err(quiche::Error::Done) => { - done_writing = true; - break; - } - Err(e) => { - log::error!( - "{} send failed: {:?}, closing connection", - connection.trace_id(), - e - ); - connection.close(false, 0x1, b"fail").ok(); - break 'writing_loop; - } - }; - } + } + Err(quiche::Error::BufferTooShort) => { + // retry later + log::trace!("{} buffer to short", connection.trace_id()); + break; + } + Err(quiche::Error::Done) => { + break; + } + Err(e) => { + log::error!( + "{} send failed: {:?}, closing connection", + connection.trace_id(), + e + ); + connection.close(false, 0x1, b"fail").ok(); + } + }; + } - if total_length > 0 && send_message_to.is_some() { - sender - .send((send_message_to.unwrap(), out[..total_length].to_vec())) - .unwrap(); - total_length = 0; + if total_length > 0 && send_message_to.is_some() { + log::debug!("sending :{total_length:?}"); + continue_write = true; + let send_result = if enable_pacing { + send_with_pacing( + &socket, + &out[..total_length], + &send_message_to.unwrap(), + enable_gso, + datagram_size as u16, + ) } else { - break; + socket.send(&out[..total_length]) + }; + match send_result { + Ok(_written) => { + log::debug!("finished sending"); + } + Err(e) => { + log::error!("sending failed with error : {e:?}"); + } } } + if !logged_is_draining && connection.is_draining() { + log::warn!("connection is draining"); + logged_is_draining = true; + } + if connection.is_closed() { + if let Some(e) = connection.peer_error() { + log::error!("peer error : {e:?} "); + } + + if let Some(e) = connection.local_error() { + log::error!("local error : {e:?} "); + } + log::info!( "{} connection closed {:?}", connection.trace_id(), @@ -615,10 +540,8 @@ fn create_dispatching_thread( message_send_queue: mpsc::Receiver, dispatching_connections: DispachingConnections, compression_type: CompressionType, - max_messages_in_queue: u64, ) { std::thread::spawn(move || { - let max_messages_in_queue = max_messages_in_queue as usize; while let Ok(message) = message_send_queue.recv() { let mut dispatching_connections_lk = dispatching_connections.lock().unwrap(); @@ -670,11 +593,10 @@ fn create_dispatching_thread( bincode::serialize(&message).expect("Message should be serializable in binary"); for id in dispatching_connections.iter() { let data = dispatching_connections_lk.get(id).unwrap(); - let messages_in_queue = data - .messages_in_queue - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if data.sender.send((binary.clone(), priority)).is_err() - || messages_in_queue > max_messages_in_queue + if data + .sender + .send(InternalMessage::ClientMessage(binary.clone(), priority)) + .is_err() { // client is closed dispatching_connections_lk.remove(id); @@ -685,7 +607,7 @@ fn create_dispatching_thread( }); } -fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { +fn set_txtime_sockopt(sock: &UdpSocket) -> std::io::Result<()> { use nix::sys::socket::setsockopt; use nix::sys::socket::sockopt::TxTime; use std::os::unix::io::AsRawFd; @@ -695,11 +617,10 @@ fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { flags: 0, }; - // mio::net::UdpSocket doesn't implement AsFd (yet?). let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; setsockopt(&fd, TxTime, &config)?; - + setsockopt(&fd, ReusePort, &true)?; Ok(()) } @@ -714,13 +635,12 @@ fn std_time_to_u64(time: &std::time::Instant) -> u64 { sec * NANOS_PER_SEC + nsec as u64 } -const GSO_SEGMENT_SIZE: u16 = MAX_DATAGRAM_SIZE as u16; - fn send_with_pacing( - socket: &mio::net::UdpSocket, + socket: &UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, enable_gso: bool, + segment_size: u16, ) -> std::io::Result { use nix::sys::socket::sendmsg; use nix::sys::socket::ControlMessage; @@ -739,7 +659,7 @@ fn send_with_pacing( let mut cmgs = vec![cmsg_txtime]; if enable_gso { - cmgs.push(ControlMessage::UdpGsoSegments(&GSO_SEGMENT_SIZE)); + cmgs.push(ControlMessage::UdpGsoSegments(&segment_size)); } match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) { @@ -748,7 +668,7 @@ fn send_with_pacing( } } -pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool { +pub fn detect_gso(socket: &UdpSocket, segment_size: usize) -> bool { use nix::sys::socket::setsockopt; use nix::sys::socket::sockopt::UdpGsoSegment; use std::os::unix::io::AsRawFd;