From 4b51dc0460019284e7618d1c38b9715ac45ad411 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Tue, 19 Sep 2023 13:35:58 +0200 Subject: [PATCH] establish connection before using QuicConnection --- core/src/quic_connection.rs | 24 +++++++----- .../src/tpu_utils/tpu_connection_manager.rs | 37 +++++++------------ 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/core/src/quic_connection.rs b/core/src/quic_connection.rs index ca2720e5..dfb14a34 100644 --- a/core/src/quic_connection.rs +++ b/core/src/quic_connection.rs @@ -29,6 +29,7 @@ pub struct QuicConnection { connection_params: QuicConnectionParameters, exit_signal: Arc, timeout_counters: Arc, + has_connected_once: Arc, } impl QuicConnection { @@ -48,6 +49,7 @@ impl QuicConnection { connection_params, exit_signal, timeout_counters: Arc::new(AtomicU64::new(0)), + has_connected_once: Arc::new(AtomicBool::new(false)), } } @@ -64,7 +66,7 @@ impl QuicConnection { .await } - async fn get_connection(&self) -> Option { + pub async fn get_connection(&self) -> Option { // get new connection reset if necessary let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize; let conn = self.connection.read().await.clone(); @@ -95,6 +97,7 @@ impl QuicConnection { None => { let connection = self.connect().await; *self.connection.write().await = connection.clone(); + self.has_connected_once.store(true, Ordering::Relaxed); connection } } @@ -111,10 +114,6 @@ impl QuicConnection { let mut do_retry = false; let connection = self.get_connection().await; - if self.exit_signal.load(Ordering::Relaxed) { - return; - } - if let Some(connection) = connection { let current_stable_id = connection.stable_id() as u64; match QuicConnectionUtils::open_unistream( @@ -175,6 +174,10 @@ impl QuicConnection { pub fn reset_timeouts(&self) { self.timeout_counters.store(0, Ordering::Relaxed); } + + pub fn has_connected_atleast_once(&self) -> bool { + self.has_connected_once.load(Ordering::Relaxed) + } } #[derive(Clone)] @@ -232,11 +235,14 @@ impl QuicConnectionPool { ) .await; drop(_others); + + // establish a connection if the connection has not yet been used + let connection = self.connections[index].clone(); + if !connection.has_connected_atleast_once() { + connection.get_connection().await; + } let permit = permit.context("Cannot aquire permit, connection pool erased")?; - Ok(PooledConnection { - connection: self.connections[index].clone(), - permit, - }) + Ok(PooledConnection { connection, permit }) } pub fn len(&self) -> usize { diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index b0eb34d2..ee28fe10 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -80,16 +80,14 @@ impl ActiveConnection { let mut exit_oneshot_channel = exit_oneshot_channel; let identity = self.identity; - let number_of_transactions_per_unistream = self - .connection_parameters - .number_of_transactions_per_unistream; let max_number_of_connections = self.connection_parameters.max_number_of_connections; let max_uni_stream_connections = compute_max_allowed_uni_streams( identity_stakes.peer_type, identity_stakes.stakes, identity_stakes.total_stakes, - ); + ) + .saturating_sub(1); let exit_signal = self.exit_signal.clone(); let connection_pool = QuicConnectionPool::new( identity, @@ -114,7 +112,7 @@ impl ActiveConnection { break; } - let first_tx: Vec = match tx { + let tx: Vec = match tx { Ok((sig, tx)) => { if Self::check_for_confirmation(&txs_sent_store, sig) { // transaction is already confirmed/ no need to send @@ -131,31 +129,22 @@ impl ActiveConnection { } }; - let mut txs = vec![first_tx]; - for _ in 1..number_of_transactions_per_unistream { - if let Ok((sig, tx)) = transaction_reciever.try_recv() { - if Self::check_for_confirmation(&txs_sent_store, sig) { - continue; - } - txs.push(tx); - } - } - - let connection_pool = match connection_pool.get_pooled_connection().await { + let PooledConnection { + connection, + permit + } = match connection_pool.get_pooled_connection().await { Ok(connection_pool) => connection_pool, - Err(_) => break, + Err(e) => { + error!("error getting pooled connection {e:?}"); + break; + }, }; + tokio::spawn(async move { - let PooledConnection { - connection, - permit - } = connection_pool; // permit will be used to send all the transaction and then destroyed let _permit = permit; NB_QUIC_TASKS.inc(); - for tx in txs { - connection.send_transaction(tx).await; - } + connection.send_transaction(tx).await; NB_QUIC_TASKS.dec(); }); },