Update quinn versions (#29603)

* chore: bump quinn-udp from 0.1.3 to 0.3.2

Bumps [quinn-udp](https://github.com/quinn-rs/quinn) from 0.1.3 to 0.3.2.
- [Release notes](https://github.com/quinn-rs/quinn/releases)
- [Commits](https://github.com/quinn-rs/quinn/commits)

---
updated-dependencies:
- dependency-name: quinn-udp
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Try to use quinn 0.9.3 and quinn-proto 0.9.2

* Update streamer and client for quic to support qunn 0.9.3

* Update Cargo.lock

* Fixed unit test failure for quic tests

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
Lijun Wang 2023-01-11 10:08:22 -08:00 committed by GitHub
parent fadf027851
commit 7c8b846344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 255 additions and 183 deletions

89
Cargo.lock generated
View File

@ -1798,15 +1798,6 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "gag"
version = "1.0.0"
@ -3707,16 +3698,15 @@ checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quinn"
version = "0.8.4"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21afdc492bf2a8688cb386be6605d1163b6ace89afa5e3b529037d2b4334b860"
checksum = "445cbfe2382fa023c4f2f3c7e1c95c03dcc1df2bf23cebcb2b13e1402c4394d1"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"fxhash",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.20.6",
"thiserror",
"tokio",
@ -3726,17 +3716,16 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.8.4"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fce546b9688f767a57530652488420d419a8b1f44a478b451c3d1ab6d992a55"
checksum = "72ef4ced82a24bb281af338b9e8f94429b6eca01b4e66d899f40031f074e74c9"
dependencies = [
"bytes",
"fxhash",
"rand 0.8.5",
"ring",
"rustc-hash",
"rustls 0.20.6",
"rustls-native-certs",
"rustls-pemfile 0.2.1",
"slab",
"thiserror",
"tinyvec",
@ -3746,16 +3735,15 @@ dependencies = [
[[package]]
name = "quinn-udp"
version = "0.1.3"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f832d8958db3e84d2ec93b5eb2272b45aa23cf7f8fe6e79f578896f4e6c231b"
checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4"
dependencies = [
"futures-util",
"libc",
"quinn-proto",
"socket2",
"tokio",
"tracing",
"windows-sys 0.42.0",
]
[[package]]
@ -8314,6 +8302,27 @@ dependencies = [
"windows_x86_64_msvc 0.36.1",
]
[[package]]
name = "windows-sys"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.0",
"windows_i686_gnu 0.42.0",
"windows_i686_msvc 0.42.0",
"windows_x86_64_gnu 0.42.0",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
[[package]]
name = "windows_aarch64_msvc"
version = "0.32.0"
@ -8326,6 +8335,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
@ -8338,6 +8353,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
@ -8350,6 +8371,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
@ -8362,6 +8389,18 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
@ -8374,6 +8413,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
[[package]]
name = "winreg"
version = "0.10.1"

View File

@ -18,7 +18,7 @@ futures-util = "0.3.25"
indexmap = "1.9.1"
indicatif = { version = "0.17.1" }
log = "0.4.17"
quinn = "0.8.4"
quinn = "0.9.3"
rand = "0.7.0"
rayon = "1.5.3"
solana-measure = { path = "../measure", version = "=1.15.0" }

View File

@ -1593,15 +1593,6 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder 1.4.3",
]
[[package]]
name = "generic-array"
version = "0.12.4"
@ -3385,16 +3376,15 @@ dependencies = [
[[package]]
name = "quinn"
version = "0.8.4"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21afdc492bf2a8688cb386be6605d1163b6ace89afa5e3b529037d2b4334b860"
checksum = "445cbfe2382fa023c4f2f3c7e1c95c03dcc1df2bf23cebcb2b13e1402c4394d1"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"fxhash",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.20.6",
"thiserror",
"tokio",
@ -3404,17 +3394,16 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.8.4"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fce546b9688f767a57530652488420d419a8b1f44a478b451c3d1ab6d992a55"
checksum = "72ef4ced82a24bb281af338b9e8f94429b6eca01b4e66d899f40031f074e74c9"
dependencies = [
"bytes",
"fxhash",
"rand 0.8.5",
"ring",
"rustc-hash",
"rustls 0.20.6",
"rustls-native-certs",
"rustls-pemfile 0.2.1",
"slab",
"thiserror",
"tinyvec",
@ -3424,16 +3413,15 @@ dependencies = [
[[package]]
name = "quinn-udp"
version = "0.1.3"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f832d8958db3e84d2ec93b5eb2272b45aa23cf7f8fe6e79f578896f4e6c231b"
checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4"
dependencies = [
"futures-util",
"libc",
"quinn-proto",
"socket2",
"tokio",
"tracing",
"windows-sys 0.42.0",
]
[[package]]
@ -7431,6 +7419,27 @@ dependencies = [
"windows_x86_64_msvc 0.36.1",
]
[[package]]
name = "windows-sys"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.0",
"windows_i686_gnu 0.42.0",
"windows_i686_msvc 0.42.0",
"windows_x86_64_gnu 0.42.0",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
[[package]]
name = "windows_aarch64_msvc"
version = "0.32.0"
@ -7443,6 +7452,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
@ -7455,6 +7470,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
@ -7467,6 +7488,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
@ -7479,6 +7506,18 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
@ -7491,6 +7530,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
[[package]]
name = "winreg"
version = "0.10.1"

View File

@ -16,9 +16,9 @@ futures = "0.3"
itertools = "0.10.5"
lazy_static = "1.4.0"
log = "0.4.17"
quinn = "0.8.4"
quinn-proto = "0.8.4"
quinn-udp = "0.1.3"
quinn = "0.9.3"
quinn-proto = "0.9.2"
quinn-udp = "0.3.2"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
solana-measure = { path = "../measure", version = "=1.15.0" }
solana-metrics = { path = "../metrics", version = "=1.15.0" }

View File

@ -8,8 +8,8 @@ use {
itertools::Itertools,
log::*,
quinn::{
ClientConfig, ConnectError, ConnectionError, Endpoint, EndpointConfig, IdleTimeout,
NewConnection, VarInt, WriteError,
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig,
IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError,
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
@ -128,11 +128,12 @@ impl QuicLazyInitializedEndpoint {
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport)
.expect("QuicLazyInitializedEndpoint::create_endpoint Arc::get_mut");
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
@ -185,7 +186,7 @@ impl Default for QuicLazyInitializedEndpoint {
#[derive(Clone)]
struct QuicNewConnection {
endpoint: Arc<Endpoint>,
connection: Arc<NewConnection>,
connection: Arc<Connection>,
}
impl QuicNewConnection {
@ -226,9 +227,8 @@ impl QuicNewConnection {
}
fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
quinn::Endpoint::new(config, None, client_socket)
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
.expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
.0
}
// Attempts to make a faster connection by taking advantage of pre-existing key material.
@ -237,7 +237,7 @@ impl QuicNewConnection {
&mut self,
addr: SocketAddr,
stats: &ClientStats,
) -> Result<Arc<NewConnection>, QuicError> {
) -> Result<Arc<Connection>, QuicError> {
let connecting = self.endpoint.connect(addr, "connect")?;
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connection = match connecting.into_0rtt() {
@ -303,9 +303,9 @@ impl QuicClient {
async fn _send_buffer_using_conn(
data: &[u8],
connection: &NewConnection,
connection: &Connection,
) -> Result<(), QuicError> {
let mut send_stream = connection.connection.open_uni().await?;
let mut send_stream = connection.open_uni().await?;
send_stream.write_all(data).await?;
send_stream.finish().await?;
@ -319,7 +319,7 @@ impl QuicClient {
data: &[u8],
stats: &ClientStats,
connection_stats: Arc<ConnectionCacheStats>,
) -> Result<Arc<NewConnection>, QuicError> {
) -> Result<Arc<Connection>, QuicError> {
let mut connection_try_count = 0;
let mut last_connection_id = 0;
let mut last_error = None;
@ -331,7 +331,7 @@ impl QuicClient {
let maybe_conn = conn_guard.as_mut();
match maybe_conn {
Some(conn) => {
if conn.connection.connection.stable_id() == last_connection_id {
if conn.connection.stable_id() == last_connection_id {
// this is the problematic connection we had used before, create a new one
let conn = conn.make_connection_0rtt(self.addr, stats).await;
match conn {
@ -339,7 +339,7 @@ impl QuicClient {
info!(
"Made 0rtt connection to {} with id {} try_count {}, last_connection_id: {}, last_error: {:?}",
self.addr,
conn.connection.stable_id(),
conn.stable_id(),
connection_try_count,
last_connection_id,
last_error,
@ -373,7 +373,7 @@ impl QuicClient {
info!(
"Made connection to {} id {} try_count {}",
self.addr,
conn.connection.connection.stable_id(),
conn.connection.stable_id(),
connection_try_count
);
connection_try_count += 1;
@ -388,7 +388,7 @@ impl QuicClient {
}
};
let new_stats = connection.connection.stats();
let new_stats = connection.stats();
connection_stats
.total_client_stats
@ -416,7 +416,7 @@ impl QuicClient {
.tx_acks
.update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);
last_connection_id = connection.connection.stable_id();
last_connection_id = connection.stable_id();
match Self::_send_buffer_using_conn(data, &connection).await {
Ok(()) => {
return Ok(connection);
@ -429,7 +429,7 @@ impl QuicClient {
info!(
"Error sending to {} with id {}, error {:?} thread: {:?}",
self.addr,
connection.connection.stable_id(),
connection.stable_id(),
err,
thread::current().id(),
);
@ -494,7 +494,7 @@ impl QuicClient {
// Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once
let connection_ref: &NewConnection = &connection;
let connection_ref: &Connection = &connection;
let chunks = buffers[1..buffers.len()].iter().chunks(self.chunk_size);

View File

@ -21,9 +21,9 @@ nix = "0.25.0"
pem = "1.1.0"
percentage = "0.1.0"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
quinn = "0.8.4"
quinn-proto = "0.8.4"
quinn-udp = "0.1.3"
quinn = "0.9.3"
quinn-proto = "0.9.2"
quinn-udp = "0.3.2"
rand = "0.7.0"
rcgen = "0.10.0"

View File

@ -5,13 +5,9 @@ use {
tls_certificates::get_pubkey_from_tls_certificate,
},
crossbeam_channel::Sender,
futures_util::stream::StreamExt,
indexmap::map::{Entry, IndexMap},
percentage::Percentage,
quinn::{
Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams,
NewConnection, VarInt,
},
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_perf::packet::PacketBatch,
@ -75,13 +71,13 @@ pub fn spawn_server(
info!("Start quic server on {:?}", sock);
let (config, _cert) = configure_server(keypair, gossip_host)?;
let (endpoint, incoming) = {
Endpoint::new(EndpointConfig::default(), Some(config), sock)
let endpoint = {
Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime)
.map_err(|_e| QuicServerError::EndpointFailed)?
};
let handle = tokio::spawn(run_server(
incoming,
endpoint.clone(),
packet_sender,
exit,
max_connections_per_peer,
@ -95,7 +91,7 @@ pub fn spawn_server(
}
pub async fn run_server(
mut incoming: Incoming,
incoming: Endpoint,
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_peer: usize,
@ -117,7 +113,7 @@ pub async fn run_server(
const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000;
let timeout_connection = timeout(
Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS),
incoming.next(),
incoming.accept(),
)
.await;
@ -258,18 +254,12 @@ impl NewConnectionHandlerParams {
}
fn handle_and_cache_new_connection(
new_connection: NewConnection,
connection: Connection,
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout_ms: u64,
) -> Result<(), ConnectionHandlerError> {
let NewConnection {
connection,
uni_streams,
..
} = new_connection;
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
connection_table_l.peer_type,
params.stake,
@ -303,7 +293,7 @@ fn handle_and_cache_new_connection(
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
Some(connection),
Some(connection.clone()),
params.stake,
timing::timestamp(),
params.max_connections_per_peer,
@ -311,7 +301,7 @@ fn handle_and_cache_new_connection(
let peer_type = connection_table_l.peer_type;
drop(connection_table_l);
tokio::spawn(handle_connection(
uni_streams,
connection,
params.packet_sender.clone(),
remote_addr,
params.remote_pubkey,
@ -345,7 +335,7 @@ fn handle_and_cache_new_connection(
}
fn prune_unstaked_connections_and_add_new_connection(
new_connection: NewConnection,
connection: Connection,
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
max_connections: usize,
@ -356,14 +346,14 @@ fn prune_unstaked_connections_and_add_new_connection(
if max_connections > 0 {
prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats);
handle_and_cache_new_connection(
new_connection,
connection,
connection_table_l,
connection_table,
params,
wait_for_chunk_timeout_ms,
)
} else {
new_connection.connection.close(
connection.close(
CONNECTION_CLOSE_CODE_DISALLOWED.into(),
CONNECTION_CLOSE_REASON_DISALLOWED,
);
@ -437,26 +427,23 @@ async fn setup_connection(
if let Ok(new_connection) = connecting_result {
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);
let params = get_connection_stake(&new_connection.connection, staked_nodes.clone())
.map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
}
},
);
let params = get_connection_stake(&new_connection, staked_nodes.clone()).map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
},
);
if params.stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap();
@ -529,7 +516,7 @@ async fn setup_connection(
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
mut uni_streams: IncomingUniStreams,
connection: Connection,
packet_sender: Sender<PacketBatch>,
remote_addr: SocketAddr,
remote_pubkey: Option<Pubkey>,
@ -551,69 +538,63 @@ async fn handle_connection(
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
uni_streams.next(),
connection.accept_uni(),
)
.await
{
match stream {
Some(stream_result) => match stream_result {
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
// The min is to guard against a value too small which can wake up unnecessarily
// frequently and wasting CPU cycles. The max guard against waiting for too long
// which delay exit and cause some test failures when the timeout value is large.
// Within this value, the heuristic is to wake up 10 times to check for exit
// for the set timeout if there are no data.
let exit_check_interval =
(wait_for_chunk_timeout_ms / 10).clamp(10, 1000);
let mut start = Instant::now();
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(exit_check_interval),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
peer_type,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
start = Instant::now();
} else {
let elapse = Instant::now() - start;
if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
// The min is to guard against a value too small which can wake up unnecessarily
// frequently and wasting CPU cycles. The max guard against waiting for too long
// which delay exit and cause some test failures when the timeout value is large.
// Within this value, the heuristic is to wake up 10 times to check for exit
// for the set timeout if there are no data.
let exit_check_interval = (wait_for_chunk_timeout_ms / 10).clamp(10, 1000);
let mut start = Instant::now();
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(exit_check_interval),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
peer_type,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
start = Instant::now();
} else {
let elapse = Instant::now() - start;
if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
});
}
Err(e) => {
debug!("stream error: {:?}", e);
break;
}
},
None => {
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
});
}
Err(e) => {
debug!("stream error: {:?}", e);
break;
}
}
@ -948,7 +929,7 @@ pub mod test {
tls_certificates::new_self_signed_tls_certificate_chain,
},
crossbeam_channel::{unbounded, Receiver},
quinn::{ClientConfig, IdleTimeout, VarInt},
quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt},
solana_sdk::{
quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS},
signature::Keypair,
@ -996,10 +977,11 @@ pub mod test {
let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport).unwrap();
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
config.transport_config(Arc::new(transport_config));
config
}
@ -1041,11 +1023,11 @@ pub mod test {
pub async fn make_client_endpoint(
addr: &SocketAddr,
client_keypair: Option<&Keypair>,
) -> NewConnection {
) -> Connection {
let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket)
.unwrap()
.0;
let mut endpoint =
quinn::Endpoint::new(EndpointConfig::default(), None, client_socket, TokioRuntime)
.unwrap();
let default_keypair = Keypair::new();
endpoint.set_default_client_config(get_client_config(
client_keypair.unwrap_or(&default_keypair),
@ -1061,7 +1043,7 @@ pub mod test {
let conn1 = make_client_endpoint(&server_address, None).await;
let total = 30;
for i in 0..total {
let mut s1 = conn1.connection.open_uni().await.unwrap();
let mut s1 = conn1.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().await.unwrap();
info!("done {}", i);
@ -1082,8 +1064,8 @@ pub mod test {
pub async fn check_block_multiple_connections(server_address: SocketAddr) {
let conn1 = make_client_endpoint(&server_address, None).await;
let conn2 = make_client_endpoint(&server_address, None).await;
let mut s1 = conn1.connection.open_uni().await.unwrap();
let mut s2 = conn2.connection.open_uni().await.unwrap();
let mut s1 = conn1.open_uni().await.unwrap();
let mut s2 = conn2.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().await.unwrap();
// Send enough data to create more than 1 chunks.
@ -1109,8 +1091,8 @@ pub mod test {
info!("sending: {}", i);
let c1 = conn1.clone();
let c2 = conn2.clone();
let mut s1 = c1.connection.open_uni().await.unwrap();
let mut s2 = c2.connection.open_uni().await.unwrap();
let mut s1 = c1.open_uni().await.unwrap();
let mut s2 = c2.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().await.unwrap();
s2.write_all(&[0u8]).await.unwrap();
@ -1148,7 +1130,7 @@ pub mod test {
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets = 1;
let mut s1 = conn1.connection.open_uni().await.unwrap();
let mut s1 = conn1.open_uni().await.unwrap();
for _ in 0..num_bytes {
s1.write_all(&[0u8]).await.unwrap();
}
@ -1178,7 +1160,7 @@ pub mod test {
let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);
// Send a full size packet with single byte writes.
if let Ok(mut s1) = conn1.connection.open_uni().await {
if let Ok(mut s1) = conn1.open_uni().await {
for _ in 0..PACKET_DATA_SIZE {
// Ignoring any errors here. s1.finish() will test the error condition
s1.write_all(&[0u8]).await.unwrap_or_default();
@ -1213,7 +1195,7 @@ pub mod test {
assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);
// Send one byte to start the stream
let mut s1 = conn1.connection.open_uni().await.unwrap();
let mut s1 = conn1.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap_or_default();
// Wait long enough for the stream to timeout in receiving chunks