extract TestCaseParams

This commit is contained in:
GroovieGermanikus 2023-07-21 17:23:13 +02:00
parent c70b0f8662
commit 37da6c039e
1 changed files with 70 additions and 31 deletions

View File

@ -1,10 +1,11 @@
use std::cell::Cell;
use anyhow::bail; use anyhow::bail;
use countmap::CountMap; use countmap::CountMap;
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
use futures::future::join_all; use futures::future::join_all;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use quinn::TokioRuntime; use quinn::TokioRuntime;
use serde::de::Unexpected::Option; use std::option::Option;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store; use solana_lite_rpc_core::tx_store::empty_tx_store;
@ -27,7 +28,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -38,16 +39,61 @@ use tokio::task::JoinHandle;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
use tokio::{join, spawn, task}; use tokio::{join, spawn, task};
use tracing_subscriber::{filter::LevelFilter, fmt}; use tracing_subscriber::{filter::LevelFilter, fmt};
use tracing_subscriber::util::SubscriberInitExt;
// note: logging will be auto-adjusted #[derive(Copy, Clone, Debug)]
const SAMPLE_TX_COUNT: u32 = 10; struct TestCaseParams {
// note: logging will be auto-adjusted based on 'sample_tx_count'
sample_tx_count: u32,
}
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo
#[test] // note: tokio runtimes get created as part of the integration test const STAKE_CONNECTION: bool = true;
pub fn wireup_and_send_txs_via_channel() {
configure_logging(); const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10,
};
#[test]
pub fn small_tx_batch() {
configure_logging(true);
wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 20,
});
}
#[test]
pub fn many_transactions() {
configure_logging(false);
wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 10000,
});
}
#[ignore]
#[test]
pub fn too_many_transactions() {
configure_logging(false);
wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 100000,
});
}
// note: this not a tokio test as runtimes get created as part of the integration test
fn wireup_and_send_txs_via_channel(
test_case_params: TestCaseParams,
) {
// value from solana - see quic streamer - see quic.rs -> rt() // value from solana - see quic streamer - see quic.rs -> rt()
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
@ -98,6 +144,7 @@ pub fn wireup_and_send_txs_via_channel() {
runtime_literpc.block_on(async { runtime_literpc.block_on(async {
tokio::spawn(start_literpc_client( tokio::spawn(start_literpc_client(
test_case_params.clone(),
listen_addr, listen_addr,
literpc_validator_identity, literpc_validator_identity,
)); ));
@ -112,9 +159,9 @@ pub fn wireup_and_send_txs_via_channel() {
// second half // second half
let mut timer2 = None; let mut timer2 = None;
let mut packet_count2 = 0; let mut packet_count2 = 0;
let mut count_map: CountMap<Signature> = CountMap::with_capacity(SAMPLE_TX_COUNT as usize); let mut count_map: CountMap<Signature> = CountMap::with_capacity(test_case_params.sample_tx_count as usize);
const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2; let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2;
while packet_count < SAMPLE_TX_COUNT { while packet_count < test_case_params.sample_tx_count {
if latest_tx.elapsed() > Duration::from_secs(5) { if latest_tx.elapsed() > Duration::from_secs(5) {
warn!("abort after timeout waiting for packet from quic streamer"); warn!("abort after timeout waiting for packet from quic streamer");
break; break;
@ -156,10 +203,10 @@ pub fn wireup_and_send_txs_via_channel() {
count_map.insert_or_increment(*tx.get_signature()); count_map.insert_or_increment(*tx.get_signature());
} }
if packet_count == WARMUP_TX_COUNT { if packet_count == warmup_tx_count {
timer2 = Some(Instant::now()); timer2 = Some(Instant::now());
} }
if packet_count == SAMPLE_TX_COUNT { if packet_count == test_case_params.sample_tx_count {
break; break;
} }
} // -- while not all packets received - by count } // -- while not all packets received - by count
@ -182,8 +229,8 @@ pub fn wireup_and_send_txs_via_channel() {
assert_eq!( assert_eq!(
count_map.len() as u32, count_map.len() as u32,
SAMPLE_TX_COUNT, test_case_params.sample_tx_count,
"count_map size should be equal to SAMPLE_TX_COUNT" "count_map size should be equal to sample_tx_count"
); );
// note: this assumption will not hold as soon as test is configured to do fanout // note: this assumption will not hold as soon as test is configured to do fanout
assert!( assert!(
@ -197,30 +244,22 @@ pub fn wireup_and_send_txs_via_channel() {
packet_consumer_jh.join().unwrap(); packet_consumer_jh.join().unwrap();
} }
fn configure_logging() { fn configure_logging(verbose: bool) {
let env_filter = if SAMPLE_TX_COUNT < 100 { let env_filter = if verbose {
"trace,rustls=info,quinn_proto=debug" "trace,rustls=info,quinn_proto=debug"
} else { } else {
"debug,quinn_proto=info,rustls=info,solana_streamer=debug" "debug,quinn_proto=info,rustls=info,solana_streamer=debug"
}; };
tracing_subscriber::fmt::fmt() let result = tracing_subscriber::fmt::fmt()
.with_env_filter(env_filter) .with_env_filter(env_filter)
.init(); .try_init();
if result.is_err() {
println!("Logging already initialized - ignore");
}
} }
const STAKE_CONNECTION: bool = true;
const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10,
};
async fn start_literpc_client( async fn start_literpc_client(
test_case_params: TestCaseParams,
streamer_listen_addrs: SocketAddr, streamer_listen_addrs: SocketAddr,
literpc_validator_identity: Arc<Keypair>, literpc_validator_identity: Arc<Keypair>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -286,7 +325,7 @@ async fn start_literpc_client(
) )
.await; .await;
for i in 0..SAMPLE_TX_COUNT { for i in 0..test_case_params.sample_tx_count {
let raw_sample_tx = build_raw_sample_tx(i); let raw_sample_tx = build_raw_sample_tx(i);
broadcast_sender.send(raw_sample_tx)?; broadcast_sender.send(raw_sample_tx)?;
} }