diff --git a/quiche/src/quiche_reciever.rs b/quiche/src/quiche_reciever.rs index 5254068..1b4ed54 100644 --- a/quiche/src/quiche_reciever.rs +++ b/quiche/src/quiche_reciever.rs @@ -20,7 +20,7 @@ pub fn recv_message( None => vec![], }; loop { - let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size + let mut buf = [0; MAX_DATAGRAM_SIZE]; match connection.stream_recv(stream_id, &mut buf) { Ok((read, fin)) => { log::trace!("read {} on stream {}", read, stream_id); diff --git a/server/src/quic_server.rs b/server/src/quic_server.rs index fb720c6..253731e 100644 --- a/server/src/quic_server.rs +++ b/server/src/quic_server.rs @@ -19,7 +19,6 @@ impl Debug for QuicServer { impl QuicServer { pub fn new(config: ConfigQuicPlugin) -> anyhow::Result { - let max_number_of_streams = config.quic_parameters.max_number_of_streams_per_client; let server_config = configure_server(config.quic_parameters)?; let socket = config.address; let compression_type = config.compression_parameters.compression_type; @@ -33,7 +32,6 @@ impl QuicServer { data_channel_tx, compression_type, true, - max_number_of_streams, ) { panic!("Server loop closed by error : {e}"); } diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index 16f22b6..6de06cc 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -32,7 +32,6 @@ use quic_geyser_quiche_utils::{ struct DispatchingData { pub sender: Sender<(Vec, u8)>, pub filters: Arc>>, - pub message_counter: Arc, } type DispachingConnections = Arc, DispatchingData>>>; @@ -50,7 +49,6 @@ pub fn server_loop( message_send_queue: mpsc::Receiver, compression_type: CompressionType, stop_laggy_client: bool, - max_number_of_streams: u64, ) -> anyhow::Result<()> { let maximum_concurrent_streams_id = u64::MAX; @@ -94,7 +92,6 @@ pub fn server_loop( message_send_queue, dispatching_connections.clone(), compression_type, - max_number_of_streams, ); loop { @@ -210,9 +207,7 @@ pub fn server_loop( }; let (client_sender, client_reciver) = mio_channel::channel(); - let (client_message_sx, client_message_rx) = mpsc::channel(); - let message_counter = Arc::new(AtomicU64::new(0)); let filters = Arc::new(RwLock::new(Vec::new())); create_client_task( @@ -223,7 +218,6 @@ pub fn server_loop( filters.clone(), maximum_concurrent_streams_id, stop_laggy_client, - message_counter.clone(), ); let mut lk = dispatching_connections.lock().unwrap(); lk.insert( @@ -231,7 +225,6 @@ pub fn server_loop( DispatchingData { sender: client_message_sx, filters, - message_counter, }, ); clients.insert(scid, client_sender); @@ -302,7 +295,6 @@ fn create_client_task( filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, - message_count: Arc, ) { std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); @@ -426,9 +418,7 @@ fn create_client_task( while partial_responses.len() < max_allowed_partial_responses { let close = match message_channel.try_recv() { Ok((message, priority)) => { - message_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); let stream_id = next_stream; - next_stream = get_next_unidi(stream_id, true, maximum_concurrent_streams_id); @@ -459,6 +449,7 @@ fn create_client_task( } mpsc::TryRecvError::Disconnected => { // too many message the connection is lagging + log::error!("channel disconnected by dispatcher"); true } } @@ -523,7 +514,6 @@ fn create_dispatching_thread( message_send_queue: mpsc::Receiver, dispatching_connections: DispachingConnections, compression_type: CompressionType, - max_number_of_streams: u64, ) { std::thread::spawn(move || { while let Ok(message) = message_send_queue.recv() { @@ -573,20 +563,12 @@ fn create_dispatching_thread( bincode::serialize(&message).expect("Message should be serializable in binary"); for id in dispatching_connections.iter() { let data = dispatching_connections_lk.get(id).unwrap(); - data.message_counter - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); if data.sender.send((binary.clone(), priority)).is_err() { // client is closed dispatching_connections_lk.remove(id); } } } - dispatching_connections_lk.retain(|_id, connection| { - connection - .message_counter - .load(std::sync::atomic::Ordering::Relaxed) - < max_number_of_streams - }); } }); }