diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 41a88d95..e912c065 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -23,10 +23,14 @@ use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use tokio::sync::RwLock; use crate::proxy_request_format::TpuForwardingRequest; use crate::quic_connection_utils::QuicConnectionUtils; -use crate::tpu_quic_client::{TpuQuicClient}; +use crate::tpu_quic_client::{SingleTPUConnectionManager, TpuQuicClient}; use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider}; use crate::util::AnyhowJoinHandle; +// TODO tweak this value - solana server sets 256 +// setting this to "1" did not make a difference! +const MAX_CONCURRENT_UNI_STREAMS: u32 = 24; + pub struct QuicForwardProxy { endpoint: Endpoint, validator_identity: Arc, @@ -39,12 +43,19 @@ impl QuicForwardProxy { tls_config: &SelfSignedTlsConfigProvider, validator_identity: Arc) -> anyhow::Result { let server_tls_config = tls_config.get_server_tls_crypto_config(); - let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config)); - let mut transport_config = TransportConfig::default(); - transport_config.max_idle_timeout(None); + + // note: this config must be aligned with lite-rpc's client config + let transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap(); + // TODO experiment with this value + transport_config.max_concurrent_uni_streams(VarInt::from_u32(MAX_CONCURRENT_UNI_STREAMS)); + // no bidi streams used + transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0)); + let timeout = Duration::from_secs(10).try_into().unwrap(); + transport_config.max_idle_timeout(Some(timeout)); transport_config.keep_alive_interval(Some(Duration::from_millis(500))); - quinn_server_config.transport_config(Arc::new(transport_config)); + transport_config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); + transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); let endpoint = Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap(); info!("tpu forward proxy listening on {}", endpoint.local_addr()?); diff --git a/quic-forward-proxy/src/quic_connection_utils.rs b/quic-forward-proxy/src/quic_connection_utils.rs index 2b3cb17c..abc77ca4 100644 --- a/quic-forward-proxy/src/quic_connection_utils.rs +++ b/quic-forward-proxy/src/quic_connection_utils.rs @@ -1,5 +1,5 @@ use log::{debug, error, info, trace, warn}; -use quinn::{ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, WriteError}; +use quinn::{ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, VarInt, WriteError}; use solana_sdk::pubkey::Pubkey; use std::{ collections::VecDeque, @@ -13,6 +13,7 @@ use std::{ use anyhow::bail; use futures::future::join_all; use itertools::Itertools; +use solana_sdk::quic::QUIC_MAX_TIMEOUT_MS; use tokio::{sync::RwLock, time::timeout}; use tokio::time::error::Elapsed; use tracing::instrument; @@ -22,7 +23,8 @@ const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; pub struct QuicConnectionUtils {} impl QuicConnectionUtils { - pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint { + // TODO move to a more specific place + pub fn create_tpu_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint { let mut endpoint = { let client_socket = solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000)) @@ -40,18 +42,19 @@ impl QuicConnectionUtils { .expect("Failed to set QUIC client certificates"); crypto.enable_early_data = true; - // FIXME TEMP HACK TO ALLOW PROXY PROTOCOL - const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy"; - crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec(), ALPN_TPU_FORWARDPROXY_PROTOCOL_ID.to_vec()]; + crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()]; let mut config = ClientConfig::new(Arc::new(crypto)); - let mut transport_config = TransportConfig::default(); - // TODO check timing - let timeout = IdleTimeout::try_from(Duration::from_secs(5)).unwrap(); + // note: this should be aligned with solana quic server's endpoint config + let mut transport_config = TransportConfig::default(); + // no remotely-initiated streams required + transport_config.max_concurrent_uni_streams(VarInt::from_u32(0)); + transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0)); + let timeout = IdleTimeout::try_from(Duration::from_millis(QUIC_MAX_TIMEOUT_MS as u64)).unwrap(); transport_config.max_idle_timeout(Some(timeout)); - transport_config.keep_alive_interval(Some(Duration::from_millis(500))); + transport_config.keep_alive_interval(None); config.transport_config(Arc::new(transport_config)); endpoint.set_default_client_config(config); diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index c24e6a3e..52dd7f48 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use futures::FutureExt; use itertools::Itertools; use log::{debug, error, info, warn}; -use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig}; +use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt}; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signer::Signer; @@ -123,13 +123,18 @@ impl QuicProxyConnectionManager { crypto.alpn_protocols = vec![ALPN_TPU_FORWARDPROXY_PROTOCOL_ID.to_vec()]; let mut config = ClientConfig::new(Arc::new(crypto)); - let mut transport_config = TransportConfig::default(); + // note: this config must be aligned with quic-proxy's server config + let mut transport_config = TransportConfig::default(); let timeout = IdleTimeout::try_from(Duration::from_secs(1)).unwrap(); + // no remotely-initiated streams required + transport_config.max_concurrent_uni_streams(VarInt::from_u32(0)); + transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0)); + let timeout = Duration::from_secs(10).try_into().unwrap(); transport_config.max_idle_timeout(Some(timeout)); transport_config.keep_alive_interval(Some(Duration::from_millis(500))); - config.transport_config(Arc::new(transport_config)); + config.transport_config(Arc::new(transport_config)); endpoint.set_default_client_config(config); endpoint