From a2207da37a1095a7d7212245aac8674a62a5df89 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 25 Jul 2023 09:04:09 +0200 Subject: [PATCH] wip works --- quic-forward-proxy/src/main.rs | 1 - quic-forward-proxy/src/proxy.rs | 6 +- .../src/quic_connection_utils.rs | 2 + quic-forward-proxy/src/tpu_quic_client.rs | 23 ++++-- .../quic_proxy_connection_manager.rs | 73 ++++++++++++------- .../src/tpu_utils/tpu_connection_manager.rs | 1 + 6 files changed, 72 insertions(+), 34 deletions(-) diff --git a/quic-forward-proxy/src/main.rs b/quic-forward-proxy/src/main.rs index 0d840af0..7d4a5947 100644 --- a/quic-forward-proxy/src/main.rs +++ b/quic-forward-proxy/src/main.rs @@ -15,7 +15,6 @@ pub mod tls_config_provicer; pub mod proxy; pub mod proxy_request_format; pub mod tpu_quic_client; -pub mod active_connection; pub mod cli; pub mod test_client; mod util; diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index a6f45efa..0a7444f0 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -124,6 +124,7 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client let exit_signal_copy = exit_signal.clone(); let validator_identity_copy = validator_identity.clone(); let tpu_quic_client_copy = tpu_quic_client.clone(); + tokio::spawn(async move { let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const @@ -137,15 +138,16 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client let tpu_address = proxy_request.get_tpu_socket_addr(); let txs = proxy_request.get_transactions(); + // TODO join get_or_create_connection future and read_to_end let tpu_connection = tpu_quic_client_copy.get_or_create_connection(tpu_address).await; info!("send transaction batch of size {} to address {}", txs.len(), tpu_address); tpu_quic_client_copy.send_txs_to_tpu(tpu_connection, &txs, exit_signal_copy).await; - // active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_address, &txs).await; - }).instrument(debug_span!("send_txs_to_tpu")).await.unwrap(); + }) + .await.unwrap(); } // -- loop } diff --git a/quic-forward-proxy/src/quic_connection_utils.rs b/quic-forward-proxy/src/quic_connection_utils.rs index be4b2a57..32883a55 100644 --- a/quic-forward-proxy/src/quic_connection_utils.rs +++ b/quic-forward-proxy/src/quic_connection_utils.rs @@ -13,6 +13,7 @@ use std::{ use anyhow::bail; use tokio::{sync::RwLock, time::timeout}; use tokio::time::error::Elapsed; +use tracing::instrument; const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -185,6 +186,7 @@ impl QuicConnectionUtils { } #[allow(clippy::too_many_arguments)] + #[tracing::instrument(skip_all, level = "debug")] pub async fn send_transaction_batch( connection: Connection, txs: Vec>, diff --git a/quic-forward-proxy/src/tpu_quic_client.rs b/quic-forward-proxy/src/tpu_quic_client.rs index 88db04bc..57b3a5f4 100644 --- a/quic-forward-proxy/src/tpu_quic_client.rs +++ b/quic-forward-proxy/src/tpu_quic_client.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; +use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::Path; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::time::Duration; use anyhow::{anyhow, bail}; @@ -31,6 +33,7 @@ pub const CONNECTION_RETRY_COUNT: usize = 10; pub struct TpuQuicClient { endpoint: Endpoint, // naive single non-recoverable connection - TODO moke it smarter + // TODO consider using DashMap again connection_per_tpunode: Arc>, } @@ -58,8 +61,18 @@ impl TpuQuicClient { #[tracing::instrument(skip(self), level = "debug")] pub async fn get_or_create_connection(&self, tpu_address: SocketAddr) -> Connection { - if let Some(conn) = self.connection_per_tpunode.get(&tpu_address) { - return conn.clone(); + info!("looking up {}", tpu_address); + // TODO try 0rff + // QuicConnectionUtils::make_connection( + // self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT) + // .await.unwrap() + + + { + if let Some(conn) = self.connection_per_tpunode.get(&tpu_address) { + debug!("reusing connection {:?}", conn); + return conn.clone(); + } } let connection = @@ -68,8 +81,8 @@ impl TpuQuicClient { self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT) .await.unwrap(); - - self.connection_per_tpunode.insert(tpu_address, connection.clone()); + let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone()); + assert!(old_value.is_none(), "no prev value must be overridden"); debug!("Created new Quic connection to TPU node {}, total connections is now {}", tpu_address, self.connection_per_tpunode.len()); return connection; diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index a2de3ec8..2deceb44 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use futures::FutureExt; use itertools::Itertools; use log::{debug, error, info, warn}; -use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig}; +use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signer::Signer; use solana_sdk::signature::Keypair; @@ -65,8 +65,6 @@ impl QuicProxyConnectionManager { { - let mut lock = self.current_tpu_nodes.write().await; - let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| { TpuNode { tpu_identity: identity.clone(), @@ -74,6 +72,7 @@ impl QuicProxyConnectionManager { } }).collect_vec(); + let mut lock = self.current_tpu_nodes.write().await; *lock = list_of_nodes; } @@ -82,6 +81,7 @@ impl QuicProxyConnectionManager { // already started return; } + self.simple_thread_started.store(true, Relaxed); info!("Starting very simple proxy thread"); @@ -141,6 +141,10 @@ impl QuicProxyConnectionManager { proxy_addr: SocketAddr, endpoint: Endpoint, ) { + + let mut connection = endpoint.connect(proxy_addr, "localhost").unwrap() + .await.unwrap(); + loop { // TODO exit signal ??? @@ -150,7 +154,7 @@ impl QuicProxyConnectionManager { // exit signal??? - let the_tx: Vec = match tx { + let first_tx: Vec = match tx { Ok((sig, tx)) => { // if Self::check_for_confirmation(&txs_sent_store, sig) { // // transaction is already confirmed/ no need to send @@ -165,20 +169,31 @@ impl QuicProxyConnectionManager { } }; - // TODO read all txs from channel (see "let mut txs = vec![first_tx];") - let txs = vec![the_tx]; + let number_of_transactions_per_unistream = 8; // TODO read from QuicConnectionParameters - let tpu_fanout_nodes = current_tpu_nodes.read().await; + let mut txs = vec![first_tx]; + // TODO comment in + // for _ in 1..number_of_transactions_per_unistream { + // if let Ok((signature, tx)) = transaction_receiver.try_recv() { + // // if Self::check_for_confirmation(&txs_sent_store, signature) { + // // continue; + // // } + // txs.push(tx); + // } + // } - info!("Sending copy of transaction batch of {} to {} tpu nodes via quic proxy", + let tpu_fanout_nodes = current_tpu_nodes.read().await.clone(); + + info!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy", txs.len(), tpu_fanout_nodes.len()); - for target_tpu_node in &*tpu_fanout_nodes { + for target_tpu_node in tpu_fanout_nodes { Self::send_copy_of_txs_to_quicproxy( &txs, endpoint.clone(), - proxy_addr, + proxy_addr, target_tpu_node.tpu_address, - target_tpu_node.tpu_identity).await.unwrap(); + target_tpu_node.tpu_identity) + .await.unwrap(); } }, @@ -190,7 +205,11 @@ impl QuicProxyConnectionManager { proxy_address: SocketAddr, tpu_target_address: SocketAddr, target_tpu_identity: Pubkey) -> anyhow::Result<()> { - info!("sending vecvec: {}", raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(",")); + info!("sending vecvec {} to quic proxy for TPU node {}", + raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","), tpu_target_address); + + // TODO add timeout + // let mut send_stream = timeout(Duration::from_millis(500), connection.open_uni()).await??; let raw_tx_batch_copy = raw_tx_batch.clone(); @@ -207,6 +226,7 @@ impl QuicProxyConnectionManager { } let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, txs); + debug!("forwarding_request: {}", forwarding_request); let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions"); @@ -222,24 +242,25 @@ impl QuicProxyConnectionManager { bail!("Failed to send data to quic proxy: {:?}", e); } } - Ok(()) - } - - - async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec) -> anyhow::Result<()> { - info!("sending {} bytes to proxy", proxy_request_raw.len()); - - let mut connecting = endpoint.connect(proxy_address, "localhost")?; - let connection = timeout(Duration::from_millis(500), connecting).await??; - let mut send = connection.open_uni().await?; - - send.write_all(proxy_request_raw).await?; - - send.finish().await?; Ok(()) } + async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec) -> anyhow::Result<()> { + info!("sending {} bytes to proxy", proxy_request_raw.len()); + + let mut connecting = endpoint.connect(proxy_address, "localhost")?; + let connection = timeout(Duration::from_millis(500), connecting).await??; + let mut send = connection.open_uni().await?; + + send.write_all(proxy_request_raw).await?; + + send.finish().await?; + + Ok(()) + } + + } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 70fc9bae..8f665121 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -237,6 +237,7 @@ impl TpuConnectionManager { ) -> Self { let number_of_clients = fanout * 2; Self { + // TODO endpoints: RotatingQueue::new(number_of_clients, || { QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) })