From f2dbdd8deaece3ee8f92be0ec3982488790c4572 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 26 Jul 2023 23:33:49 +0200 Subject: [PATCH] add cli option --- lite-rpc/src/bridge.rs | 3 +- lite-rpc/src/cli.rs | 2 + lite-rpc/src/main.rs | 16 ++++++++ quic-forward-proxy/Cargo.toml | 2 +- quic-forward-proxy/src/proxy.rs | 7 ++-- quic-forward-proxy/src/tpu_quic_client.rs | 40 ++++++++----------- .../quic_proxy_connection_manager.rs | 1 + .../src/tpu_utils/tpu_connection_manager.rs | 2 - 8 files changed, 41 insertions(+), 32 deletions(-) diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index e01b41fa..d2434eda 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -83,6 +83,7 @@ impl LiteBridge { validator_identity: Arc, retry_after: Duration, max_retries: usize, + tpu_connection_path: TpuConnectionPath, ) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); let current_slot = rpc_client @@ -108,7 +109,7 @@ impl LiteBridge { write_timeout: Duration::from_secs(1), number_of_transactions_per_unistream: 8, }, - tpu_connection_path: TpuConnectionPath::QuicDirectPath, + tpu_connection_path, }; let tpu_service = TpuService::new( diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index fb6c7f81..5fc91bda 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -29,4 +29,6 @@ pub struct Args { pub maximum_retries_per_tx: usize, #[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)] pub transaction_retry_after_secs: u64, + #[arg(long)] + pub experimental_quic_proxy_addr: Option, } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 2bd0e644..3f5e0b99 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -10,6 +10,8 @@ use lite_rpc::{bridge::LiteBridge, cli::Args}; use solana_sdk::signature::Keypair; use std::env; use std::sync::Arc; +use clap::builder::TypedValueParser; +use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use crate::rpc_tester::RpcTester; @@ -48,12 +50,15 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { identity_keypair, maximum_retries_per_tx, transaction_retry_after_secs, + experimental_quic_proxy_addr, } = args; let validator_identity = Arc::new(get_identity_keypair(&identity_keypair).await); let retry_after = Duration::from_secs(transaction_retry_after_secs); + let tpu_connection_path = configure_tpu_connection_path(experimental_quic_proxy_addr); + LiteBridge::new( rpc_addr, ws_addr, @@ -61,6 +66,7 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { validator_identity, retry_after, maximum_retries_per_tx, + tpu_connection_path ) .await .context("Error building LiteBridge")? @@ -73,6 +79,16 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { .await } +fn configure_tpu_connection_path(experimental_quic_proxy_addr: Option) -> TpuConnectionPath { + match experimental_quic_proxy_addr { + None => TpuConnectionPath::QuicDirectPath, + Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath { + // e.g. "127.0.0.1:11111" + forward_proxy_address: prox_address.parse().unwrap() + }, + } +} + fn get_args() -> Args { let mut args = Args::parse(); diff --git a/quic-forward-proxy/Cargo.toml b/quic-forward-proxy/Cargo.toml index 34dd1e5d..18e5a264 100644 --- a/quic-forward-proxy/Cargo.toml +++ b/quic-forward-proxy/Cargo.toml @@ -27,7 +27,7 @@ log = { workspace = true } clap = { workspace = true } dashmap = { workspace = true } itertools = { workspace = true } -tracing = "0.1.37" +tracing = { workspace = true } tracing-subscriber = { workspace = true } native-tls = { workspace = true } prometheus = { workspace = true } diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 04b513dd..52cd15de 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -60,8 +60,7 @@ impl QuicForwardProxy { transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); let endpoint = Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap(); - info!("tpu forward proxy listening on {}", endpoint.local_addr()?); - info!("staking from validator identity {}", validator_identity.pubkey()); + info!("Quic proxy uses validator identity {}", validator_identity.pubkey()); let tpu_quic_client = TpuQuicClient::new_with_validator_identity(validator_identity.as_ref()).await; @@ -77,7 +76,7 @@ impl QuicForwardProxy { let endpoint = self.endpoint.clone(); let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move { - info!("TPU Quic Proxy server start on {}", endpoint.local_addr()?); + info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?); while let Some(connecting) = endpoint.accept().await { @@ -87,7 +86,7 @@ impl QuicForwardProxy { let tpu_quic_client = self.tpu_quic_client.clone(); tokio::spawn(async move { - let connection = connecting.await.context("accept connection").unwrap(); + let connection = connecting.await.context("handshake").unwrap(); match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy) .await { Ok(()) => {} diff --git a/quic-forward-proxy/src/tpu_quic_client.rs b/quic-forward-proxy/src/tpu_quic_client.rs index d6299688..ceda5066 100644 --- a/quic-forward-proxy/src/tpu_quic_client.rs +++ b/quic-forward-proxy/src/tpu_quic_client.rs @@ -93,11 +93,23 @@ impl SingleTPUConnectionManager for TpuQuicClient { } } - let connection = match self.create_new(tpu_address).await { - Ok(value) => value, - Err(err) => return Err(err), - }; + let connection = + // TODO try 0rff + match QuicConnectionUtils::make_connection_0rtt( + self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT) + .await { + Ok(conn) => conn, + Err(err) => { + warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err); + return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err)); + }, + }; + let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone()); + assert!(old_value.is_none(), "no prev value must be overridden"); + + debug!("Created new Quic connection {} to TPU node {}, total connections is now {}", + connection.stable_id(), tpu_address, self.connection_per_tpunode.len()); return Ok(connection); } @@ -256,26 +268,6 @@ impl TpuQuicClient { } } - pub(crate) async fn create_new(&self, tpu_address: SocketAddr) -> anyhow::Result { - let connection = - // TODO try 0rff - match QuicConnectionUtils::make_connection_0rtt( - self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT) - .await { - Ok(conn) => conn, - Err(err) => { - warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err); - return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err)); - }, - }; - - let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone()); - assert!(old_value.is_none(), "no prev value must be overridden"); - - debug!("Created new Quic connection {} to TPU node {}, total connections is now {}", - connection.stable_id(), tpu_address, self.connection_per_tpunode.len()); - Ok(connection) - } } diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index 8a1e1511..e9005273 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -46,6 +46,7 @@ impl QuicProxyConnectionManager { validator_identity: Arc, proxy_addr: SocketAddr, ) -> Self { + info!("Configure Quic proxy connection manager to {}", proxy_addr); let endpoint = Self::create_proxy_client_endpoint(certificate.clone(), key.clone()); Self { diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 1fba7461..8f665121 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -88,8 +88,6 @@ impl ActiveConnection { identity_stakes.stakes, identity_stakes.total_stakes, ) as u64; - // TODO remove - println!("max_uni_stream_connections {}", max_uni_stream_connections); let number_of_transactions_per_unistream = self .connection_parameters .number_of_transactions_per_unistream;