diff --git a/Cargo.lock b/Cargo.lock index 2e812733..e6da24f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,6 +310,17 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-recursion" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" +dependencies = [ + "proc-macro2 1.0.56", + "quote 1.0.26", + "syn 2.0.13", +] + [[package]] name = "async-trait" version = "0.1.64" @@ -2247,6 +2258,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-channel", + "async-recursion", "base64 0.21.0", "bench", "bincode", diff --git a/Cargo.toml b/Cargo.toml index db22b469..1ca888c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ dotenv = "0.15.0" async-channel = "1.8.0" quinn = "0.9.3" rustls = { version = "0.20.6", default-features = false } +async-recursion = "1.0.4" [dev-dependencies] bench = { path = "./bench" } @@ -84,3 +85,4 @@ dotenv = { workspace = true } async-channel = { workspace = true } quinn = { workspace = true } rustls = { workspace = true } +async-recursion = { workspace = true } diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index fde8ce82..56a3a760 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -8,12 +8,13 @@ use std::{ time::Duration, }; +use async_recursion::async_recursion; use dashmap::DashMap; use log::{error, info, trace, warn}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::{ - ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, - TransportConfig, + ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, + TokioRuntime, TransportConfig, }; use solana_sdk::pubkey::Pubkey; use tokio::{ @@ -24,7 +25,8 @@ use tokio::{ use super::rotating_queue::RotatingQueue; pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; -const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: u64 = 5; +const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10); +const CONNECTION_RETRY_COUNT: usize = 10; lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = @@ -54,13 +56,7 @@ impl ActiveConnection { async fn make_connection(endpoint: Endpoint, addr: SocketAddr) -> anyhow::Result { let connecting = endpoint.connect(addr, "connect")?; - - // let res = timeout( - // Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), - // connecting, - // ) - // .await?; - let res = connecting.await; + let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await?; Ok(res.unwrap()) } @@ -71,23 +67,15 @@ impl ActiveConnection { let connecting = endpoint.connect(addr, "connect")?; let connection = match connecting.into_0rtt() { Ok((connection, zero_rtt)) => { - if let Ok(_) = timeout( - Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), - zero_rtt, - ) - .await - { + if let Ok(_) = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, zero_rtt).await { connection } else { return Err(ConnectionError::TimedOut.into()); } } Err(connecting) => { - if let Ok(connecting_result) = timeout( - Duration::from_millis(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), - connecting, - ) - .await + if let Ok(connecting_result) = + timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await { connecting_result? } else { @@ -98,6 +86,98 @@ impl ActiveConnection { Ok(connection) } + async fn connect( + identity: Pubkey, + already_connected: bool, + endpoint: Endpoint, + addr: SocketAddr, + exit_signal: Arc, + ) -> Option> { + for _i in 0..CONNECTION_RETRY_COUNT { + let conn = if already_connected { + info!("making make_connection_0rtt"); + Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await + } else { + info!("making make_connection"); + Self::make_connection(endpoint.clone(), addr.clone()).await + }; + match conn { + Ok(conn) => { + NB_QUIC_CONNECTIONS.inc(); + return Some(Arc::new(conn)); + } + Err(e) => { + trace!("Could not connect to {} because of error {}", identity, e); + if exit_signal.load(Ordering::Relaxed) { + break; + } + } + } + } + None + } + + #[async_recursion] + async fn open_unistream( + connection: &mut Option>, + reconnect: bool, + identity: Pubkey, + already_connected: bool, + endpoint: Endpoint, + addr: SocketAddr, + exit_signal: Arc, + ) -> Option { + let (unistream, reconnect_and_try_again) = match connection { + Some(conn) => { + let unistream_maybe_timeout = + timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, conn.open_uni()).await; + match unistream_maybe_timeout { + Ok(unistream_res) => match unistream_res { + Ok(unistream) => (Some(unistream), false), + Err(_) => (None, reconnect), + }, + Err(_) => { + // timed out + (None, false) + } + } + } + None => (None, true), + }; + + if reconnect_and_try_again { + let conn = Self::connect( + identity, + already_connected, + endpoint.clone(), + addr.clone(), + exit_signal.clone(), + ) + .await; + match conn { + Some(conn) => { + *connection = Some(conn); + Self::open_unistream( + connection, + false, + identity, + already_connected, + endpoint, + addr, + exit_signal, + ) + .await + } + None => { + // connection with the peer is not possible + None + } + } + } else { + unistream + } + } + async fn listen( transaction_reciever: Receiver>, exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>, @@ -108,9 +188,10 @@ impl ActiveConnection { ) { NB_QUIC_TASKS.inc(); let mut already_connected = false; - let mut connection: Option = None; + let mut connection: Option> = None; let mut transaction_reciever = transaction_reciever; let mut exit_oneshot_channel = exit_oneshot_channel; + loop { // exit signal set if exit_signal.load(Ordering::Relaxed) { @@ -118,7 +199,7 @@ impl ActiveConnection { } tokio::select! { - tx_or_timeout = timeout(Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), transaction_reciever.recv() ) => { + tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => { // exit signal set if exit_signal.load(Ordering::Relaxed) { break; @@ -136,76 +217,65 @@ impl ActiveConnection { continue; } }; - let mut send_stream = match &connection { - Some(conn) => { - let unistream = conn.open_uni().await; - if let Err(_) = unistream { - // reconnect as connection is closed by the server and then retry - let conn = Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await; - match conn { - Ok(conn) => { - let unistream = conn.open_uni().await; - connection = Some(conn); - match unistream{ - Ok(stream) => stream, - Err(e) => { - warn!("error opening a unistream for {} error {}", identity, e); - break; - } - } - }, - Err(e) => { - // validator no longer accepting connection - trace!("Could not reconnect to {} because of error {}", identity, e); - break; + let unistream = Self::open_unistream( + &mut connection, + true, + identity.clone(), + already_connected, + endpoint.clone(), + addr.clone(), + exit_signal.clone(), + ).await; + + if !already_connected && connection.is_some() { + already_connected = true; + } + + match unistream { + Some(mut send_stream) => { + trace!("Sending {} transaction", identity); + let write_timeout_res = timeout( QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.write_all(tx.as_slice())).await; + match write_timeout_res { + Ok(write_res) => { + if let Err(e) = write_res { + warn!( + "Error while writing transaction for {}, error {}", + identity, + e + ); } + }, + Err(_) => { + warn!( + "timeout while writing transaction for {}", + identity + ); + } + } + + let finish_timeout_res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.finish()).await; + match finish_timeout_res { + Ok(finish_res) => { + if let Err(e) = finish_res { + warn!( + "Error while writing transaction for {}, error {}", + identity, + e + ); + } + }, + Err(_) => { + warn!( + "timeout while writing transaction for {}", + identity + ); } - } else { - unistream.unwrap() } }, None => { - let conn = if already_connected { - info!("making make_connection_0rtt"); - Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await - } else { - info!("making make_connection"); - Self::make_connection(endpoint.clone(), addr.clone()).await - }; - match conn { - Ok(conn) => { - NB_QUIC_CONNECTIONS.inc(); - already_connected = true; - let unistream = conn.open_uni().await; - if let Err(e) = unistream { - warn!("error opening a unistream for {} error {}", identity, e); - continue; - } - - connection = Some(conn); - unistream.unwrap() - }, - Err(e) => { - warn!("Could not connect to {} because of error {}", identity, e); - break; - } - } - + trace!("could not create a unistream for {}", identity); + break; } - }; - - trace!("Sending {} transaction", identity); - if let Err(e) = send_stream.write_all(tx.as_slice()).await { - warn!( - "Error while writing transaction for {} error {}", - identity, e - ); - } - if let Err(e) = send_stream.finish().await { - warn!( - "Error finishing for {}, error {}", - identity, e, - ) } }, Err(_) => {