diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index 372ff52d..4bf8fff1 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -1,11 +1,9 @@ -use std::cell::Cell; use anyhow::bail; use countmap::CountMap; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use futures::future::join_all; use log::{debug, error, info, trace, warn}; use quinn::TokioRuntime; -use std::option::Option; use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::tx_store::empty_tx_store; @@ -26,6 +24,7 @@ use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::ops::Deref; +use std::option::Option; use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; @@ -37,37 +36,45 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::error::SendError; use tokio::task::JoinHandle; use tokio::time::{interval, sleep}; -use tokio::{join, spawn, task}; -use tracing_subscriber::{filter::LevelFilter, fmt}; use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{filter::LevelFilter, fmt}; #[derive(Copy, Clone, Debug)] struct TestCaseParams { - // note: logging will be auto-adjusted based on 'sample_tx_count' sample_tx_count: u32, + stake_connection: bool, } const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo -const STAKE_CONNECTION: bool = true; - const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { - connection_timeout: Duration::from_secs(1), + connection_timeout: Duration::from_secs(2), connection_retry_count: 10, - finalize_timeout: Duration::from_millis(200), + finalize_timeout: Duration::from_secs(2), max_number_of_connections: 8, - unistream_timeout: Duration::from_millis(500), - write_timeout: Duration::from_secs(1), + unistream_timeout: Duration::from_secs(2), + write_timeout: Duration::from_secs(2), number_of_transactions_per_unistream: 10, }; #[test] -pub fn small_tx_batch() { +pub fn small_tx_batch_staked() { configure_logging(true); wireup_and_send_txs_via_channel(TestCaseParams { sample_tx_count: 20, + stake_connection: true, + }); +} + +#[test] +pub fn small_tx_batch_unstaked() { + configure_logging(true); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 20, + stake_connection: false, }); } @@ -77,6 +84,7 @@ pub fn many_transactions() { wireup_and_send_txs_via_channel(TestCaseParams { sample_tx_count: 10000, + stake_connection: true, }); } @@ -87,14 +95,12 @@ pub fn too_many_transactions() { wireup_and_send_txs_via_channel(TestCaseParams { sample_tx_count: 100000, + stake_connection: false, }); } // note: this not a tokio test as runtimes get created as part of the integration test -fn wireup_and_send_txs_via_channel( - test_case_params: TestCaseParams, -) { - +fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) { // value from solana - see quic streamer - see quic.rs -> rt() const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; let runtime_quic1 = Builder::new_multi_thread() @@ -105,7 +111,7 @@ fn wireup_and_send_txs_via_channel( .expect("failed to build tokio runtime for testing quic server"); // lite-rpc - let runtime_literpc = tokio::runtime::Builder::new_multi_thread() + let runtime_literpc = Builder::new_multi_thread() // see lite-rpc -> main.rs .worker_threads(16) // also works with 1 .enable_all() @@ -126,13 +132,13 @@ fn wireup_and_send_txs_via_channel( max_stake: 40, min_stake: 0, ip_stake_map: Default::default(), - pubkey_stake_map: - // literpc_validator_identity.as_ref() - if STAKE_CONNECTION { - let mut map = HashMap::new(); - map.insert(literpc_validator_identity.pubkey(),30); - map - } else { HashMap::default() } + pubkey_stake_map: if test_case_params.stake_connection { + let mut map = HashMap::new(); + map.insert(literpc_validator_identity.pubkey(), 30); + map + } else { + HashMap::default() + }, }; let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( @@ -159,7 +165,8 @@ fn wireup_and_send_txs_via_channel( // second half let mut timer2 = None; let mut packet_count2 = 0; - let mut count_map: CountMap = CountMap::with_capacity(test_case_params.sample_tx_count as usize); + let mut count_map: CountMap = + CountMap::with_capacity(test_case_params.sample_tx_count as usize); let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; while packet_count < test_case_params.sample_tx_count { if latest_tx.elapsed() > Duration::from_secs(5) { @@ -306,7 +313,11 @@ async fn start_literpc_client( // populated from get_stakes_for_identity() let identity_stakes = IdentityStakes { peer_type: ConnectionPeerType::Staked, - stakes: if STAKE_CONNECTION { 30 } else { 0 }, // stake of lite-rpc + stakes: if test_case_params.stake_connection { + 30 + } else { + 0 + }, // stake of lite-rpc min_stakes: 0, max_stakes: 40, total_stakes: 100,