diff --git a/Cargo.lock b/Cargo.lock index 85485eb..64b3077 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,9 +65,9 @@ dependencies = [ [[package]] name = "agave-geyser-plugin-interface" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5189afece0a0b7b49419cf1cf6b71bdd6ed77d6f4a1b5ce0a0528c78d9ac822f" +checksum = "1dfc81497159bc6abcc6e0af5cfeec43d7ac219a946ac90d58a7b01cab2bffa2" dependencies = [ "log", "solana-sdk", @@ -2937,7 +2937,10 @@ dependencies = [ "bincode", "itertools 0.10.5", "lazy_static", + "libc", "log", + "mio 0.8.11", + "nix", "prometheus", "quic-geyser-common", "quic-geyser-server", @@ -2957,11 +2960,9 @@ dependencies = [ "boring", "itertools 0.10.5", "lazy_static", - "libc", "log", "mio 0.8.11", "mio_channel", - "nix", "prometheus", "quic-geyser-common", "quic-geyser-quiche-utils", @@ -3692,9 +3693,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c4e77c6e0b4e1557e738239cfbc18e92412ad393707bfa3f0861a7dd39cbc43" +checksum = "15dacdeeb5368f079dabd980e674a026bb63c7a16cdeefce3f0a388db1fb8413" dependencies = [ "Inflector", "base64 0.22.1", @@ -3717,9 +3718,9 @@ dependencies = [ [[package]] name = "solana-compute-budget" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c04e01f296142c78d36be3f53da362b71a605d2f7a3c3d98f1c943403ce5581" +checksum = "52f59a50c002a9073361968bb7cc2baacd3ed41debb3af6845ad0ff4a9f66159" dependencies = [ "rustc_version", "solana-sdk", @@ -3727,9 +3728,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71169087810ede13cf2bd58f59293b45efbff7ea272a8b726dfa6c0c355d63da" +checksum = "ca01faa9447969f633559eb85817e9f719341179805faf5f99271a977f04bde8" dependencies = [ "bincode", "chrono", @@ -3741,9 +3742,9 @@ dependencies = [ [[package]] name = "solana-curve25519" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c94998b7dd9eb9e356e2fbe57955dcaaeca8f22f62de50bff9c3c73bcc700f57" +checksum = "a416f5db2943034d1a0992e124f62b9d3ee701c16da999b5e3828bee2c119760" dependencies = [ "bytemuck", "bytemuck_derive", @@ -3754,9 +3755,9 @@ dependencies = [ [[package]] name = "solana-inline-spl" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501e25b91ebb2c1ab7360111be8c0ff8c70e17609a1c2e58c9762ffd29d88f28" +checksum = "542bb8ff2e80dd7c7e7eb558a875f736afd23682b03d587ee51ebe88a656d6ee" dependencies = [ "bytemuck", "rustc_version", @@ -3765,9 +3766,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcc155d62e0639b392216d33ce0382e5cd53a855cfaf3a988c0a72fdfc721b2d" +checksum = "d0444c6b92204a6d3ba922ae2c02cc39833405803a50aad2f7c56f66b6bc9eeb" dependencies = [ "env_logger", "lazy_static", @@ -3776,9 +3777,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d8d2d32b3ea5dd96af0c5201bb2b139eee3fb8a7a42a614e0d4a09715a36527" +checksum = "6d209e45a3beb408d302e9bda3146c0ba5e60fdc4f05c0d8ee7ae4f270d9a551" dependencies = [ "log", "solana-sdk", @@ -3786,9 +3787,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e174b0ee130287ede3caf28de3f6cd2f527d113ed7ccb1ccbbae716d1a3dda8" +checksum = "50c3fafc2c2844cdcba3405bc4cc6d15e88099f87e0436e2057ec581d596b538" dependencies = [ "crossbeam-channel", "gethostname", @@ -3801,9 +3802,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "549e1d73169e3b55ca832d498de82590df9a19a486318841c7ccddf755abdc00" +checksum = "6f9daefe400235569b5ae3a59d68901ed9c8cdc774ff6fa618989ca996909464" dependencies = [ "ark-bn254", "ark-ec", @@ -3847,9 +3848,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8650cd041818feffda703dd174db3d17b25f2fdb658017ede33e30ff37f2eff" +checksum = "51401b829c540d23b1cd8607065f9ad7cf47324eef5ae707091691ca043d9973" dependencies = [ "base64 0.22.1", "bincode", @@ -3876,9 +3877,9 @@ dependencies = [ [[package]] name = "solana-rpc-client" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350c5b759e03eced58de2ac44c2bba5e56f6a07c1f28a50db56bfdd5a0899718" +checksum = "794eda91017f943ae3e8a4327647840b9ef5b3b2ee47747c77a19910881f6b1c" dependencies = [ "async-trait", "base64 0.22.1", @@ -3903,9 +3904,9 @@ dependencies = [ [[package]] name = "solana-rpc-client-api" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b44d991c6c9235adc436f545dad56a90b3bfa78dc831e17823685203f5dd8d" +checksum = "07c89966d2cba3075545088653c0ff47903ce7a8c402d3058929e5008459ce49" dependencies = [ "anyhow", "base64 0.22.1", @@ -3927,9 +3928,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb4f2423204dc2ce50d0d17a7594bab1f247351699d7ad6b3cf562c76b92139a" +checksum = "1812f69de2881c4ecef49d32ccabe605af7078ea2ecb285c70029caee3101916" dependencies = [ "bincode", "bitflags 2.6.0", @@ -3976,9 +3977,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879caa7c8d816376bc9d11a76d7897c243d865d51cb03653e2e1dd5f5245d457" +checksum = "e1c0c428030f6a74ebe1b8c2e0307db0a8ac77a5569822f590617f0fcef43c66" dependencies = [ "bs58", "proc-macro2", @@ -3995,9 +3996,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-transaction-status" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04f45de90e145dcce7b99922e1d35de88612211b7d616e3c57baa64e7c7af9b6" +checksum = "a575b13d2169d92f4a3ad4a0145b6a2e7dea7ab657eaffcd7d2b5710394e0bc9" dependencies = [ "Inflector", "base64 0.22.1", @@ -4022,9 +4023,9 @@ dependencies = [ [[package]] name = "solana-type-overrides" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b381201d14e9677e11ae61fae614da4765e9a59f078b9c46c3e6c3dc1cd903f9" +checksum = "e94b4007992cdc31b726e4c322998baadb8af7562b775373f6bb1e4595e5971a" dependencies = [ "lazy_static", "rand 0.8.5", @@ -4032,9 +4033,9 @@ dependencies = [ [[package]] name = "solana-version" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "815b3a6d5411f06ab5b9cfc4c9a3b983537c7f0b5c3b560e71305f493ddc6999" +checksum = "516efbf38be854c39254cce3217bbe1e73640281827cf220cb88dd71f4a814c2" dependencies = [ "log", "rustc_version", @@ -4046,9 +4047,9 @@ dependencies = [ [[package]] name = "solana-vote" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac76c326bb4e8a73df482a9afa88bbaafa0a78a307835ffa1e5c2fe09bf839f5" +checksum = "c2dc59dce70c415a02f38723839e634bbbc2e51ee215bd9bfdf458fd148b2f04" dependencies = [ "itertools 0.12.1", "log", @@ -4061,9 +4062,9 @@ dependencies = [ [[package]] name = "solana-vote-program" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6857a073f5a3563919d0301600e47d7b62259f3fae76bf08c16703e105b77c" +checksum = "7f6ca9ffc24d510cf9a6b6ea568f5d835e97a0cc164ee9f2ef2c6e4b1bb4df0f" dependencies = [ "bincode", "log", @@ -4081,9 +4082,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "2.0.18" +version = "2.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "818713988d1257168a84cf2c3cda06df7b092da7e84192a7fb88f9c86756366d" +checksum = "5ef00dbdad3e384055aef00226e0eed830e4a54e2c634ec3538a15cce0357405" dependencies = [ "aes-gcm-siv", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index 9a2e431..636b8f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,14 +21,14 @@ license = "AGPL" edition = "2021" [workspace.dependencies] -solana-sdk = "~2.0.18" -solana-program = "~2.0.18" -solana-transaction-status = "~2.0.18" -solana-logger = "~2.0.18" -solana-rpc-client = "~2.0.18" -solana-rpc-client-api = "~2.0.18" -solana-account-decoder = "~2.0.18" -agave-geyser-plugin-interface = "=2.0.18" +solana-sdk = "~2.0.19" +solana-program = "~2.0.19" +solana-transaction-status = "~2.0.19" +solana-logger = "~2.0.19" +solana-rpc-client = "~2.0.19" +solana-rpc-client-api = "~2.0.19" +solana-account-decoder = "~2.0.19" +agave-geyser-plugin-interface = "=2.0.19" itertools = "0.10.5" serde = "1.0.201" diff --git a/blocking_client/src/client.rs b/blocking_client/src/client.rs index 78b5021..de1a94a 100644 --- a/blocking_client/src/client.rs +++ b/blocking_client/src/client.rs @@ -1,4 +1,3 @@ -use crate::configure_client::configure_client; use crate::quiche_client_loop::client_loop; use quic_geyser_common::filters::Filter; use quic_geyser_common::message::Message; @@ -18,13 +17,7 @@ impl Client { server_address: String, connection_parameters: ConnectionParameters, ) -> anyhow::Result<(Client, std::sync::mpsc::Receiver)> { - let config = configure_client( - connection_parameters.max_number_of_streams, - connection_parameters.recieve_window_size, - connection_parameters.timeout_in_seconds, - connection_parameters.max_ack_delay, - connection_parameters.ack_exponent, - )?; + log::info!("client configured : {connection_parameters:?}"); let server_address: SocketAddr = parse_host_port(&server_address)?; let socket_addr: SocketAddr = parse_host_port("[::]:0").expect("Socket address should be returned"); @@ -35,7 +28,7 @@ impl Client { let is_connected_client = is_connected.clone(); let _client_loop_jh = std::thread::spawn(move || { if let Err(e) = client_loop( - config, + connection_parameters, socket_addr, server_address, rx_sent_queue, @@ -168,7 +161,8 @@ mod tests { timeout_in_seconds: 10, max_ack_delay: 25, ack_exponent: 3, - enable_gso: false, + enable_gso: true, + enable_pacing: true, }, ) .unwrap(); diff --git a/blocking_client/src/configure_client.rs b/blocking_client/src/configure_client.rs index 37e61fd..a5d9b4a 100644 --- a/blocking_client/src/configure_client.rs +++ b/blocking_client/src/configure_client.rs @@ -1,30 +1,41 @@ -use quic_geyser_common::defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE}; +use quic_geyser_common::{ + defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE}, + types::connections_parameters::ConnectionParameters, +}; pub fn configure_client( - maximum_concurrent_streams: u64, - recieve_window_size: u64, - timeout_in_seconds: u64, - maximum_ack_delay: u64, - ack_exponent: u64, + connection_parameters: &ConnectionParameters, ) -> anyhow::Result { let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); config .set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID]) .unwrap(); - config.set_max_idle_timeout(timeout_in_seconds * 1000); + config.set_max_idle_timeout(connection_parameters.timeout_in_seconds * 1000); config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); - config.set_initial_max_data(recieve_window_size); - config.set_initial_max_stream_data_bidi_local(recieve_window_size); - config.set_initial_max_stream_data_bidi_remote(recieve_window_size); - config.set_initial_max_stream_data_uni(recieve_window_size); - config.set_initial_max_streams_bidi(maximum_concurrent_streams); - config.set_initial_max_streams_uni(maximum_concurrent_streams); + config.set_initial_max_data(connection_parameters.recieve_window_size); + config.set_initial_max_stream_data_bidi_local(connection_parameters.recieve_window_size); + config.set_initial_max_stream_data_bidi_remote(connection_parameters.recieve_window_size); + config.set_initial_max_stream_data_uni(connection_parameters.recieve_window_size); + config.set_initial_max_streams_bidi(connection_parameters.max_number_of_streams); + config.set_initial_max_streams_uni(connection_parameters.max_number_of_streams); + config.set_disable_active_migration(true); - config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2); - config.set_max_ack_delay(maximum_ack_delay); - config.set_ack_delay_exponent(ack_exponent); - config.enable_pacing(false); + config.set_cc_algorithm(quiche::CongestionControlAlgorithm::CUBIC); + config.set_max_connection_window(48 * 1024 * 1024); + config.set_max_stream_window(16 * 1024 * 1024); + + config.enable_early_data(); + config.grease(true); + config.enable_hystart(true); + config.discover_pmtu(true); + + config.set_active_connection_id_limit(16); + config.set_max_ack_delay(connection_parameters.max_ack_delay); + config.set_ack_delay_exponent(connection_parameters.ack_exponent); + config.set_initial_congestion_window_packets(1024); + + config.enable_pacing(true); Ok(config) } diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index 1a25ad7..7652736 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -5,19 +5,27 @@ use std::{ }; use log::{debug, error, info, trace}; -use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message}; +use quic_geyser_common::{ + defaults::MAX_DATAGRAM_SIZE, message::Message, + types::connections_parameters::ConnectionParameters, +}; use quic_geyser_quiche_utils::{ quiche_reciever::{recv_message, ReadStreams}, quiche_sender::{handle_writable, send_message}, - quiche_utils::{generate_cid_and_reset_token, get_next_unidi, StreamBufferMap}, + quiche_utils::{ + detect_gso, generate_cid_and_reset_token, get_next_unidi, send_with_pacing, + set_txtime_sockopt, StreamBufferMap, + }, }; use anyhow::bail; use ring::rand::{SecureRandom, SystemRandom}; +use crate::configure_client::configure_client; + pub fn client_loop( - mut config: quiche::Config, + connection_parameters: ConnectionParameters, socket_addr: SocketAddr, server_address: SocketAddr, mut message_send_queue: mio_channel::Receiver, @@ -25,13 +33,51 @@ pub fn client_loop( is_connected: Arc, ) -> anyhow::Result<()> { let mut socket = mio::net::UdpSocket::bind(socket_addr)?; + + let mut config = configure_client(&connection_parameters)?; + + let enable_pacing = if connection_parameters.enable_pacing { + set_txtime_sockopt(&socket).is_ok() + } else { + false + }; + + let enable_gso = if connection_parameters.enable_gso { + detect_gso(&socket, MAX_DATAGRAM_SIZE) + } else { + false + }; + + let (message_binary_channel_sx, message_binary_channel_rx) = + std::sync::mpsc::channel::>(); + let _message_deserializing_task = std::thread::spawn(move || loop { + match message_binary_channel_rx.recv() { + Ok(message_binary) => match bincode::deserialize::(&message_binary) { + Ok(message) => { + if let Err(e) = message_recv_queue.send(message) { + log::error!("Error sending message on the channel : {e}"); + break; + } + } + Err(e) => { + log::error!("Error deserializing message : {e:?}"); + } + }, + Err(e) => { + log::error!("recv failed: {:?}", e); + break; + } + } + }); + let mut poll = mio::Poll::new()?; let mut events = mio::Events::with_capacity(1024); let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; let mut read_streams = ReadStreams::new(); - const READ_BUFFER_SIZE: usize = 1024 * 1024; // 1 MB - let send_stream_id = get_next_unidi(3, false, u64::MAX); + const READ_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MBs + // client always sends on same stream + let send_stream_id = get_next_unidi(4, false, u64::MAX); let mut has_connected = false; // Generate a random source connection ID for the connection. let rng = SystemRandom::new(); @@ -50,7 +96,13 @@ pub fn client_loop( let scid = quiche::ConnectionId::from_ref(&scid); let local_addr = socket.local_addr()?; - let mut conn = quiche::connect(None, &scid, local_addr, server_address, &mut config)?; + let mut conn = quiche::connect( + Some("quiche_plugin_server"), + &scid, + local_addr, + server_address, + &mut config, + )?; // sending initial connection request { @@ -61,29 +113,39 @@ pub fn client_loop( } } - poll.registry() - .register( - &mut message_send_queue, - mio::Token(1), - mio::Interest::READABLE, - ) - .unwrap(); - let mut instance = Instant::now(); + let mut connection_recently_established = false; + let ping_message = Message::Ping.to_binary_stream(); + let mut loss_rate = 0.0; + let mut max_send_burst = MAX_DATAGRAM_SIZE * 10; + let mut continue_write = true; + let max_datagram_size = MAX_DATAGRAM_SIZE; loop { - poll.poll(&mut events, conn.timeout()).unwrap(); + let timeout = match continue_write { + true => Some(std::time::Duration::from_secs(0)), + false => conn.timeout(), + }; - if conn.is_established() && !conn.is_closed() { + poll.poll(&mut events, timeout).unwrap(); + + if connection_recently_established || events.iter().any(|e| e.token() == mio::Token(1)) { + if connection_recently_established { + connection_recently_established = false; + } match message_send_queue.try_recv() { Ok(message) => { let binary_message = message.to_binary_stream(); - log::debug!("send message : {message:?}"); - let _ = send_message( + log::info!("send message : {message:?}"); + if let Err(e) = send_message( &mut conn, &mut stream_sender_map, send_stream_id, binary_message, - ); + ) { + log::error!( + "Error sending filters : {e}, probably because filter is too long" + ); + } } Err(e) => { match e { @@ -154,7 +216,7 @@ pub fn client_loop( &mut conn, &mut stream_sender_map, send_stream_id, - Message::Ping.to_binary_stream(), + ping_message.clone(), ) { log::error!("Error sending ping message : {e}"); } @@ -162,8 +224,17 @@ pub fn client_loop( } if !has_connected && conn.is_established() { + log::info!("connection established"); has_connected = true; + connection_recently_established = true; is_connected.store(true, std::sync::atomic::Ordering::Relaxed); + poll.registry() + .register( + &mut message_send_queue, + mio::Token(1), + mio::Interest::READABLE, + ) + .unwrap(); } // See whether source Connection IDs have been retired. while let Some(retired_scid) = conn.retired_scid_next() { @@ -187,7 +258,7 @@ pub fn client_loop( Ok(Some(messages)) => { log::debug!("got messages: {}", messages.len()); for message in messages { - if let Err(e) = message_recv_queue.send(message) { + if let Err(e) = message_binary_channel_sx.send(message) { log::error!("Error sending message on the channel : {e}"); break; } @@ -210,35 +281,65 @@ pub fn client_loop( } } - // Generate outgoing QUIC packets and send them on the UDP socket, until - // quiche reports that there are no more packets to be sent. - loop { - let (write, send_info) = match conn.send(&mut out) { - Ok(v) => v, + // Reduce max_send_burst by 25% if loss is increasing more than 0.1%. + let calculated_loss_rate = conn.stats().lost as f64 / conn.stats().sent as f64; + if calculated_loss_rate > loss_rate + 0.001 { + max_send_burst = max_send_burst / 4 * 3; + // Minimun bound of 10xMSS. + max_send_burst = max_send_burst.max(max_datagram_size * 10); + loss_rate = calculated_loss_rate; + } + let max_send_burst = + conn.send_quantum().min(max_send_burst) / max_datagram_size * max_datagram_size; + let mut total_write = 0; + let mut dst_info = None; + + while total_write < max_send_burst { + let (write, mut send_info) = match conn.send(&mut buf[total_write..max_send_burst]) { + Ok(v) => v, Err(quiche::Error::Done) => { - debug!("done writing"); + trace!("{} done writing", conn.trace_id()); break; } - Err(e) => { - error!("send failed: {:?}", e); - + log::error!("{} send failed: {:?}", conn.trace_id(), e); conn.close(false, 0x1, b"fail").ok(); break; } }; - if let Err(e) = socket.send_to(&out[..write], send_info.to) { - if e.kind() == std::io::ErrorKind::WouldBlock { - debug!("send() would block"); - break; - } + total_write += write; - panic!("send() failed: {:?}", e); + // Use the first packet time to send, not the last. + send_info.to = server_address; + let _ = dst_info.get_or_insert(send_info); + + if write < max_datagram_size { + continue_write = true; + break; } + } - debug!("written {}", write); + if total_write == 0 || dst_info.is_none() { + continue; + } + + let send_result = if enable_pacing { + send_with_pacing( + &socket, + &buf[..total_write], + &dst_info.unwrap(), + enable_gso, + max_datagram_size as u16, + ) + } else { + socket.send(&buf[..total_write]) + }; + + if let Err(e) = send_result { + log::error!("sending failed with error : {e:?}"); + break; } if conn.is_closed() { @@ -270,11 +371,9 @@ mod tests { filters::Filter, message::Message, net::parse_host_port, - types::block_meta::SlotMeta, + types::{block_meta::SlotMeta, connections_parameters::ConnectionParameters}, }; - use crate::configure_client::configure_client; - use super::client_loop; #[test] @@ -376,8 +475,10 @@ mod tests { let (sx_recv_queue, client_rx_queue) = mpsc::channel(); let _client_loop_jh = std::thread::spawn(move || { - let client_config = - configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap(); + let client_config = ConnectionParameters { + max_number_of_streams: maximum_concurrent_streams, + ..Default::default() + }; let socket_addr: SocketAddr = parse_host_port("[::]:0").unwrap(); let is_connected = Arc::new(AtomicBool::new(false)); if let Err(e) = client_loop( diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index 90560a3..410ccef 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -348,6 +348,7 @@ mod tests { max_ack_delay: 25, ack_exponent: 3, enable_gso: false, + enable_pacing: false, }, ) .await diff --git a/common/src/message.rs b/common/src/message.rs index 9ab3815..8b70a7c 100644 --- a/common/src/message.rs +++ b/common/src/message.rs @@ -36,6 +36,18 @@ impl Message { Some((message, size + 8)) } + // used by the network + pub fn from_binary_stream_binary(stream: &[u8]) -> Option<(Vec, usize)> { + if stream.len() < 8 { + return None; + } + let size = u64::from_le_bytes(stream[0..8].try_into().unwrap()) as usize; + if stream.len() < size + 8 { + return None; + } + Some((stream[8..size + 8].to_vec(), size + 8)) + } + pub fn to_binary_stream(&self) -> Vec { let binary = bincode::serialize(self).unwrap(); let size = binary.len().to_le_bytes(); diff --git a/common/src/stream_manager.rs b/common/src/stream_manager.rs index 1d847e4..8b76102 100644 --- a/common/src/stream_manager.rs +++ b/common/src/stream_manager.rs @@ -1,3 +1,5 @@ +use std::io::BufRead; + pub struct StreamBuffer { buffer: Box>, } @@ -25,11 +27,11 @@ impl StreamBuffer { } pub fn consume(&mut self, nb_bytes: usize) -> bool { - if self.buffer.len() < nb_bytes { + let len = self.buffer.len(); + if len < nb_bytes { return false; } - let d = self.buffer.drain(..nb_bytes); - assert_eq!(d.len(), nb_bytes); + self.buffer.consume(nb_bytes); true } diff --git a/common/src/types/block.rs b/common/src/types/block.rs index f652137..7057f2b 100644 --- a/common/src/types/block.rs +++ b/common/src/types/block.rs @@ -5,6 +5,7 @@ use crate::compression::CompressionType; use super::{account::Account, block_meta::BlockMeta, transaction::Transaction}; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct Block { pub meta: BlockMeta, pub transactions: Vec, // compressed transaction::Transaction diff --git a/common/src/types/connections_parameters.rs b/common/src/types/connections_parameters.rs index f3223bf..f3e7b94 100644 --- a/common/src/types/connections_parameters.rs +++ b/common/src/types/connections_parameters.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; use crate::defaults::{ - DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_MAX_ACK_DELAY, - DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, + DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO, DEFAULT_ENABLE_PACING, + DEFAULT_MAX_ACK_DELAY, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -14,6 +14,7 @@ pub struct ConnectionParameters { pub max_ack_delay: u64, pub ack_exponent: u64, pub enable_gso: bool, + pub enable_pacing: bool, } impl Default for ConnectionParameters { @@ -25,6 +26,7 @@ impl Default for ConnectionParameters { max_ack_delay: DEFAULT_MAX_ACK_DELAY, ack_exponent: DEFAULT_ACK_EXPONENT, enable_gso: DEFAULT_ENABLE_GSO, + enable_pacing: DEFAULT_ENABLE_PACING, } } } diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 8ad8de6..e7d58e3 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -130,6 +130,7 @@ fn blocking(args: Args, client_stats: ClientStats, break_thread: Arc filters.push(Filter::AccountsAll); } + sleep(Duration::from_millis(100)); println!("Subscribing"); client.subscribe(filters).unwrap(); println!("Subscribed"); diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 6e3ab20..7646bf0 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -33,7 +33,13 @@ impl GeyserPlugin for QuicGeyserPlugin { fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> { log::info!("loading quic_geyser plugin"); - let config = Config::load_from_file(config_file)?; + let config = match Config::load_from_file(config_file) { + Ok(config) => config, + Err(e) => { + log::error!("Error loading config file: {}", e); + return Err(e); + } + }; let compression_type = config.quic_plugin.compression_parameters.compression_type; let enable_block_builder = config.quic_plugin.enable_block_builder; let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts; diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 89f8a7d..9011a85 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -25,6 +25,7 @@ pub fn main() -> anyhow::Result<()> { max_ack_delay: args.max_ack_delay, ack_exponent: args.ack_exponent, enable_gso: true, + enable_pacing: true, }, )?; diff --git a/quiche/Cargo.toml b/quiche/Cargo.toml index dae400e..82a004b 100644 --- a/quiche/Cargo.toml +++ b/quiche/Cargo.toml @@ -18,6 +18,10 @@ quiche = { workspace = true } prometheus = { workspace = true } lazy_static = { workspace = true } +mio = { workspace = true } +nix = { version = "0.27", features = ["net", "socket", "uio"] } +libc = "0.2" + [dev-dependencies] rand = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/quiche/src/quiche_reciever.rs b/quiche/src/quiche_reciever.rs index d24b6a0..b891bbf 100644 --- a/quiche/src/quiche_reciever.rs +++ b/quiche/src/quiche_reciever.rs @@ -11,7 +11,7 @@ pub fn recv_message( connection: &mut quiche::Connection, read_streams: &mut ReadStreams, stream_id: u64, -) -> anyhow::Result>> { +) -> anyhow::Result>>> { let mut buf = [0; MAX_DATAGRAM_SIZE]; if let Some(total_buf) = read_streams.get_mut(&stream_id) { loop { @@ -23,12 +23,14 @@ pub fn recv_message( Err(e) => match &e { quiche::Error::Done => { let mut messages = vec![]; - if let Some((message, size)) = - Message::from_binary_stream(total_buf.as_slices().0) + + while let Some((message, size)) = + Message::from_binary_stream_binary(total_buf.as_slices().0) { total_buf.consume(size); messages.push(message); } + return Ok(if messages.is_empty() { None } else { @@ -52,8 +54,8 @@ pub fn recv_message( Err(e) => match &e { quiche::Error::Done => { let mut messages = vec![]; - if let Some((message, size)) = - Message::from_binary_stream(total_buf.as_slices().0) + while let Some((message, size)) = + Message::from_binary_stream_binary(total_buf.as_slices().0) { total_buf.consume(size); messages.push(message); diff --git a/quiche/src/quiche_utils.rs b/quiche/src/quiche_utils.rs index 1be3c47..90eae06 100644 --- a/quiche/src/quiche_utils.rs +++ b/quiche/src/quiche_utils.rs @@ -162,6 +162,97 @@ pub fn generate_cid_and_reset_token( (scid, reset_token) } +pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool { + use nix::sys::socket::setsockopt; + use nix::sys::socket::sockopt::UdpGsoSegment; + use std::os::unix::io::AsRawFd; + + // mio::net::UdpSocket doesn't implement AsFd (yet?). + let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) }; + + setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok() +} + +/// Set SO_TXTIME socket option. +/// +/// This socket option is set to send to kernel the outgoing UDP +/// packet transmission time in the sendmsg syscall. +/// +/// Note that this socket option is set only on linux platforms. +#[cfg(target_os = "linux")] +pub fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> std::io::Result<()> { + use nix::sys::socket::setsockopt; + use nix::sys::socket::sockopt::TxTime; + use std::os::unix::io::AsRawFd; + + let config = nix::libc::sock_txtime { + clockid: libc::CLOCK_MONOTONIC, + flags: 0, + }; + + // mio::net::UdpSocket doesn't implement AsFd (yet?). + let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; + + setsockopt(&fd, TxTime, &config)?; + + Ok(()) +} + +#[cfg(not(target_os = "linux"))] +pub fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> { + use std::io::Error; + use std::io::ErrorKind; + + Err(Error::new( + ErrorKind::Other, + "Not supported on this platform", + )) +} + +const NANOS_PER_SEC: u64 = 1_000_000_000; + +const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) }; + +pub fn std_time_to_u64(time: &std::time::Instant) -> u64 { + let raw_time = time.duration_since(INSTANT_ZERO); + let sec = raw_time.as_secs(); + let nsec = raw_time.subsec_nanos(); + sec * NANOS_PER_SEC + nsec as u64 +} + +pub fn send_with_pacing( + socket: &mio::net::UdpSocket, + buf: &[u8], + send_info: &quiche::SendInfo, + enable_gso: bool, + segment_size: u16, +) -> std::io::Result { + use nix::sys::socket::sendmsg; + use nix::sys::socket::ControlMessage; + use nix::sys::socket::MsgFlags; + use nix::sys::socket::SockaddrStorage; + use std::io::IoSlice; + use std::os::unix::io::AsRawFd; + + let iov = [IoSlice::new(buf)]; + let dst = SockaddrStorage::from(send_info.to); + let sockfd = socket.as_raw_fd(); + + // Pacing option. + let send_time = std_time_to_u64(&send_info.at); + let cmsg_txtime = ControlMessage::TxTime(&send_time); + + let mut cmgs = vec![cmsg_txtime]; + if enable_gso { + cmgs.push(ControlMessage::UdpGsoSegments(&segment_size)); + } + + match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) { + Ok(v) => Ok(v), + Err(e) => Err(e.into()), + } +} + // 16 MB per buffer pub const SEND_BUFFER_LEN: usize = 32 * 1024 * 1024; pub type StreamBufferMap = BTreeMap>; diff --git a/server/Cargo.toml b/server/Cargo.toml index ce3b669..dbc3343 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,10 +21,8 @@ boring = { workspace = true } mio = { workspace = true } mio_channel = { workspace = true } -libc = "0.2" -nix = { version = "0.27", features = ["net", "socket", "uio"] } - quic-geyser-common = { workspace = true } + prometheus = { workspace = true } lazy_static = { workspace = true } diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index fd4eb33..ae879f8 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -46,15 +46,19 @@ use quic_geyser_quiche_utils::quiche_reciever::recv_message; use quic_geyser_quiche_utils::quiche_reciever::ReadStreams; use quic_geyser_quiche_utils::quiche_sender::handle_writable; use quic_geyser_quiche_utils::quiche_sender::send_message; +use quic_geyser_quiche_utils::quiche_utils::detect_gso; use quic_geyser_quiche_utils::quiche_utils::generate_cid_and_reset_token; use quic_geyser_quiche_utils::quiche_utils::get_next_unidi; +use quic_geyser_quiche_utils::quiche_utils::handle_path_events; +use quic_geyser_quiche_utils::quiche_utils::mint_token; +use quic_geyser_quiche_utils::quiche_utils::send_with_pacing; +use quic_geyser_quiche_utils::quiche_utils::set_txtime_sockopt; +use quic_geyser_quiche_utils::quiche_utils::validate_token; use quic_geyser_quiche_utils::quiche_utils::StreamBufferMap; use quic_geyser_quiche_utils::quiche_utils::SEND_BUFFER_LEN; use quiche::ConnectionId; use ring::rand::*; use std::collections::HashMap; -use std::io; -use std::net; use std::net::SocketAddr; lazy_static::lazy_static! { @@ -556,6 +560,8 @@ pub fn server_loop( match message { Ok(Some(messages)) => { for message in messages { + let message = bincode::deserialize::(&message) + .expect("Should be a message"); match message { Message::Filters(mut f) => { client.filters.append(&mut f); @@ -604,7 +610,7 @@ pub fn server_loop( } } - handle_path_events(client); + handle_path_events(&mut client.conn); // See whether source Connection IDs have been retired. while let Some(retired_scid) = client.conn.retired_scid_next() { @@ -770,215 +776,3 @@ pub fn server_loop( }); } } - -/// Generate a stateless retry token. -/// -/// The token includes the static string `"quiche"` followed by the IP address -/// of the client and by the original destination connection ID generated by the -/// client. -/// -/// Note that this function is only an example and doesn't do any cryptographic -/// authenticate of the token. *It should not be used in production system*. -fn mint_token(hdr: &quiche::Header, src: &net::SocketAddr) -> Vec { - let mut token = Vec::new(); - - token.extend_from_slice(b"quiche"); - - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; - - token.extend_from_slice(&addr); - token.extend_from_slice(&hdr.dcid); - - token -} - -/// Validates a stateless retry token. -/// -/// This checks that the ticket includes the `"quiche"` static string, and that -/// the client IP address matches the address stored in the ticket. -/// -/// Note that this function is only an example and doesn't do any cryptographic -/// authenticate of the token. *It should not be used in production system*. -fn validate_token<'a>(src: &net::SocketAddr, token: &'a [u8]) -> Option> { - if token.len() < 6 { - return None; - } - - if &token[..6] != b"quiche" { - return None; - } - - let token = &token[6..]; - - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; - - if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() { - return None; - } - - Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) -} - -fn handle_path_events(client: &mut Client) { - while let Some(qe) = client.conn.path_event_next() { - match qe { - quiche::PathEvent::New(local_addr, peer_addr) => { - log::info!( - "{} Seen new path ({}, {})", - client.conn.trace_id(), - local_addr, - peer_addr - ); - - // Directly probe the new path. - client - .conn - .probe_path(local_addr, peer_addr) - .expect("cannot probe"); - } - - quiche::PathEvent::Validated(local_addr, peer_addr) => { - log::info!( - "{} Path ({}, {}) is now validated", - client.conn.trace_id(), - local_addr, - peer_addr - ); - } - - quiche::PathEvent::FailedValidation(local_addr, peer_addr) => { - log::info!( - "{} Path ({}, {}) failed validation", - client.conn.trace_id(), - local_addr, - peer_addr - ); - } - - quiche::PathEvent::Closed(local_addr, peer_addr) => { - log::info!( - "{} Path ({}, {}) is now closed and unusable", - client.conn.trace_id(), - local_addr, - peer_addr - ); - } - - quiche::PathEvent::ReusedSourceConnectionId(cid_seq, old, new) => { - log::info!( - "{} Peer reused cid seq {} (initially {:?}) on {:?}", - client.conn.trace_id(), - cid_seq, - old, - new - ); - } - - quiche::PathEvent::PeerMigrated(local_addr, peer_addr) => { - log::info!( - "{} Connection migrated to ({}, {})", - client.conn.trace_id(), - local_addr, - peer_addr - ); - } - } - } -} - -pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool { - use nix::sys::socket::setsockopt; - use nix::sys::socket::sockopt::UdpGsoSegment; - use std::os::unix::io::AsRawFd; - - // mio::net::UdpSocket doesn't implement AsFd (yet?). - let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) }; - - setsockopt(&fd, UdpGsoSegment, &(segment_size as i32)).is_ok() -} - -/// Set SO_TXTIME socket option. -/// -/// This socket option is set to send to kernel the outgoing UDP -/// packet transmission time in the sendmsg syscall. -/// -/// Note that this socket option is set only on linux platforms. -#[cfg(target_os = "linux")] -fn set_txtime_sockopt(sock: &mio::net::UdpSocket) -> io::Result<()> { - use nix::sys::socket::setsockopt; - use nix::sys::socket::sockopt::TxTime; - use std::os::unix::io::AsRawFd; - - let config = nix::libc::sock_txtime { - clockid: libc::CLOCK_MONOTONIC, - flags: 0, - }; - - // mio::net::UdpSocket doesn't implement AsFd (yet?). - let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(sock.as_raw_fd()) }; - - setsockopt(&fd, TxTime, &config)?; - - Ok(()) -} - -#[cfg(not(target_os = "linux"))] -fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> { - use std::io::Error; - use std::io::ErrorKind; - - Err(Error::new( - ErrorKind::Other, - "Not supported on this platform", - )) -} - -const NANOS_PER_SEC: u64 = 1_000_000_000; - -const INSTANT_ZERO: std::time::Instant = unsafe { std::mem::transmute(std::time::UNIX_EPOCH) }; - -fn std_time_to_u64(time: &std::time::Instant) -> u64 { - let raw_time = time.duration_since(INSTANT_ZERO); - let sec = raw_time.as_secs(); - let nsec = raw_time.subsec_nanos(); - sec * NANOS_PER_SEC + nsec as u64 -} - -fn send_with_pacing( - socket: &mio::net::UdpSocket, - buf: &[u8], - send_info: &quiche::SendInfo, - enable_gso: bool, - segment_size: u16, -) -> std::io::Result { - use nix::sys::socket::sendmsg; - use nix::sys::socket::ControlMessage; - use nix::sys::socket::MsgFlags; - use nix::sys::socket::SockaddrStorage; - use std::io::IoSlice; - use std::os::unix::io::AsRawFd; - - let iov = [IoSlice::new(buf)]; - let dst = SockaddrStorage::from(send_info.to); - let sockfd = socket.as_raw_fd(); - - // Pacing option. - let send_time = std_time_to_u64(&send_info.at); - let cmsg_txtime = ControlMessage::TxTime(&send_time); - - let mut cmgs = vec![cmsg_txtime]; - if enable_gso { - cmgs.push(ControlMessage::UdpGsoSegments(&segment_size)); - } - - match sendmsg(sockfd, &iov, &cmgs, MsgFlags::empty(), Some(&dst)) { - Ok(v) => Ok(v), - Err(e) => Err(e.into()), - } -}