From 37da6c039e26cff906d184ad696a2eb5b9d46b3b Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 21 Jul 2023 17:23:13 +0200 Subject: [PATCH] extract TestCaseParams --- ...literpc_tpu_quic_server_integrationtest.rs | 101 ++++++++++++------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index ac5b125f..372ff52d 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -1,10 +1,11 @@ +use std::cell::Cell; use anyhow::bail; use countmap::CountMap; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use futures::future::join_all; use log::{debug, error, info, trace, warn}; use quinn::TokioRuntime; -use serde::de::Unexpected::Option; +use std::option::Option; use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::tx_store::empty_tx_store; @@ -27,7 +28,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::ops::Deref; use std::path::Path; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::{Duration, Instant}; @@ -38,16 +39,61 @@ use tokio::task::JoinHandle; use tokio::time::{interval, sleep}; use tokio::{join, spawn, task}; use tracing_subscriber::{filter::LevelFilter, fmt}; +use tracing_subscriber::util::SubscriberInitExt; -// note: logging will be auto-adjusted -const SAMPLE_TX_COUNT: u32 = 10; +#[derive(Copy, Clone, Debug)] +struct TestCaseParams { + // note: logging will be auto-adjusted based on 'sample_tx_count' + sample_tx_count: u32, +} const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo -#[test] // note: tokio runtimes get created as part of the integration test -pub fn wireup_and_send_txs_via_channel() { - configure_logging(); +const STAKE_CONNECTION: bool = true; + +const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { + connection_timeout: Duration::from_secs(1), + connection_retry_count: 10, + finalize_timeout: Duration::from_millis(200), + max_number_of_connections: 8, + unistream_timeout: Duration::from_millis(500), + write_timeout: Duration::from_secs(1), + number_of_transactions_per_unistream: 10, +}; + +#[test] +pub fn small_tx_batch() { + configure_logging(true); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 20, + }); +} + +#[test] +pub fn many_transactions() { + configure_logging(false); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 10000, + }); +} + +#[ignore] +#[test] +pub fn too_many_transactions() { + configure_logging(false); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 100000, + }); +} + +// note: this not a tokio test as runtimes get created as part of the integration test +fn wireup_and_send_txs_via_channel( + test_case_params: TestCaseParams, +) { // value from solana - see quic streamer - see quic.rs -> rt() const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; @@ -98,6 +144,7 @@ pub fn wireup_and_send_txs_via_channel() { runtime_literpc.block_on(async { tokio::spawn(start_literpc_client( + test_case_params.clone(), listen_addr, literpc_validator_identity, )); @@ -112,9 +159,9 @@ pub fn wireup_and_send_txs_via_channel() { // second half let mut timer2 = None; let mut packet_count2 = 0; - let mut count_map: CountMap = CountMap::with_capacity(SAMPLE_TX_COUNT as usize); - const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2; - while packet_count < SAMPLE_TX_COUNT { + let mut count_map: CountMap = CountMap::with_capacity(test_case_params.sample_tx_count as usize); + let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; + while packet_count < test_case_params.sample_tx_count { if latest_tx.elapsed() > Duration::from_secs(5) { warn!("abort after timeout waiting for packet from quic streamer"); break; @@ -156,10 +203,10 @@ pub fn wireup_and_send_txs_via_channel() { count_map.insert_or_increment(*tx.get_signature()); } - if packet_count == WARMUP_TX_COUNT { + if packet_count == warmup_tx_count { timer2 = Some(Instant::now()); } - if packet_count == SAMPLE_TX_COUNT { + if packet_count == test_case_params.sample_tx_count { break; } } // -- while not all packets received - by count @@ -182,8 +229,8 @@ pub fn wireup_and_send_txs_via_channel() { assert_eq!( count_map.len() as u32, - SAMPLE_TX_COUNT, - "count_map size should be equal to SAMPLE_TX_COUNT" + test_case_params.sample_tx_count, + "count_map size should be equal to sample_tx_count" ); // note: this assumption will not hold as soon as test is configured to do fanout assert!( @@ -197,30 +244,22 @@ pub fn wireup_and_send_txs_via_channel() { packet_consumer_jh.join().unwrap(); } -fn configure_logging() { - let env_filter = if SAMPLE_TX_COUNT < 100 { +fn configure_logging(verbose: bool) { + let env_filter = if verbose { "trace,rustls=info,quinn_proto=debug" } else { "debug,quinn_proto=info,rustls=info,solana_streamer=debug" }; - tracing_subscriber::fmt::fmt() + let result = tracing_subscriber::fmt::fmt() .with_env_filter(env_filter) - .init(); + .try_init(); + if result.is_err() { + println!("Logging already initialized - ignore"); + } } -const STAKE_CONNECTION: bool = true; - -const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { - connection_timeout: Duration::from_secs(1), - connection_retry_count: 10, - finalize_timeout: Duration::from_millis(200), - max_number_of_connections: 8, - unistream_timeout: Duration::from_millis(500), - write_timeout: Duration::from_secs(1), - number_of_transactions_per_unistream: 10, -}; - async fn start_literpc_client( + test_case_params: TestCaseParams, streamer_listen_addrs: SocketAddr, literpc_validator_identity: Arc, ) -> anyhow::Result<()> { @@ -286,7 +325,7 @@ async fn start_literpc_client( ) .await; - for i in 0..SAMPLE_TX_COUNT { + for i in 0..test_case_params.sample_tx_count { let raw_sample_tx = build_raw_sample_tx(i); broadcast_sender.send(raw_sample_tx)?; }