Updating quic client and sever examples to include number of streams in the cli, printing number of messages in the queue

This commit is contained in:
godmodegalactus 2024-06-06 10:37:39 +02:00
parent 669f85cbd6
commit 70fc01913a
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
5 changed files with 44 additions and 16 deletions

View File

@ -8,4 +8,10 @@ pub struct Args {
#[clap(short, long)]
pub rpc_url: Option<String>,
#[clap(short, long, default_value_t = false)]
pub blocks_instead_of_accounts: bool,
#[clap(short = 's', long, default_value_t = 1_000)]
pub number_of_streams: u64,
}

View File

@ -40,9 +40,15 @@ pub async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
println!("Connecting");
let (client, mut reciever) = Client::new(args.url, ConnectionParameters::default())
.await
.unwrap();
let (client, mut reciever) = Client::new(
args.url,
ConnectionParameters {
max_number_of_streams: args.number_of_streams,
..Default::default()
},
)
.await
.unwrap();
println!("Connected");
let bytes_transfered = Arc::new(AtomicU64::new(0));
@ -131,16 +137,16 @@ pub async fn main() {
}
sleep(Duration::from_secs(1));
let mut filters = vec![Filter::Slot, Filter::BlockMeta];
if args.blocks_instead_of_accounts {
filters.push(Filter::BlockAll);
} else {
filters.push(Filter::AccountsAll);
};
println!("Subscribing");
client
.subscribe(vec![
Filter::BlockAll,
Filter::Slot,
Filter::BlockMeta,
Filter::AccountsAll,
])
.await
.unwrap();
client.subscribe(filters).await.unwrap();
println!("Subscribed");
while let Some(message) = reciever.recv().await {

View File

@ -12,6 +12,6 @@ pub struct Args {
#[clap(short = 'l', long, default_value_t = 200)]
pub account_data_size: u32,
#[clap(short, long, default_value_t = 1_000)]
pub max_lagging: u64,
#[clap(short = 's', long, default_value_t = 1_000)]
pub number_of_streams: u64,
}

View File

@ -24,7 +24,10 @@ pub fn main() {
let config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
log_level: "info".to_string(),
quic_parameters: QuicParameters::default(),
quic_parameters: QuicParameters {
max_number_of_streams_per_client: args.number_of_streams,
..Default::default()
},
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::None,
},

View File

@ -2,7 +2,7 @@ use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64},
atomic::{AtomicBool, AtomicU64, AtomicUsize},
mpsc::{self, Sender},
Arc, Mutex, RwLock,
},
@ -32,6 +32,7 @@ use quic_geyser_quiche_utils::{
struct DispatchingData {
pub sender: Sender<(Vec<u8>, u8)>,
pub filters: Arc<RwLock<Vec<Filter>>>,
pub messages_in_queue: Arc<AtomicUsize>,
}
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
@ -208,6 +209,7 @@ pub fn server_loop(
let (client_sender, client_reciver) = mio_channel::channel();
let (client_message_sx, client_message_rx) = mpsc::channel();
let messages_in_queue = Arc::new(AtomicUsize::new(0));
let filters = Arc::new(RwLock::new(Vec::new()));
create_client_task(
@ -218,6 +220,7 @@ pub fn server_loop(
filters.clone(),
maximum_concurrent_streams_id,
stop_laggy_client,
messages_in_queue.clone(),
);
let mut lk = dispatching_connections.lock().unwrap();
lk.insert(
@ -225,6 +228,7 @@ pub fn server_loop(
DispatchingData {
sender: client_message_sx,
filters,
messages_in_queue,
},
);
clients.insert(scid, client_sender);
@ -295,6 +299,7 @@ fn create_client_task(
filters: Arc<RwLock<Vec<Filter>>>,
maximum_concurrent_streams_id: u64,
stop_laggy_client: bool,
messages_in_queue: Arc<AtomicUsize>,
) {
std::thread::spawn(move || {
let mut partial_responses = PartialResponses::new();
@ -328,6 +333,7 @@ fn create_client_task(
let number_of_readable_streams = number_of_readable_streams.clone();
let number_of_writable_streams = number_of_writable_streams.clone();
let messages_added = messages_added.clone();
let messages_in_queue = messages_in_queue.clone();
let quit = quit.clone();
std::thread::spawn(move || {
while !quit.load(std::sync::atomic::Ordering::Relaxed) {
@ -358,6 +364,10 @@ fn create_client_task(
"messages_added : {}",
messages_added.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(
"messages in queue to be sent : {}",
messages_in_queue.load(std::sync::atomic::Ordering::Relaxed)
);
}
});
}
@ -422,6 +432,7 @@ fn create_client_task(
while partial_responses.len() < max_allowed_partial_responses {
let close = match message_channel.try_recv() {
Ok((message, priority)) => {
messages_in_queue.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
@ -576,6 +587,8 @@ fn create_dispatching_thread(
bincode::serialize(&message).expect("Message should be serializable in binary");
for id in dispatching_connections.iter() {
let data = dispatching_connections_lk.get(id).unwrap();
data.messages_in_queue
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if data.sender.send((binary.clone(), priority)).is_err() {
// client is closed
dispatching_connections_lk.remove(id);