only quiche will stop laggy clients
This commit is contained in:
parent
8e4dbc7efb
commit
828cfe38a3
|
@ -20,7 +20,7 @@ pub fn recv_message(
|
|||
None => vec![],
|
||||
};
|
||||
loop {
|
||||
let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size
|
||||
let mut buf = [0; MAX_DATAGRAM_SIZE];
|
||||
match connection.stream_recv(stream_id, &mut buf) {
|
||||
Ok((read, fin)) => {
|
||||
log::trace!("read {} on stream {}", read, stream_id);
|
||||
|
|
|
@ -19,7 +19,6 @@ impl Debug for QuicServer {
|
|||
|
||||
impl QuicServer {
|
||||
pub fn new(config: ConfigQuicPlugin) -> anyhow::Result<Self> {
|
||||
let max_number_of_streams = config.quic_parameters.max_number_of_streams_per_client;
|
||||
let server_config = configure_server(config.quic_parameters)?;
|
||||
let socket = config.address;
|
||||
let compression_type = config.compression_parameters.compression_type;
|
||||
|
@ -33,7 +32,6 @@ impl QuicServer {
|
|||
data_channel_tx,
|
||||
compression_type,
|
||||
true,
|
||||
max_number_of_streams,
|
||||
) {
|
||||
panic!("Server loop closed by error : {e}");
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ use quic_geyser_quiche_utils::{
|
|||
struct DispatchingData {
|
||||
pub sender: Sender<(Vec<u8>, u8)>,
|
||||
pub filters: Arc<RwLock<Vec<Filter>>>,
|
||||
pub message_counter: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
|
||||
|
@ -50,7 +49,6 @@ pub fn server_loop(
|
|||
message_send_queue: mpsc::Receiver<ChannelMessage>,
|
||||
compression_type: CompressionType,
|
||||
stop_laggy_client: bool,
|
||||
max_number_of_streams: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let maximum_concurrent_streams_id = u64::MAX;
|
||||
|
||||
|
@ -94,7 +92,6 @@ pub fn server_loop(
|
|||
message_send_queue,
|
||||
dispatching_connections.clone(),
|
||||
compression_type,
|
||||
max_number_of_streams,
|
||||
);
|
||||
|
||||
loop {
|
||||
|
@ -210,9 +207,7 @@ pub fn server_loop(
|
|||
};
|
||||
|
||||
let (client_sender, client_reciver) = mio_channel::channel();
|
||||
|
||||
let (client_message_sx, client_message_rx) = mpsc::channel();
|
||||
let message_counter = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let filters = Arc::new(RwLock::new(Vec::new()));
|
||||
create_client_task(
|
||||
|
@ -223,7 +218,6 @@ pub fn server_loop(
|
|||
filters.clone(),
|
||||
maximum_concurrent_streams_id,
|
||||
stop_laggy_client,
|
||||
message_counter.clone(),
|
||||
);
|
||||
let mut lk = dispatching_connections.lock().unwrap();
|
||||
lk.insert(
|
||||
|
@ -231,7 +225,6 @@ pub fn server_loop(
|
|||
DispatchingData {
|
||||
sender: client_message_sx,
|
||||
filters,
|
||||
message_counter,
|
||||
},
|
||||
);
|
||||
clients.insert(scid, client_sender);
|
||||
|
@ -302,7 +295,6 @@ fn create_client_task(
|
|||
filters: Arc<RwLock<Vec<Filter>>>,
|
||||
maximum_concurrent_streams_id: u64,
|
||||
stop_laggy_client: bool,
|
||||
message_count: Arc<AtomicU64>,
|
||||
) {
|
||||
std::thread::spawn(move || {
|
||||
let mut partial_responses = PartialResponses::new();
|
||||
|
@ -426,9 +418,7 @@ fn create_client_task(
|
|||
while partial_responses.len() < max_allowed_partial_responses {
|
||||
let close = match message_channel.try_recv() {
|
||||
Ok((message, priority)) => {
|
||||
message_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let stream_id = next_stream;
|
||||
|
||||
next_stream =
|
||||
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
|
||||
|
||||
|
@ -459,6 +449,7 @@ fn create_client_task(
|
|||
}
|
||||
mpsc::TryRecvError::Disconnected => {
|
||||
// too many message the connection is lagging
|
||||
log::error!("channel disconnected by dispatcher");
|
||||
true
|
||||
}
|
||||
}
|
||||
|
@ -523,7 +514,6 @@ fn create_dispatching_thread(
|
|||
message_send_queue: mpsc::Receiver<ChannelMessage>,
|
||||
dispatching_connections: DispachingConnections,
|
||||
compression_type: CompressionType,
|
||||
max_number_of_streams: u64,
|
||||
) {
|
||||
std::thread::spawn(move || {
|
||||
while let Ok(message) = message_send_queue.recv() {
|
||||
|
@ -573,20 +563,12 @@ fn create_dispatching_thread(
|
|||
bincode::serialize(&message).expect("Message should be serializable in binary");
|
||||
for id in dispatching_connections.iter() {
|
||||
let data = dispatching_connections_lk.get(id).unwrap();
|
||||
data.message_counter
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
if data.sender.send((binary.clone(), priority)).is_err() {
|
||||
// client is closed
|
||||
dispatching_connections_lk.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
dispatching_connections_lk.retain(|_id, connection| {
|
||||
connection
|
||||
.message_counter
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
< max_number_of_streams
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue