Trying to pick free connection in order so that we prioritize connection that already have been established

This commit is contained in:
Godmode Galactus 2023-09-26 16:28:05 +02:00
parent a68a379ed1
commit be435249ed
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
1 changed files with 26 additions and 5 deletions

View File

@ -2,7 +2,6 @@ use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
structures::rotating_queue::RotatingQueue,
};
use anyhow::Context;
use futures::FutureExt;
use log::warn;
use quinn::{Connection, Endpoint};
@ -178,6 +177,14 @@ impl QuicConnection {
pub fn has_connected_atleast_once(&self) -> bool {
self.has_connected_once.load(Ordering::Relaxed)
}
pub async fn is_connected(&self) -> bool {
let connection = self.connection.read().await.clone();
match connection {
Some(connection) => connection.close_reason().is_none(),
None => false,
}
}
}
#[derive(Clone)]
@ -227,21 +234,35 @@ impl QuicConnectionPool {
}
}
pub async fn get_pooled_connection(&self) -> anyhow::Result<PooledConnection> {
let (permit, index, _others) = futures::future::select_all(
async fn get_permit_and_index(&self) -> anyhow::Result<(OwnedSemaphorePermit, usize)> {
// pefer getting connection that were already established
for (index, sem) in self.transactions_in_sending_semaphore.iter().enumerate() {
let connection = &self.connections[index];
if connection.has_connected_atleast_once() || connection.is_connected().await {
// if it is connection is not yet created or connection is still open
if let Ok(permit) = sem.clone().try_acquire_owned() {
return Ok((permit, index));
}
}
}
// if all of the connections are full then fall back on semaphore which gets free first.
let (permit, index, _) = futures::future::select_all(
self.transactions_in_sending_semaphore
.iter()
.map(|x| x.clone().acquire_owned().boxed()),
)
.await;
drop(_others);
let permit = permit?;
Ok((permit, index))
}
pub async fn get_pooled_connection(&self) -> anyhow::Result<PooledConnection> {
let (permit, index) = self.get_permit_and_index().await?;
// establish a connection if the connection has not yet been used
let connection = self.connections[index].clone();
if !connection.has_connected_atleast_once() {
connection.get_connection().await;
}
let permit = permit.context("Cannot aquire permit, connection pool erased")?;
Ok(PooledConnection { connection, permit })
}