diff --git a/Cargo.lock b/Cargo.lock index 91bccd4..219c224 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2165,6 +2165,18 @@ dependencies = [ "mio", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "7.1.3" @@ -2763,9 +2775,11 @@ dependencies = [ "bincode", "boring", "itertools", + "libc", "log", "mio", "mio_channel", + "nix", "quic-geyser-common", "quic-geyser-quiche-utils", "quiche", diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index cb923a2..33470c0 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -278,7 +278,7 @@ mod tests { }; use itertools::Itertools; - use quic_geyser_server::{configure_server::configure_server, quiche_server_loop::server_loop}; + use quic_geyser_server::quiche_server_loop::server_loop; use solana_sdk::{account::Account, pubkey::Pubkey}; use quic_geyser_common::{ @@ -371,9 +371,8 @@ mod tests { // server loop let (server_send_queue, rx_sent_queue) = mpsc::channel::(); let _server_loop_jh = std::thread::spawn(move || { - let config = configure_server(QuicParameters::default()).unwrap(); if let Err(e) = server_loop( - config, + QuicParameters::default(), socket_addr, rx_sent_queue, CompressionType::Lz4Fast(8), diff --git a/common/src/config.rs b/common/src/config.rs index 97878ae..07ea9df 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize}; use crate::{ compression::CompressionType, defaults::{ - DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY, - DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, + DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_PACING, + DEFAULT_MAX_ACK_DELAY, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, + DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR, }, }; @@ -62,6 +63,8 @@ pub struct QuicParameters { pub max_number_of_connections: u64, pub max_ack_delay: u64, pub ack_exponent: u64, + pub enable_pacing: bool, + pub use_cc_bbr: bool, } impl Default for QuicParameters { @@ -73,6 +76,8 @@ impl Default for QuicParameters { max_number_of_connections: DEFAULT_MAX_NB_CONNECTIONS, max_ack_delay: DEFAULT_MAX_ACK_DELAY, ack_exponent: DEFAULT_ACK_EXPONENT, + enable_pacing: DEFAULT_ENABLE_PACING, + use_cc_bbr: DEFAULT_USE_CC_BBR, } } } diff --git a/common/src/defaults.rs b/common/src/defaults.rs index cf2fd92..a504fb6 100644 --- a/common/src/defaults.rs +++ b/common/src/defaults.rs @@ -3,7 +3,9 @@ pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS - 1; pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 256 * 1024 * 1024; // 256 MBs pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10; pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10; -pub const DEFAULT_MAX_ACK_DELAY: u64 = 100; +pub const DEFAULT_MAX_ACK_DELAY: u64 = 200; pub const DEFAULT_ACK_EXPONENT: u64 = 3; pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527 +pub const DEFAULT_ENABLE_PACING: bool = true; +pub const DEFAULT_USE_CC_BBR: bool = false; diff --git a/proxy/src/main.rs b/proxy/src/main.rs index ba5813c..730259d 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -49,6 +49,7 @@ pub fn main() -> anyhow::Result<()> { max_number_of_connections: args.max_number_of_connections, max_ack_delay: args.max_ack_delay, ack_exponent: args.ack_exponent, + ..Default::default() }, compression_parameters: CompressionParameters { compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast( diff --git a/quiche/src/quiche_utils.rs b/quiche/src/quiche_utils.rs index 1e988f4..270395c 100644 --- a/quiche/src/quiche_utils.rs +++ b/quiche/src/quiche_utils.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr}; +use std::collections::HashMap; pub fn validate_token<'a>( src: &std::net::SocketAddr, @@ -89,20 +89,3 @@ pub struct PartialResponse { } pub type PartialResponses = HashMap; - -// returns true if the socket will block the writing of socket -// return false otherwise -pub fn write_to_socket(socket: &mio::net::UdpSocket, buf: &[u8], to: SocketAddr) -> bool { - match socket.send_to(buf, to) { - Ok(_len) => false, - Err(e) => { - if e.kind() == std::io::ErrorKind::WouldBlock { - log::warn!("writing would block"); - true - } else { - log::error!("send() failed: {:?}", e); - false - } - } - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index da6ef41..a2003aa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,6 +22,9 @@ boring = { workspace = true } mio = { workspace = true } mio_channel = { workspace = true } +libc = "0.2" +nix = { version = "0.27", features = ["net", "socket", "uio"] } + quic-geyser-common = { workspace = true } [dev-dependencies] diff --git a/server/src/configure_server.rs b/server/src/configure_server.rs index 958d8b7..f1a5077 100644 --- a/server/src/configure_server.rs +++ b/server/src/configure_server.rs @@ -12,6 +12,8 @@ pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result anyhow::Result anyhow::Result { - let server_config = configure_server(config.quic_parameters)?; let socket = config.address; let compression_type = config.compression_parameters.compression_type; + let quic_parameters = config.quic_parameters; let (data_channel_sender, data_channel_tx) = mpsc::channel(); let _server_loop_jh = std::thread::spawn(move || { if let Err(e) = server_loop( - server_config, + quic_parameters, socket, data_channel_tx, compression_type, diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index b173416..3476a77 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicU64, AtomicUsize}, @@ -17,6 +17,7 @@ use ring::rand::SystemRandom; use quic_geyser_common::{ channel_message::ChannelMessage, compression::CompressionType, + config::QuicParameters, defaults::{MAX_ALLOWED_PARTIAL_RESPONSES, MAX_DATAGRAM_SIZE}, filters::Filter, message::Message, @@ -26,9 +27,11 @@ use quic_geyser_common::{ use quic_geyser_quiche_utils::{ quiche_reciever::{recv_message, ReadStreams}, quiche_sender::{handle_writable, send_message}, - quiche_utils::{get_next_unidi, mint_token, validate_token, write_to_socket, PartialResponses}, + quiche_utils::{get_next_unidi, mint_token, validate_token, PartialResponses}, }; +use crate::configure_server::configure_server; + struct DispatchingData { pub sender: Sender<(Vec, u8)>, pub filters: Arc>>, @@ -37,21 +40,15 @@ struct DispatchingData { type DispachingConnections = Arc, DispatchingData>>>; -const ACCEPTABLE_PACING_DELAY: Duration = Duration::from_millis(100); - -struct Packet { - pub buffer: Vec, - pub to: SocketAddr, -} - pub fn server_loop( - mut config: quiche::Config, + quic_params: QuicParameters, socket_addr: SocketAddr, message_send_queue: mpsc::Receiver, compression_type: CompressionType, stop_laggy_client: bool, ) -> anyhow::Result<()> { let maximum_concurrent_streams_id = u64::MAX; + let mut config = configure_server(quic_params)?; let mut socket = mio::net::UdpSocket::bind(socket_addr)?; let mut poll = mio::Poll::new()?; @@ -82,7 +79,11 @@ pub fn server_loop( // mio::Interest::READABLE, // )?; - let mut packets_to_send: BTreeMap = BTreeMap::new(); + let enable_pacing = if quic_params.enable_pacing { + set_txtime_sockopt(&socket).is_ok() + } else { + false + }; let dispatching_connections: DispachingConnections = Arc::new(Mutex::new(HashMap::< ConnectionId<'static>, @@ -169,8 +170,8 @@ pub fn server_loop( ) .unwrap(); - if write_to_socket(&socket, &out[..len], from) { - break; + if let Err(e) = socket.send_to(&out[..len], from) { + log::error!("Error sending retry messages : {e:?}"); } continue 'read; } @@ -253,38 +254,19 @@ pub fn server_loop( }; } - let instant = Instant::now(); - // send remaining packets - // log::debug!("packets to send : {}", packets_to_send.len()); - while let Some((to_send, packet)) = packets_to_send.first_key_value() { - if instant + ACCEPTABLE_PACING_DELAY <= *to_send { - break; - } - if write_to_socket(&socket, &packet.buffer, packet.to) { - break; - } - packets_to_send.pop_first(); - } - - let instant = Instant::now(); while let Ok((send_info, buffer)) = write_reciver.try_recv() { - if send_info.at > instant + ACCEPTABLE_PACING_DELAY { - packets_to_send.insert( - send_info.at, - Packet { - buffer, - to: send_info.to, - }, - ); - } else if write_to_socket(&socket, &buffer, send_info.to) { - packets_to_send.insert( - send_info.at, - Packet { - buffer, - to: send_info.to, - }, - ); - break; + let send_result = if enable_pacing { + send_with_pacing(&socket, &buffer, &send_info) + } else { + socket.send_to(&buffer, send_info.to) + }; + match send_result { + Ok(written) => { + log::debug!("written {written:?} to {:?}", send_info.to); + } + Err(e) => { + log::error!("sending failed with error : {e:?}"); + } } } } @@ -598,3 +580,58 @@ fn create_dispatching_thread( } }); } + +fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { + use nix::sys::socket::setsockopt; + use nix::sys::socket::sockopt::TxTime; + use std::os::unix::io::AsRawFd; + + let config = nix::libc::sock_txtime { + clockid: libc::CLOCK_MONOTONIC, + flags: 0, + }; + + // mio::net::UdpSocket doesn't implement AsFd (yet?). + let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; + + setsockopt(&fd, TxTime, &config)?; + + Ok(()) +} + +const NANOS_PER_SEC: u64 = 1_000_000_000; + +const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) }; + +fn std_time_to_u64(time: &std::time::Instant) -> u64 { + let raw_time = time.duration_since(INSTANT_ZERO); + let sec = raw_time.as_secs(); + let nsec = raw_time.subsec_nanos(); + sec * NANOS_PER_SEC + nsec as u64 +} + +fn send_with_pacing( + socket: &mio::net::UdpSocket, + buf: &[u8], + send_info: &quiche::SendInfo, +) -> std::io::Result { + use nix::sys::socket::sendmsg; + use nix::sys::socket::ControlMessage; + use nix::sys::socket::MsgFlags; + use nix::sys::socket::SockaddrStorage; + use std::io::IoSlice; + use std::os::unix::io::AsRawFd; + + let iov = [IoSlice::new(buf)]; + let dst = SockaddrStorage::from(send_info.to); + let sockfd = socket.as_raw_fd(); + + // Pacing option. + let send_time = std_time_to_u64(&send_info.at); + let cmsg_txtime = ControlMessage::TxTime(&send_time); + + match sendmsg(sockfd, &iov, &[cmsg_txtime], MsgFlags::empty(), Some(&dst)) { + Ok(v) => Ok(v), + Err(e) => Err(e.into()), + } +}