diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index 1d5eeeda..3f7b5e41 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -13,6 +13,7 @@ use futures::future::join_all; use itertools::join; use log::{debug, error, info}; use quinn::TokioRuntime; +use serde::de::Unexpected::Option; use solana_rpc_client::rpc_client::SerializableTransaction; use solana_sdk::hash::Hash; use solana_sdk::instruction::Instruction; @@ -37,16 +38,22 @@ use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::tx_store::empty_tx_store; use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; - -const SAMPLE_TX_COUNT: u32 = 10; +// note: logging will be auto-adjusted +const SAMPLE_TX_COUNT: u32 = 1000; const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 2; // prod=8 #[test] pub fn wireup_and_send_txs_via_channel() { + let env_filter = if SAMPLE_TX_COUNT < 100 { + "trace,rustls=info" + } else { + "trace,rustls=info,quinn_proto=debug" + }; tracing_subscriber::fmt::fmt() - .with_max_level(LevelFilter::DEBUG) + // .with_max_level(LevelFilter::DEBUG) + .with_env_filter(env_filter) .init(); // solana quic streamer - see quic.rs -> rt() @@ -105,20 +112,36 @@ pub fn wireup_and_send_txs_via_channel() { let packet_consumer_jh = thread::spawn(move || { info!("start pulling packets..."); let mut packet_count = 0; + let time_to_first = Instant::now(); + let timer = Instant::now(); + // second half + let mut timer2 = None; + let mut packet_count2 = 0; + const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2; while packet_count != SAMPLE_TX_COUNT { match inbound_packets_receiver.recv() { Ok(packet_batch) => { + if packet_count == 0 { + info!("time to first packet {}ms", time_to_first.elapsed().as_millis()); + } + packet_count = packet_count + packet_batch.len() as u32; + if timer2.is_some() { + packet_count2 = packet_count2 + packet_batch.len() as u32; + } for packet in packet_batch.iter() { let tx = packet.deserialize_slice::(..).unwrap(); - debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); + // debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); // for ix in tx.message.instructions() { // info!("instruction: {:?}", ix.data); // } } - info!("received packets so far: {}", packet_count); + if packet_count == WARMUP_TX_COUNT { + timer2 = Some(Instant::now()); + } + // info!("received packets so far: {}", packet_count); if packet_count == SAMPLE_TX_COUNT { break; } @@ -129,6 +152,16 @@ pub fn wireup_and_send_txs_via_channel() { } } + let total_duration = timer.elapsed(); + let half_duration = timer2.unwrap().elapsed(); + + // throughput_50 is second half of transactions - should iron out startup effects + info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, " + , packet_count, total_duration.as_micros(), + packet_count as f64 / total_duration.as_secs_f64(), + packet_count2 as f64 / half_duration.as_secs_f64(), + ); + info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); runtime2.shutdown_timeout(Duration::from_millis(100)); });