removing maximum message in queue

This commit is contained in:
godmodegalactus 2024-08-02 07:01:14 +02:00
parent e5834aa308
commit 9b5a5fae7e
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
4 changed files with 8 additions and 30 deletions

View File

@ -249,7 +249,6 @@ pub fn create_quiche_client_thread(
// no more new messages
}
std::sync::mpsc::TryRecvError::Disconnected => {
log::error!("message_queue disconnected");
let _ = connection.close(true, 0, b"no longer needed");
}
}
@ -405,7 +404,10 @@ mod tests {
let (server_send_queue, rx_sent_queue) = mpsc::channel::<ChannelMessage>();
let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop(
QuicParameters::default(),
QuicParameters {
incremental_priority: true,
..Default::default()
},
socket_addr,
rx_sent_queue,
CompressionType::Lz4Fast(8),

View File

@ -7,8 +7,8 @@ use crate::{
defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_ENABLE_GSO,
DEFAULT_ENABLE_PACING, DEFAULT_INCREMENTAL_PRIORITY, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_MESSAGES_IN_QUEUE, DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
DEFAULT_MAX_STREAMS, DEFAULT_USE_CC_BBR,
DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
DEFAULT_USE_CC_BBR,
},
};
@ -69,7 +69,6 @@ pub struct QuicParameters {
pub enable_pacing: bool,
pub use_cc_bbr: bool,
pub incremental_priority: bool,
pub max_messages_in_queue: u64,
pub enable_gso: bool,
}
@ -85,7 +84,6 @@ impl Default for QuicParameters {
enable_pacing: DEFAULT_ENABLE_PACING,
use_cc_bbr: DEFAULT_USE_CC_BBR,
incremental_priority: DEFAULT_INCREMENTAL_PRIORITY,
max_messages_in_queue: DEFAULT_MAX_MESSAGES_IN_QUEUE,
enable_gso: DEFAULT_ENABLE_GSO,
}
}

View File

@ -1,5 +1,4 @@
pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024;
pub const DEFAULT_MAX_MESSAGES_IN_QUEUE: u64 = 1024 * 1024;
pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS * 3 / 4;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;

View File

@ -2,7 +2,7 @@ use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize},
atomic::{AtomicBool, AtomicU64},
mpsc::{self, Sender},
Arc, Mutex, RwLock,
},
@ -39,7 +39,6 @@ use crate::configure_server::configure_server;
struct DispatchingData {
pub sender: Sender<(Vec<u8>, u8)>,
pub filters: Arc<RwLock<Vec<Filter>>>,
pub messages_in_queue: Arc<AtomicUsize>,
}
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
@ -52,7 +51,6 @@ pub fn server_loop(
stop_laggy_client: bool,
) -> anyhow::Result<()> {
let maximum_concurrent_streams_id = u64::MAX;
let max_messages_in_queue = quic_params.max_messages_in_queue;
let mut config = configure_server(quic_params)?;
let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
@ -103,7 +101,6 @@ pub fn server_loop(
message_send_queue,
dispatching_connections.clone(),
compression_type,
max_messages_in_queue,
);
let mut client_id_counter = 0;
@ -225,7 +222,6 @@ pub fn server_loop(
let (client_sender, client_reciver) = mio_channel::channel();
let (client_message_sx, client_message_rx) = mpsc::channel();
let messages_in_queue = Arc::new(AtomicUsize::new(0));
let current_client_id = client_id_counter;
client_id_counter += 1;
@ -240,7 +236,6 @@ pub fn server_loop(
filters.clone(),
maximum_concurrent_streams_id,
stop_laggy_client,
messages_in_queue.clone(),
quic_params.incremental_priority,
rng.clone(),
);
@ -250,7 +245,6 @@ pub fn server_loop(
DispatchingData {
sender: client_message_sx,
filters,
messages_in_queue,
},
);
clients_lk.insert(scid, current_client_id);
@ -312,7 +306,6 @@ fn create_client_task(
filters: Arc<RwLock<Vec<Filter>>>,
maximum_concurrent_streams_id: u64,
stop_laggy_client: bool,
messages_in_queue: Arc<AtomicUsize>,
incremental_priority: bool,
rng: SystemRandom,
) {
@ -348,7 +341,6 @@ fn create_client_task(
let number_of_readable_streams = number_of_readable_streams.clone();
let number_of_writable_streams = number_of_writable_streams.clone();
let messages_added = messages_added.clone();
let messages_in_queue = messages_in_queue.clone();
let quit = quit.clone();
std::thread::spawn(move || {
while !quit.load(std::sync::atomic::Ordering::Relaxed) {
@ -379,10 +371,6 @@ fn create_client_task(
"messages_added : {}",
messages_added.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::debug!(
"messages in queue to be sent : {}",
messages_in_queue.load(std::sync::atomic::Ordering::Relaxed)
);
}
});
}
@ -459,8 +447,6 @@ fn create_client_task(
loop {
let close = match message_channel.try_recv() {
Ok((message, priority)) => {
messages_in_queue
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
@ -615,10 +601,8 @@ fn create_dispatching_thread(
message_send_queue: mpsc::Receiver<ChannelMessage>,
dispatching_connections: DispachingConnections,
compression_type: CompressionType,
max_messages_in_queue: u64,
) {
std::thread::spawn(move || {
let max_messages_in_queue = max_messages_in_queue as usize;
while let Ok(message) = message_send_queue.recv() {
let mut dispatching_connections_lk = dispatching_connections.lock().unwrap();
@ -670,12 +654,7 @@ fn create_dispatching_thread(
bincode::serialize(&message).expect("Message should be serializable in binary");
for id in dispatching_connections.iter() {
let data = dispatching_connections_lk.get(id).unwrap();
let messages_in_queue = data
.messages_in_queue
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if data.sender.send((binary.clone(), priority)).is_err()
|| messages_in_queue > max_messages_in_queue
{
if data.sender.send((binary.clone(), priority)).is_err() {
// client is closed
dispatching_connections_lk.remove(id);
}