From 7c8b846344e2bf776caa008d064fe70f58e9b5b6 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 11 Jan 2023 10:08:22 -0800 Subject: [PATCH] Update quinn versions (#29603) * chore: bump quinn-udp from 0.1.3 to 0.3.2 Bumps [quinn-udp](https://github.com/quinn-rs/quinn) from 0.1.3 to 0.3.2. - [Release notes](https://github.com/quinn-rs/quinn/releases) - [Commits](https://github.com/quinn-rs/quinn/commits) --- updated-dependencies: - dependency-name: quinn-udp dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Try to use quinn 0.9.3 and quinn-proto 0.9.2 * Update streamer and client for quic to support qunn 0.9.3 * Update Cargo.lock * Fixed unit test failure for quic tests Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 89 ++++++--- client/Cargo.toml | 2 +- programs/sbf/Cargo.lock | 89 ++++++--- quic-client/Cargo.toml | 6 +- quic-client/src/nonblocking/quic_client.rs | 36 ++-- streamer/Cargo.toml | 6 +- streamer/src/nonblocking/quic.rs | 210 ++++++++++----------- 7 files changed, 255 insertions(+), 183 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fd11f40d..5d2a408fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1798,15 +1798,6 @@ dependencies = [ "slab", ] -[[package]] -name = "fxhash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" -dependencies = [ - "byteorder", -] - [[package]] name = "gag" version = "1.0.0" @@ -3707,16 +3698,15 @@ checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" [[package]] name = "quinn" -version = "0.8.4" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21afdc492bf2a8688cb386be6605d1163b6ace89afa5e3b529037d2b4334b860" +checksum = "445cbfe2382fa023c4f2f3c7e1c95c03dcc1df2bf23cebcb2b13e1402c4394d1" dependencies = [ "bytes", - "futures-channel", - "futures-util", - "fxhash", + "pin-project-lite", "quinn-proto", "quinn-udp", + "rustc-hash", "rustls 0.20.6", "thiserror", "tokio", @@ -3726,17 +3716,16 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.8.4" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fce546b9688f767a57530652488420d419a8b1f44a478b451c3d1ab6d992a55" +checksum = "72ef4ced82a24bb281af338b9e8f94429b6eca01b4e66d899f40031f074e74c9" dependencies = [ "bytes", - "fxhash", "rand 0.8.5", "ring", + "rustc-hash", "rustls 0.20.6", "rustls-native-certs", - "rustls-pemfile 0.2.1", "slab", "thiserror", "tinyvec", @@ -3746,16 +3735,15 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.1.3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f832d8958db3e84d2ec93b5eb2272b45aa23cf7f8fe6e79f578896f4e6c231b" +checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" dependencies = [ - "futures-util", "libc", "quinn-proto", "socket2", - "tokio", "tracing", + "windows-sys 0.42.0", ] [[package]] @@ -8314,6 +8302,27 @@ dependencies = [ "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.32.0" @@ -8326,6 +8335,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.32.0" @@ -8338,6 +8353,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.32.0" @@ -8350,6 +8371,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.32.0" @@ -8362,6 +8389,18 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.32.0" @@ -8374,6 +8413,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + [[package]] name = "winreg" version = "0.10.1" diff --git a/client/Cargo.toml b/client/Cargo.toml index 49383c7b6..2738b1bf3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,7 +18,7 @@ futures-util = "0.3.25" indexmap = "1.9.1" indicatif = { version = "0.17.1" } log = "0.4.17" -quinn = "0.8.4" +quinn = "0.9.3" rand = "0.7.0" rayon = "1.5.3" solana-measure = { path = "../measure", version = "=1.15.0" } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index fadc25631..a5af4e9d3 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -1593,15 +1593,6 @@ dependencies = [ "slab", ] -[[package]] -name = "fxhash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" -dependencies = [ - "byteorder 1.4.3", -] - [[package]] name = "generic-array" version = "0.12.4" @@ -3385,16 +3376,15 @@ dependencies = [ [[package]] name = "quinn" -version = "0.8.4" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21afdc492bf2a8688cb386be6605d1163b6ace89afa5e3b529037d2b4334b860" +checksum = "445cbfe2382fa023c4f2f3c7e1c95c03dcc1df2bf23cebcb2b13e1402c4394d1" dependencies = [ "bytes", - "futures-channel", - "futures-util", - "fxhash", + "pin-project-lite", "quinn-proto", "quinn-udp", + "rustc-hash", "rustls 0.20.6", "thiserror", "tokio", @@ -3404,17 +3394,16 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.8.4" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fce546b9688f767a57530652488420d419a8b1f44a478b451c3d1ab6d992a55" +checksum = "72ef4ced82a24bb281af338b9e8f94429b6eca01b4e66d899f40031f074e74c9" dependencies = [ "bytes", - "fxhash", "rand 0.8.5", "ring", + "rustc-hash", "rustls 0.20.6", "rustls-native-certs", - "rustls-pemfile 0.2.1", "slab", "thiserror", "tinyvec", @@ -3424,16 +3413,15 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.1.3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f832d8958db3e84d2ec93b5eb2272b45aa23cf7f8fe6e79f578896f4e6c231b" +checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" dependencies = [ - "futures-util", "libc", "quinn-proto", "socket2", - "tokio", "tracing", + "windows-sys 0.42.0", ] [[package]] @@ -7431,6 +7419,27 @@ dependencies = [ "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.32.0" @@ -7443,6 +7452,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.32.0" @@ -7455,6 +7470,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.32.0" @@ -7467,6 +7488,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.32.0" @@ -7479,6 +7506,18 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.32.0" @@ -7491,6 +7530,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + [[package]] name = "winreg" version = "0.10.1" diff --git a/quic-client/Cargo.toml b/quic-client/Cargo.toml index 95cb1e196..1eb68f037 100644 --- a/quic-client/Cargo.toml +++ b/quic-client/Cargo.toml @@ -16,9 +16,9 @@ futures = "0.3" itertools = "0.10.5" lazy_static = "1.4.0" log = "0.4.17" -quinn = "0.8.4" -quinn-proto = "0.8.4" -quinn-udp = "0.1.3" +quinn = "0.9.3" +quinn-proto = "0.9.2" +quinn-udp = "0.3.2" rustls = { version = "0.20.6", features = ["dangerous_configuration"] } solana-measure = { path = "../measure", version = "=1.15.0" } solana-metrics = { path = "../metrics", version = "=1.15.0" } diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index edb3c2edc..612dc414c 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -8,8 +8,8 @@ use { itertools::Itertools, log::*, quinn::{ - ClientConfig, ConnectError, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, - NewConnection, VarInt, WriteError, + ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig, + IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError, }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, @@ -128,11 +128,12 @@ impl QuicLazyInitializedEndpoint { crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()]; let mut config = ClientConfig::new(Arc::new(crypto)); - let transport_config = Arc::get_mut(&mut config.transport) - .expect("QuicLazyInitializedEndpoint::create_endpoint Arc::get_mut"); + let mut transport_config = TransportConfig::default(); + let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); transport_config.max_idle_timeout(Some(timeout)); transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + config.transport_config(Arc::new(transport_config)); endpoint.set_default_client_config(config); @@ -185,7 +186,7 @@ impl Default for QuicLazyInitializedEndpoint { #[derive(Clone)] struct QuicNewConnection { endpoint: Arc, - connection: Arc, + connection: Arc, } impl QuicNewConnection { @@ -226,9 +227,8 @@ impl QuicNewConnection { } fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { - quinn::Endpoint::new(config, None, client_socket) + quinn::Endpoint::new(config, None, client_socket, TokioRuntime) .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new") - .0 } // Attempts to make a faster connection by taking advantage of pre-existing key material. @@ -237,7 +237,7 @@ impl QuicNewConnection { &mut self, addr: SocketAddr, stats: &ClientStats, - ) -> Result, QuicError> { + ) -> Result, QuicError> { let connecting = self.endpoint.connect(addr, "connect")?; stats.total_connections.fetch_add(1, Ordering::Relaxed); let connection = match connecting.into_0rtt() { @@ -303,9 +303,9 @@ impl QuicClient { async fn _send_buffer_using_conn( data: &[u8], - connection: &NewConnection, + connection: &Connection, ) -> Result<(), QuicError> { - let mut send_stream = connection.connection.open_uni().await?; + let mut send_stream = connection.open_uni().await?; send_stream.write_all(data).await?; send_stream.finish().await?; @@ -319,7 +319,7 @@ impl QuicClient { data: &[u8], stats: &ClientStats, connection_stats: Arc, - ) -> Result, QuicError> { + ) -> Result, QuicError> { let mut connection_try_count = 0; let mut last_connection_id = 0; let mut last_error = None; @@ -331,7 +331,7 @@ impl QuicClient { let maybe_conn = conn_guard.as_mut(); match maybe_conn { Some(conn) => { - if conn.connection.connection.stable_id() == last_connection_id { + if conn.connection.stable_id() == last_connection_id { // this is the problematic connection we had used before, create a new one let conn = conn.make_connection_0rtt(self.addr, stats).await; match conn { @@ -339,7 +339,7 @@ impl QuicClient { info!( "Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}", self.addr, - conn.connection.stable_id(), + conn.stable_id(), connection_try_count, last_connection_id, last_error, @@ -373,7 +373,7 @@ impl QuicClient { info!( "Made connection to {} id {} try_count {}", self.addr, - conn.connection.connection.stable_id(), + conn.connection.stable_id(), connection_try_count ); connection_try_count += 1; @@ -388,7 +388,7 @@ impl QuicClient { } }; - let new_stats = connection.connection.stats(); + let new_stats = connection.stats(); connection_stats .total_client_stats @@ -416,7 +416,7 @@ impl QuicClient { .tx_acks .update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks); - last_connection_id = connection.connection.stable_id(); + last_connection_id = connection.stable_id(); match Self::_send_buffer_using_conn(data, &connection).await { Ok(()) => { return Ok(connection); @@ -429,7 +429,7 @@ impl QuicClient { info!( "Error sending to {} with id {}, error {:?} thread: {:?}", self.addr, - connection.connection.stable_id(), + connection.stable_id(), err, thread::current().id(), ); @@ -494,7 +494,7 @@ impl QuicClient { // Used to avoid dereferencing the Arc multiple times below // by just getting a reference to the NewConnection once - let connection_ref: &NewConnection = &connection; + let connection_ref: &Connection = &connection; let chunks = buffers[1..buffers.len()].iter().chunks(self.chunk_size); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index df4b2c5d7..01aef33cf 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -21,9 +21,9 @@ nix = "0.25.0" pem = "1.1.0" percentage = "0.1.0" pkcs8 = { version = "0.8.0", features = ["alloc"] } -quinn = "0.8.4" -quinn-proto = "0.8.4" -quinn-udp = "0.1.3" +quinn = "0.9.3" +quinn-proto = "0.9.2" +quinn-udp = "0.3.2" rand = "0.7.0" rcgen = "0.10.0" diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 8361dab3f..e7ddf44d5 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -5,13 +5,9 @@ use { tls_certificates::get_pubkey_from_tls_certificate, }, crossbeam_channel::Sender, - futures_util::stream::StreamExt, indexmap::map::{Entry, IndexMap}, percentage::Percentage, - quinn::{ - Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, - NewConnection, VarInt, - }, + quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, solana_perf::packet::PacketBatch, @@ -75,13 +71,13 @@ pub fn spawn_server( info!("Start quic server on {:?}", sock); let (config, _cert) = configure_server(keypair, gossip_host)?; - let (endpoint, incoming) = { - Endpoint::new(EndpointConfig::default(), Some(config), sock) + let endpoint = { + Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime) .map_err(|_e| QuicServerError::EndpointFailed)? }; let handle = tokio::spawn(run_server( - incoming, + endpoint.clone(), packet_sender, exit, max_connections_per_peer, @@ -95,7 +91,7 @@ pub fn spawn_server( } pub async fn run_server( - mut incoming: Incoming, + incoming: Endpoint, packet_sender: Sender, exit: Arc, max_connections_per_peer: usize, @@ -117,7 +113,7 @@ pub async fn run_server( const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000; let timeout_connection = timeout( Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS), - incoming.next(), + incoming.accept(), ) .await; @@ -258,18 +254,12 @@ impl NewConnectionHandlerParams { } fn handle_and_cache_new_connection( - new_connection: NewConnection, + connection: Connection, mut connection_table_l: MutexGuard, connection_table: Arc>, params: &NewConnectionHandlerParams, wait_for_chunk_timeout_ms: u64, ) -> Result<(), ConnectionHandlerError> { - let NewConnection { - connection, - uni_streams, - .. - } = new_connection; - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( connection_table_l.peer_type, params.stake, @@ -303,7 +293,7 @@ fn handle_and_cache_new_connection( if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), - Some(connection), + Some(connection.clone()), params.stake, timing::timestamp(), params.max_connections_per_peer, @@ -311,7 +301,7 @@ fn handle_and_cache_new_connection( let peer_type = connection_table_l.peer_type; drop(connection_table_l); tokio::spawn(handle_connection( - uni_streams, + connection, params.packet_sender.clone(), remote_addr, params.remote_pubkey, @@ -345,7 +335,7 @@ fn handle_and_cache_new_connection( } fn prune_unstaked_connections_and_add_new_connection( - new_connection: NewConnection, + connection: Connection, mut connection_table_l: MutexGuard, connection_table: Arc>, max_connections: usize, @@ -356,14 +346,14 @@ fn prune_unstaked_connections_and_add_new_connection( if max_connections > 0 { prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats); handle_and_cache_new_connection( - new_connection, + connection, connection_table_l, connection_table, params, wait_for_chunk_timeout_ms, ) } else { - new_connection.connection.close( + connection.close( CONNECTION_CLOSE_CODE_DISALLOWED.into(), CONNECTION_CLOSE_REASON_DISALLOWED, ); @@ -437,26 +427,23 @@ async fn setup_connection( if let Ok(new_connection) = connecting_result { stats.total_new_connections.fetch_add(1, Ordering::Relaxed); - let params = get_connection_stake(&new_connection.connection, staked_nodes.clone()) - .map_or( - NewConnectionHandlerParams::new_unstaked( - packet_sender.clone(), - max_connections_per_peer, - stats.clone(), - ), - |(pubkey, stake, total_stake, max_stake, min_stake)| { - NewConnectionHandlerParams { - packet_sender, - remote_pubkey: Some(pubkey), - stake, - total_stake, - max_connections_per_peer, - stats: stats.clone(), - max_stake, - min_stake, - } - }, - ); + let params = get_connection_stake(&new_connection, staked_nodes.clone()).map_or( + NewConnectionHandlerParams::new_unstaked( + packet_sender.clone(), + max_connections_per_peer, + stats.clone(), + ), + |(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams { + packet_sender, + remote_pubkey: Some(pubkey), + stake, + total_stake, + max_connections_per_peer, + stats: stats.clone(), + max_stake, + min_stake, + }, + ); if params.stake > 0 { let mut connection_table_l = staked_connection_table.lock().unwrap(); @@ -529,7 +516,7 @@ async fn setup_connection( #[allow(clippy::too_many_arguments)] async fn handle_connection( - mut uni_streams: IncomingUniStreams, + connection: Connection, packet_sender: Sender, remote_addr: SocketAddr, remote_pubkey: Option, @@ -551,69 +538,63 @@ async fn handle_connection( while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout( Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS), - uni_streams.next(), + connection.accept_uni(), ) .await { match stream { - Some(stream_result) => match stream_result { - Ok(mut stream) => { - stats.total_streams.fetch_add(1, Ordering::Relaxed); - stats.total_new_streams.fetch_add(1, Ordering::Relaxed); - let stream_exit = stream_exit.clone(); - let stats = stats.clone(); - let packet_sender = packet_sender.clone(); - let last_update = last_update.clone(); - tokio::spawn(async move { - let mut maybe_batch = None; - // The min is to guard against a value too small which can wake up unnecessarily - // frequently and wasting CPU cycles. The max guard against waiting for too long - // which delay exit and cause some test failures when the timeout value is large. - // Within this value, the heuristic is to wake up 10 times to check for exit - // for the set timeout if there are no data. - let exit_check_interval = - (wait_for_chunk_timeout_ms / 10).clamp(10, 1000); - let mut start = Instant::now(); - while !stream_exit.load(Ordering::Relaxed) { - if let Ok(chunk) = tokio::time::timeout( - Duration::from_millis(exit_check_interval), - stream.read_chunk(PACKET_DATA_SIZE, false), - ) - .await - { - if handle_chunk( - &chunk, - &mut maybe_batch, - &remote_addr, - &packet_sender, - stats.clone(), - stake, - peer_type, - ) { - last_update.store(timing::timestamp(), Ordering::Relaxed); - break; - } - start = Instant::now(); - } else { - let elapse = Instant::now() - start; - if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; - } + Ok(mut stream) => { + stats.total_streams.fetch_add(1, Ordering::Relaxed); + stats.total_new_streams.fetch_add(1, Ordering::Relaxed); + let stream_exit = stream_exit.clone(); + let stats = stats.clone(); + let packet_sender = packet_sender.clone(); + let last_update = last_update.clone(); + tokio::spawn(async move { + let mut maybe_batch = None; + // The min is to guard against a value too small which can wake up unnecessarily + // frequently and wasting CPU cycles. The max guard against waiting for too long + // which delay exit and cause some test failures when the timeout value is large. + // Within this value, the heuristic is to wake up 10 times to check for exit + // for the set timeout if there are no data. + let exit_check_interval = (wait_for_chunk_timeout_ms / 10).clamp(10, 1000); + let mut start = Instant::now(); + while !stream_exit.load(Ordering::Relaxed) { + if let Ok(chunk) = tokio::time::timeout( + Duration::from_millis(exit_check_interval), + stream.read_chunk(PACKET_DATA_SIZE, false), + ) + .await + { + if handle_chunk( + &chunk, + &mut maybe_batch, + &remote_addr, + &packet_sender, + stats.clone(), + stake, + peer_type, + ) { + last_update.store(timing::timestamp(), Ordering::Relaxed); + break; + } + start = Instant::now(); + } else { + let elapse = Instant::now() - start; + if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; } } - stats.total_streams.fetch_sub(1, Ordering::Relaxed); - }); - } - Err(e) => { - debug!("stream error: {:?}", e); - break; - } - }, - None => { + } + stats.total_streams.fetch_sub(1, Ordering::Relaxed); + }); + } + Err(e) => { + debug!("stream error: {:?}", e); break; } } @@ -948,7 +929,7 @@ pub mod test { tls_certificates::new_self_signed_tls_certificate_chain, }, crossbeam_channel::{unbounded, Receiver}, - quinn::{ClientConfig, IdleTimeout, VarInt}, + quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt}, solana_sdk::{ quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS}, signature::Keypair, @@ -996,10 +977,11 @@ pub mod test { let mut config = ClientConfig::new(Arc::new(crypto)); - let transport_config = Arc::get_mut(&mut config.transport).unwrap(); + let mut transport_config = TransportConfig::default(); let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS)); transport_config.max_idle_timeout(Some(timeout)); transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS))); + config.transport_config(Arc::new(transport_config)); config } @@ -1041,11 +1023,11 @@ pub mod test { pub async fn make_client_endpoint( addr: &SocketAddr, client_keypair: Option<&Keypair>, - ) -> NewConnection { + ) -> Connection { let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket) - .unwrap() - .0; + let mut endpoint = + quinn::Endpoint::new(EndpointConfig::default(), None, client_socket, TokioRuntime) + .unwrap(); let default_keypair = Keypair::new(); endpoint.set_default_client_config(get_client_config( client_keypair.unwrap_or(&default_keypair), @@ -1061,7 +1043,7 @@ pub mod test { let conn1 = make_client_endpoint(&server_address, None).await; let total = 30; for i in 0..total { - let mut s1 = conn1.connection.open_uni().await.unwrap(); + let mut s1 = conn1.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap(); s1.finish().await.unwrap(); info!("done {}", i); @@ -1082,8 +1064,8 @@ pub mod test { pub async fn check_block_multiple_connections(server_address: SocketAddr) { let conn1 = make_client_endpoint(&server_address, None).await; let conn2 = make_client_endpoint(&server_address, None).await; - let mut s1 = conn1.connection.open_uni().await.unwrap(); - let mut s2 = conn2.connection.open_uni().await.unwrap(); + let mut s1 = conn1.open_uni().await.unwrap(); + let mut s2 = conn2.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap(); s1.finish().await.unwrap(); // Send enough data to create more than 1 chunks. @@ -1109,8 +1091,8 @@ pub mod test { info!("sending: {}", i); let c1 = conn1.clone(); let c2 = conn2.clone(); - let mut s1 = c1.connection.open_uni().await.unwrap(); - let mut s2 = c2.connection.open_uni().await.unwrap(); + let mut s1 = c1.open_uni().await.unwrap(); + let mut s2 = c2.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap(); s1.finish().await.unwrap(); s2.write_all(&[0u8]).await.unwrap(); @@ -1148,7 +1130,7 @@ pub mod test { // Send a full size packet with single byte writes. let num_bytes = PACKET_DATA_SIZE; let num_expected_packets = 1; - let mut s1 = conn1.connection.open_uni().await.unwrap(); + let mut s1 = conn1.open_uni().await.unwrap(); for _ in 0..num_bytes { s1.write_all(&[0u8]).await.unwrap(); } @@ -1178,7 +1160,7 @@ pub mod test { let conn1 = Arc::new(make_client_endpoint(&server_address, None).await); // Send a full size packet with single byte writes. - if let Ok(mut s1) = conn1.connection.open_uni().await { + if let Ok(mut s1) = conn1.open_uni().await { for _ in 0..PACKET_DATA_SIZE { // Ignoring any errors here. s1.finish() will test the error condition s1.write_all(&[0u8]).await.unwrap_or_default(); @@ -1213,7 +1195,7 @@ pub mod test { assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0); // Send one byte to start the stream - let mut s1 = conn1.connection.open_uni().await.unwrap(); + let mut s1 = conn1.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap_or_default(); // Wait long enough for the stream to timeout in receiving chunks