extract stake flag to test params

This commit is contained in:
GroovieGermanikus 2023-07-21 17:34:27 +02:00
parent 37da6c039e
commit c132c19b76
1 changed files with 37 additions and 26 deletions

View File

@ -1,11 +1,9 @@
use std::cell::Cell;
use anyhow::bail; use anyhow::bail;
use countmap::CountMap; use countmap::CountMap;
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
use futures::future::join_all; use futures::future::join_all;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use quinn::TokioRuntime; use quinn::TokioRuntime;
use std::option::Option;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store; use solana_lite_rpc_core::tx_store::empty_tx_store;
@ -26,6 +24,7 @@ use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::ops::Deref; use std::ops::Deref;
use std::option::Option;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
@ -37,37 +36,45 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::error::SendError;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
use tokio::{join, spawn, task};
use tracing_subscriber::{filter::LevelFilter, fmt};
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{filter::LevelFilter, fmt};
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
struct TestCaseParams { struct TestCaseParams {
// note: logging will be auto-adjusted based on 'sample_tx_count'
sample_tx_count: u32, sample_tx_count: u32,
stake_connection: bool,
} }
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo
const STAKE_CONNECTION: bool = true;
const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(1), connection_timeout: Duration::from_secs(2),
connection_retry_count: 10, connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200), finalize_timeout: Duration::from_secs(2),
max_number_of_connections: 8, max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500), unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(1), write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10, number_of_transactions_per_unistream: 10,
}; };
#[test] #[test]
pub fn small_tx_batch() { pub fn small_tx_batch_staked() {
configure_logging(true); configure_logging(true);
wireup_and_send_txs_via_channel(TestCaseParams { wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 20, sample_tx_count: 20,
stake_connection: true,
});
}
#[test]
pub fn small_tx_batch_unstaked() {
configure_logging(true);
wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 20,
stake_connection: false,
}); });
} }
@ -77,6 +84,7 @@ pub fn many_transactions() {
wireup_and_send_txs_via_channel(TestCaseParams { wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 10000, sample_tx_count: 10000,
stake_connection: true,
}); });
} }
@ -87,14 +95,12 @@ pub fn too_many_transactions() {
wireup_and_send_txs_via_channel(TestCaseParams { wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 100000, sample_tx_count: 100000,
stake_connection: false,
}); });
} }
// note: this not a tokio test as runtimes get created as part of the integration test // note: this not a tokio test as runtimes get created as part of the integration test
fn wireup_and_send_txs_via_channel( fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
test_case_params: TestCaseParams,
) {
// value from solana - see quic streamer - see quic.rs -> rt() // value from solana - see quic streamer - see quic.rs -> rt()
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
let runtime_quic1 = Builder::new_multi_thread() let runtime_quic1 = Builder::new_multi_thread()
@ -105,7 +111,7 @@ fn wireup_and_send_txs_via_channel(
.expect("failed to build tokio runtime for testing quic server"); .expect("failed to build tokio runtime for testing quic server");
// lite-rpc // lite-rpc
let runtime_literpc = tokio::runtime::Builder::new_multi_thread() let runtime_literpc = Builder::new_multi_thread()
// see lite-rpc -> main.rs // see lite-rpc -> main.rs
.worker_threads(16) // also works with 1 .worker_threads(16) // also works with 1
.enable_all() .enable_all()
@ -126,13 +132,13 @@ fn wireup_and_send_txs_via_channel(
max_stake: 40, max_stake: 40,
min_stake: 0, min_stake: 0,
ip_stake_map: Default::default(), ip_stake_map: Default::default(),
pubkey_stake_map: pubkey_stake_map: if test_case_params.stake_connection {
// literpc_validator_identity.as_ref() let mut map = HashMap::new();
if STAKE_CONNECTION { map.insert(literpc_validator_identity.pubkey(), 30);
let mut map = HashMap::new(); map
map.insert(literpc_validator_identity.pubkey(),30); } else {
map HashMap::default()
} else { HashMap::default() } },
}; };
let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening(
@ -159,7 +165,8 @@ fn wireup_and_send_txs_via_channel(
// second half // second half
let mut timer2 = None; let mut timer2 = None;
let mut packet_count2 = 0; let mut packet_count2 = 0;
let mut count_map: CountMap<Signature> = CountMap::with_capacity(test_case_params.sample_tx_count as usize); let mut count_map: CountMap<Signature> =
CountMap::with_capacity(test_case_params.sample_tx_count as usize);
let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2;
while packet_count < test_case_params.sample_tx_count { while packet_count < test_case_params.sample_tx_count {
if latest_tx.elapsed() > Duration::from_secs(5) { if latest_tx.elapsed() > Duration::from_secs(5) {
@ -306,7 +313,11 @@ async fn start_literpc_client(
// populated from get_stakes_for_identity() // populated from get_stakes_for_identity()
let identity_stakes = IdentityStakes { let identity_stakes = IdentityStakes {
peer_type: ConnectionPeerType::Staked, peer_type: ConnectionPeerType::Staked,
stakes: if STAKE_CONNECTION { 30 } else { 0 }, // stake of lite-rpc stakes: if test_case_params.stake_connection {
30
} else {
0
}, // stake of lite-rpc
min_stakes: 0, min_stakes: 0,
max_stakes: 40, max_stakes: 40,
total_stakes: 100, total_stakes: 100,