diff --git a/examples/tester-client/src/cli.rs b/examples/tester-client/src/cli.rs index caabe06..3cfa566 100644 --- a/examples/tester-client/src/cli.rs +++ b/examples/tester-client/src/cli.rs @@ -8,4 +8,10 @@ pub struct Args { #[clap(short, long)] pub rpc_url: Option, + + #[clap(short, long, default_value_t = false)] + pub blocks_instead_of_accounts: bool, + + #[clap(short = 's', long, default_value_t = 1_000)] + pub number_of_streams: u64, } diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 2479e39..8737cbd 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -40,9 +40,15 @@ pub async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); println!("Connecting"); - let (client, mut reciever) = Client::new(args.url, ConnectionParameters::default()) - .await - .unwrap(); + let (client, mut reciever) = Client::new( + args.url, + ConnectionParameters { + max_number_of_streams: args.number_of_streams, + ..Default::default() + }, + ) + .await + .unwrap(); println!("Connected"); let bytes_transfered = Arc::new(AtomicU64::new(0)); @@ -131,16 +137,16 @@ pub async fn main() { } sleep(Duration::from_secs(1)); + let mut filters = vec![Filter::Slot, Filter::BlockMeta]; + + if args.blocks_instead_of_accounts { + filters.push(Filter::BlockAll); + } else { + filters.push(Filter::AccountsAll); + }; + println!("Subscribing"); - client - .subscribe(vec![ - Filter::BlockAll, - Filter::Slot, - Filter::BlockMeta, - Filter::AccountsAll, - ]) - .await - .unwrap(); + client.subscribe(filters).await.unwrap(); println!("Subscribed"); while let Some(message) = reciever.recv().await { diff --git a/examples/tester-server/src/cli.rs b/examples/tester-server/src/cli.rs index 353891b..8b7eff1 100644 --- a/examples/tester-server/src/cli.rs +++ b/examples/tester-server/src/cli.rs @@ -12,6 +12,6 @@ pub struct Args { #[clap(short = 'l', long, default_value_t = 200)] pub account_data_size: u32, - #[clap(short, long, default_value_t = 1_000)] - pub max_lagging: u64, + #[clap(short = 's', long, default_value_t = 1_000)] + pub number_of_streams: u64, } diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index b72139d..b74d629 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -24,7 +24,10 @@ pub fn main() { let config = ConfigQuicPlugin { address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), log_level: "info".to_string(), - quic_parameters: QuicParameters::default(), + quic_parameters: QuicParameters { + max_number_of_streams_per_client: args.number_of_streams, + ..Default::default() + }, compression_parameters: CompressionParameters { compression_type: quic_geyser_common::compression::CompressionType::None, }, diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index 9baa94b..b173416 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -2,7 +2,7 @@ use std::{ collections::{BTreeMap, HashMap}, net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64}, + atomic::{AtomicBool, AtomicU64, AtomicUsize}, mpsc::{self, Sender}, Arc, Mutex, RwLock, }, @@ -32,6 +32,7 @@ use quic_geyser_quiche_utils::{ struct DispatchingData { pub sender: Sender<(Vec, u8)>, pub filters: Arc>>, + pub messages_in_queue: Arc, } type DispachingConnections = Arc, DispatchingData>>>; @@ -208,6 +209,7 @@ pub fn server_loop( let (client_sender, client_reciver) = mio_channel::channel(); let (client_message_sx, client_message_rx) = mpsc::channel(); + let messages_in_queue = Arc::new(AtomicUsize::new(0)); let filters = Arc::new(RwLock::new(Vec::new())); create_client_task( @@ -218,6 +220,7 @@ pub fn server_loop( filters.clone(), maximum_concurrent_streams_id, stop_laggy_client, + messages_in_queue.clone(), ); let mut lk = dispatching_connections.lock().unwrap(); lk.insert( @@ -225,6 +228,7 @@ pub fn server_loop( DispatchingData { sender: client_message_sx, filters, + messages_in_queue, }, ); clients.insert(scid, client_sender); @@ -295,6 +299,7 @@ fn create_client_task( filters: Arc>>, maximum_concurrent_streams_id: u64, stop_laggy_client: bool, + messages_in_queue: Arc, ) { std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); @@ -328,6 +333,7 @@ fn create_client_task( let number_of_readable_streams = number_of_readable_streams.clone(); let number_of_writable_streams = number_of_writable_streams.clone(); let messages_added = messages_added.clone(); + let messages_in_queue = messages_in_queue.clone(); let quit = quit.clone(); std::thread::spawn(move || { while !quit.load(std::sync::atomic::Ordering::Relaxed) { @@ -358,6 +364,10 @@ fn create_client_task( "messages_added : {}", messages_added.swap(0, std::sync::atomic::Ordering::Relaxed) ); + log::info!( + "messages in queue to be sent : {}", + messages_in_queue.load(std::sync::atomic::Ordering::Relaxed) + ); } }); } @@ -422,6 +432,7 @@ fn create_client_task( while partial_responses.len() < max_allowed_partial_responses { let close = match message_channel.try_recv() { Ok((message, priority)) => { + messages_in_queue.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); let stream_id = next_stream; next_stream = get_next_unidi(stream_id, true, maximum_concurrent_streams_id); @@ -576,6 +587,8 @@ fn create_dispatching_thread( bincode::serialize(&message).expect("Message should be serializable in binary"); for id in dispatching_connections.iter() { let data = dispatching_connections_lk.get(id).unwrap(); + data.messages_in_queue + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); if data.sender.send((binary.clone(), priority)).is_err() { // client is closed dispatching_connections_lk.remove(id);