establish connection before using QuicConnection

This commit is contained in:
Godmode Galactus 2023-09-19 13:35:58 +02:00
parent 41ae03e8f5
commit 4b51dc0460
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 28 additions and 33 deletions

View File

@ -29,6 +29,7 @@ pub struct QuicConnection {
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
impl QuicConnection {
@ -48,6 +49,7 @@ impl QuicConnection {
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
}
@ -64,7 +66,7 @@ impl QuicConnection {
.await
}
async fn get_connection(&self) -> Option<Connection> {
pub async fn get_connection(&self) -> Option<Connection> {
// get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await.clone();
@ -95,6 +97,7 @@ impl QuicConnection {
None => {
let connection = self.connect().await;
*self.connection.write().await = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
}
}
@ -111,10 +114,6 @@ impl QuicConnection {
let mut do_retry = false;
let connection = self.get_connection().await;
if self.exit_signal.load(Ordering::Relaxed) {
return;
}
if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
@ -175,6 +174,10 @@ impl QuicConnection {
pub fn reset_timeouts(&self) {
self.timeout_counters.store(0, Ordering::Relaxed);
}
pub fn has_connected_atleast_once(&self) -> bool {
self.has_connected_once.load(Ordering::Relaxed)
}
}
#[derive(Clone)]
@ -232,11 +235,14 @@ impl QuicConnectionPool {
)
.await;
drop(_others);
// establish a connection if the connection has not yet been used
let connection = self.connections[index].clone();
if !connection.has_connected_atleast_once() {
connection.get_connection().await;
}
let permit = permit.context("Cannot aquire permit, connection pool erased")?;
Ok(PooledConnection {
connection: self.connections[index].clone(),
permit,
})
Ok(PooledConnection { connection, permit })
}
pub fn len(&self) -> usize {

View File

@ -80,16 +80,14 @@ impl ActiveConnection {
let mut exit_oneshot_channel = exit_oneshot_channel;
let identity = self.identity;
let number_of_transactions_per_unistream = self
.connection_parameters
.number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections;
let max_uni_stream_connections = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
);
)
.saturating_sub(1);
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
identity,
@ -114,7 +112,7 @@ impl ActiveConnection {
break;
}
let first_tx: Vec<u8> = match tx {
let tx: Vec<u8> = match tx {
Ok((sig, tx)) => {
if Self::check_for_confirmation(&txs_sent_store, sig) {
// transaction is already confirmed/ no need to send
@ -131,31 +129,22 @@ impl ActiveConnection {
}
};
let mut txs = vec![first_tx];
for _ in 1..number_of_transactions_per_unistream {
if let Ok((sig, tx)) = transaction_reciever.try_recv() {
if Self::check_for_confirmation(&txs_sent_store, sig) {
continue;
}
txs.push(tx);
}
}
let connection_pool = match connection_pool.get_pooled_connection().await {
let PooledConnection {
connection,
permit
} = match connection_pool.get_pooled_connection().await {
Ok(connection_pool) => connection_pool,
Err(_) => break,
Err(e) => {
error!("error getting pooled connection {e:?}");
break;
},
};
tokio::spawn(async move {
let PooledConnection {
connection,
permit
} = connection_pool;
// permit will be used to send all the transaction and then destroyed
let _permit = permit;
NB_QUIC_TASKS.inc();
for tx in txs {
connection.send_transaction(tx).await;
}
connection.send_transaction(tx).await;
NB_QUIC_TASKS.dec();
});
},