From e74ad90cdb574c8b8598418a4591804e8cf61b33 Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Mon, 1 Aug 2022 16:04:19 +0200 Subject: [PATCH] DoS tool: generate transactions using several threads (#26286) * add cli arg num_gen_threads * introduce many generating threads * add sender thread * add time measurments * cleanup * sort dependencies * revisit threads termination * make send_batch_size to be configurable * update Cargo.lock --- Cargo.lock | 2 + dos/Cargo.toml | 2 + dos/src/cli.rs | 30 ++++- dos/src/main.rs | 320 +++++++++++++++++++++++++++++++++++------------- 4 files changed, 271 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5efa065750..14808c6088 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5104,6 +5104,7 @@ version = "1.11.5" dependencies = [ "bincode", "clap 3.1.8", + "crossbeam-channel", "itertools", "log", "rand 0.7.3", @@ -5116,6 +5117,7 @@ dependencies = [ "solana-gossip", "solana-local-cluster", "solana-logger 1.11.5", + "solana-measure", "solana-net-utils", "solana-perf", "solana-rpc", diff --git a/dos/Cargo.toml b/dos/Cargo.toml index 602b4d9fed..132b3b4462 100644 --- a/dos/Cargo.toml +++ b/dos/Cargo.toml @@ -12,6 +12,7 @@ description = "Tool to send various requests to cluster in order to evaluate the [dependencies] bincode = "1.3.3" clap = { version = "3.1.5", features = ["derive", "cargo"] } +crossbeam-channel = "0.5.4" itertools = "0.10.3" log = "0.4.17" rand = "0.7.0" @@ -22,6 +23,7 @@ solana-core = { path = "../core", version = "=1.11.5" } solana-faucet = { path = "../faucet", version = "=1.11.5" } solana-gossip = { path = "../gossip", version = "=1.11.5" } solana-logger = { path = "../logger", version = "=1.11.5" } +solana-measure = { path = "../measure", version = "=1.11.5" } solana-net-utils = { path = "../net-utils", version = "=1.11.5" } solana-perf = { path = "../perf", version = "=1.11.5" } solana-rpc = { path = "../rpc", version = "=1.11.5" } diff --git a/dos/src/cli.rs b/dos/src/cli.rs index 9e4354d87c..9e82c3f06e 100644 --- a/dos/src/cli.rs +++ b/dos/src/cli.rs @@ -48,6 +48,13 @@ pub struct DosClientParameters { #[clap(long, help = "Allow contacting private ip addresses")] pub allow_private_addr: bool, + #[clap( + long, + default_value = "1", + help = "Number of threads generating transactions" + )] + pub num_gen_threads: usize, + #[clap(flatten)] pub transaction_params: TransactionParams, @@ -57,9 +64,12 @@ pub struct DosClientParameters { help = "Submit transactions via QUIC" )] pub tpu_use_quic: bool, + + #[clap(long, default_value = "16384", help = "Size of the transactions batch")] + pub send_batch_size: usize, } -#[derive(Args, Serialize, Deserialize, Debug, Default, PartialEq, Eq)] +#[derive(Args, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq)] #[clap(rename_all = "kebab-case")] pub struct TransactionParams { #[clap( @@ -219,6 +229,8 @@ mod tests { allow_private_addr: false, transaction_params: TransactionParams::default(), tpu_use_quic: false, + num_gen_threads: 1, + send_batch_size: 16384, }, ); } @@ -237,6 +249,8 @@ mod tests { "--num-signatures", "8", "--tpu-use-quic", + "--send-batch-size", + "1", ]) .unwrap(); assert_eq!( @@ -249,6 +263,7 @@ mod tests { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: Some(8), valid_blockhash: false, @@ -258,6 +273,7 @@ mod tests { num_instructions: None, }, tpu_use_quic: true, + send_batch_size: 1, }, ); } @@ -277,6 +293,8 @@ mod tests { "transfer", "--num-instructions", "1", + "--send-batch-size", + "1", ]) .unwrap(); assert_eq!( @@ -289,6 +307,7 @@ mod tests { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -298,6 +317,7 @@ mod tests { num_instructions: Some(1), }, tpu_use_quic: false, + send_batch_size: 1, }, ); @@ -332,6 +352,8 @@ mod tests { "transfer", "--num-instructions", "8", + "--send-batch-size", + "1", ]) .unwrap(); assert_eq!( @@ -344,6 +366,7 @@ mod tests { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -353,6 +376,7 @@ mod tests { num_instructions: Some(8), }, tpu_use_quic: false, + send_batch_size: 1, }, ); } @@ -370,6 +394,8 @@ mod tests { "--valid-blockhash", "--transaction-type", "account-creation", + "--send-batch-size", + "1", ]) .unwrap(); assert_eq!( @@ -382,6 +408,7 @@ mod tests { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -391,6 +418,7 @@ mod tests { num_instructions: None, }, tpu_use_quic: false, + send_batch_size: 1, }, ); } diff --git a/dos/src/main.rs b/dos/src/main.rs index d77ac1d620..fa75fe90b7 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -40,6 +40,7 @@ //! #![allow(clippy::integer_arithmetic)] use { + crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, itertools::Itertools, log::*, rand::{thread_rng, Rng}, @@ -55,6 +56,7 @@ use { contact_info::ContactInfo, gossip_service::{discover, get_multi_client}, }, + solana_measure::measure::Measure, solana_sdk::{ hash::Hash, instruction::CompiledInstruction, @@ -69,17 +71,18 @@ use { }, solana_streamer::socket::SocketAddrSpace, std::{ - cmp::min, net::{SocketAddr, UdpSocket}, process::exit, sync::Arc, + thread, time::{Duration, Instant}, }, }; -const SAMPLE_PERIOD_MS: usize = 10_000; +const PROGRESS_TIMEOUT_S: u64 = 120; +const SAMPLE_PERIOD_MS: u64 = 10_000; fn compute_rate_per_second(count: usize) -> usize { - (count * 1000) / SAMPLE_PERIOD_MS + (count * 1000) / (SAMPLE_PERIOD_MS as usize) } /// Provide functionality to generate several types of transactions: @@ -92,6 +95,7 @@ fn compute_rate_per_second(count: usize) -> usize { /// 2.1 Transfer from 1 payer to multiple destinations (many instructions per transaction) /// 2.2 Create an account /// +#[derive(Clone)] struct TransactionGenerator { blockhash: Hash, last_generated: Instant, @@ -229,7 +233,182 @@ impl TransactionGenerator { } } -const SEND_BATCH_MAX_SIZE: usize = 1 << 10; +// Multithreading-related functions +// +// The most computationally expensive work is signing new transactions. +// Here we generate them in `num_gen_threads` threads. +// +struct TransactionBatchMsg { + batch: Vec>, + gen_time: u64, +} + +/// Creates thread which receives batches of transactions from tx_receiver +/// and sends them to the target. +/// If `iterations` is 0, it works indefenetely. +/// Otherwise, it sends at least `iterations` number of transactions +fn create_sender_thread( + tx_receiver: Receiver, + iterations: usize, + target: &SocketAddr, + tpu_use_quic: bool, +) -> thread::JoinHandle<()> { + // ConnectionCache is used instead of client because it gives ~6% higher pps + let connection_cache = match tpu_use_quic { + true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), + false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), + }; + let connection = connection_cache.get_connection(target); + + let stats_timer_receiver = tick(Duration::from_millis(SAMPLE_PERIOD_MS)); + let progress_timer_receiver = tick(Duration::from_secs(PROGRESS_TIMEOUT_S)); + + let mut time_send_ns = 0; + let mut time_generate_ns = 0; + + // Sender signals to stop Generators by dropping receiver. + // It happens in 2 cases: + // * Sender has sent at least `iterations` number of transactions + // * Sender observes that there is no progress. Since there is no way to use recv_timeout with select, + // a timer is used. + thread::Builder::new().name("Sender".to_string()).spawn(move || { + let mut total_count: usize = 0; + let mut prev_total_count = 0; // to track progress + + let mut stats_count: usize = 0; + let mut stats_error_count: usize = 0; + + loop { + select! { + recv(tx_receiver) -> msg => { + match msg { + Ok(tx_batch) => { + let len = tx_batch.batch.len(); + let mut measure_send_txs = Measure::start("measure_send_txs"); + let res = connection.send_wire_transaction_batch_async(tx_batch.batch); + + measure_send_txs.stop(); + time_send_ns += measure_send_txs.as_ns(); + time_generate_ns += tx_batch.gen_time; + + if res.is_err() { + stats_error_count += len; + } + stats_count += len; + total_count += len; + if iterations != 0 && total_count >= iterations { + info!("All transactions has been sent"); + // dropping receiver to signal generator threads to stop + drop(tx_receiver); + break; + } + } + _ => panic!("Sender panics"), + } + }, + recv(stats_timer_receiver) -> _ => { + info!("tx_receiver queue len: {}", tx_receiver.len()); + info!("Count: {}, error count: {}, send mean time: {}, generate mean time: {}, rps: {}", + stats_count, + stats_error_count, + time_send_ns.checked_div(stats_count as u64).unwrap_or(0), + time_generate_ns.checked_div(stats_count as u64).unwrap_or(0), + compute_rate_per_second(stats_count), + ); + stats_count = 0; + stats_error_count = 0; + time_send_ns = 0; + time_generate_ns = 0; + }, + recv(progress_timer_receiver) -> _ => { + if prev_total_count - total_count == 0 { + info!("No progress, stop execution"); + // dropping receiver to signal generator threads to stop + drop(tx_receiver); + break; + } + prev_total_count = total_count; + } + } + } + }).unwrap() +} + +fn create_generator_thread( + tx_sender: &Sender, + send_batch_size: usize, + transaction_generator: &mut TransactionGenerator, + client: Option>, + payer: Option, +) -> thread::JoinHandle<()> { + let tx_sender = tx_sender.clone(); + + let mut transaction_generator = transaction_generator.clone(); + let transaction_params: &TransactionParams = &transaction_generator.transaction_params; + + // Generate n=1000 unique keypairs + // The number of chunks is described by binomial coefficient + // and hence this choice of n provides large enough number of permutations + let mut keypairs_flat: Vec = Vec::new(); + // 1000 is arbitrary number. In case of permutation_size > 1, + // this guaranties large enough set of unique permutations + let permutation_size = get_permutation_size( + transaction_params.num_signatures.as_ref(), + transaction_params.num_instructions.as_ref(), + ); + let num_keypairs = 1000 * permutation_size; + + let generate_keypairs = + transaction_params.valid_signatures || transaction_params.valid_blockhash; + if generate_keypairs { + keypairs_flat = (0..num_keypairs).map(|_| Keypair::new()).collect(); + } + + thread::Builder::new() + .name("Generator".to_string()) + .spawn(move || { + let indexes: Vec = (0..keypairs_flat.len()).collect(); + let mut it = indexes.iter().permutations(permutation_size); + + loop { + let mut data = Vec::>::with_capacity(send_batch_size); + let mut measure_generate_txs = Measure::start("measure_generate_txs"); + for _ in 0..send_batch_size { + let chunk_keypairs = if generate_keypairs { + let mut permutation = it.next(); + if permutation.is_none() { + // if ran out of permutations, regenerate keys + keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new()); + info!("Regenerate keypairs"); + permutation = it.next(); + } + let permutation = permutation.unwrap(); + Some(apply_permutation(permutation, &keypairs_flat)) + } else { + None + }; + let tx = transaction_generator.generate( + payer.as_ref(), + chunk_keypairs, + client.as_ref(), + ); + data.push(bincode::serialize(&tx).unwrap()); + } + measure_generate_txs.stop(); + + let result = tx_sender.send(TransactionBatchMsg { + batch: data, + gen_time: measure_generate_txs.as_ns(), + }); + if result.is_err() { + // means that receiver has been dropped by sender thread + info!("Exit generator thread"); + break; + } + } + }) + .unwrap() +} fn get_target( nodes: &[ContactInfo], @@ -384,90 +563,39 @@ fn run_dos_transactions( client: Option>, transaction_params: TransactionParams, tpu_use_quic: bool, + num_gen_threads: usize, + send_batch_size: usize, ) { - // Number of payers is the number of generating threads, for now it is 1 + // Number of payers is the number of generating threads // Later, we will create a new payer for each thread since Keypair is not clonable - let payers: Vec> = - create_payers(transaction_params.valid_blockhash, 1, client.as_ref()); - let payer = payers[0].as_ref(); - - // Generate n=1000 unique keypairs - // The number of chunks is described by binomial coefficient - // and hence this choice of n provides large enough number of permutations - let mut keypairs_flat: Vec = Vec::new(); - // 1000 is arbitrary number. In case of permutation_size > 1, - // this guaranties large enough set of unique permutations - let permutation_size = get_permutation_size( - transaction_params.num_signatures.as_ref(), - transaction_params.num_instructions.as_ref(), + let payers: Vec> = create_payers( + transaction_params.valid_blockhash, + num_gen_threads, + client.as_ref(), ); - let num_keypairs = 1000 * permutation_size; - - let generate_keypairs = - transaction_params.valid_signatures || transaction_params.valid_blockhash; - if generate_keypairs { - keypairs_flat = (0..num_keypairs).map(|_| Keypair::new()).collect(); - } - - let indexes: Vec = (0..keypairs_flat.len()).collect(); - let mut it = indexes.iter().permutations(permutation_size); let mut transaction_generator = TransactionGenerator::new(transaction_params); + let (tx_sender, tx_receiver) = unbounded(); - //let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); - //let udp_client = UdpTpuConnection::new(target, connection_cache_stats); - - let connection_cache = match tpu_use_quic { - true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE), - false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE), - }; - let connection = connection_cache.get_connection(&target); - - let mut count = 0; - let mut total_count = 0; - let mut error_count = 0; - let mut last_log = Instant::now(); - - loop { - let send_batch_size = min(iterations - total_count, SEND_BATCH_MAX_SIZE); - let mut data = Vec::>::with_capacity(SEND_BATCH_MAX_SIZE); - for _ in 0..send_batch_size { - let chunk_keypairs = if generate_keypairs { - let mut permutation = it.next(); - if permutation.is_none() { - // if ran out of permutations, regenerate keys - keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new()); - info!("Regenerate keypairs"); - permutation = it.next(); - } - let permutation = permutation.unwrap(); - Some(apply_permutation(permutation, &keypairs_flat)) - } else { - None - }; - let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref()); - data.push(bincode::serialize(&tx).unwrap()); - } - - let res = connection.send_wire_transaction_batch_async(data); - - if res.is_err() { - error_count += send_batch_size; - } - count += send_batch_size; - total_count += send_batch_size; - if last_log.elapsed().as_millis() > SAMPLE_PERIOD_MS as u128 { - info!( - "count: {}, errors: {}, rps: {}", - count, - error_count, - compute_rate_per_second(count) - ); - last_log = Instant::now(); - count = 0; - } - if iterations != 0 && total_count >= iterations { - break; + let sender_thread = create_sender_thread(tx_receiver, iterations, &target, tpu_use_quic); + let tx_generator_threads: Vec<_> = payers + .into_iter() + .map(|payer| { + create_generator_thread( + &tx_sender, + send_batch_size, + &mut transaction_generator, + client.clone(), + payer, + ) + }) + .collect(); + if let Err(err) = sender_thread.join() { + println!("join() failed with: {:?}", err); + } + for t_generator in tx_generator_threads { + if let Err(err) = t_generator.join() { + println!("join() failed with: {:?}", err); } } } @@ -502,6 +630,8 @@ fn run_dos( client, params.transaction_params, params.tpu_use_quic, + params.num_gen_threads, + params.send_batch_size, ); } else { let (target_id, target_addr) = target.expect("should have target"); @@ -668,6 +798,8 @@ pub mod test { solana_sdk::timing::timestamp, }; + const TEST_SEND_BATCH_SIZE: usize = 1; + // thin wrapper for the run_dos function // to avoid specifying everywhere generic parameters fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) { @@ -693,8 +825,10 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams::default(), tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -709,8 +843,10 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams::default(), tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -725,8 +861,10 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams::default(), tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -741,8 +879,10 @@ pub mod test { data_input: Some(Pubkey::default()), skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams::default(), tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); } @@ -772,8 +912,10 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams::default(), tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); } @@ -809,6 +951,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: Some(8), valid_blockhash: false, @@ -818,6 +961,7 @@ pub mod test { num_instructions: None, }, tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -834,6 +978,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: Some(8), valid_blockhash: false, @@ -843,6 +988,7 @@ pub mod test { num_instructions: None, }, tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -859,6 +1005,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: Some(8), valid_blockhash: false, @@ -868,6 +1015,7 @@ pub mod test { num_instructions: None, }, tpu_use_quic: false, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); } @@ -936,6 +1084,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -945,6 +1094,7 @@ pub mod test { num_instructions: Some(1), }, tpu_use_quic, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); @@ -963,6 +1113,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -972,6 +1123,7 @@ pub mod test { num_instructions: Some(1), }, tpu_use_quic, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); // creates and sends unique transactions of type Transfer @@ -989,6 +1141,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -998,6 +1151,7 @@ pub mod test { num_instructions: Some(8), }, tpu_use_quic, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); // creates and sends unique transactions of type CreateAccount @@ -1015,6 +1169,7 @@ pub mod test { data_input: None, skip_gossip: false, allow_private_addr: false, + num_gen_threads: 1, transaction_params: TransactionParams { num_signatures: None, valid_blockhash: true, @@ -1024,6 +1179,7 @@ pub mod test { num_instructions: None, }, tpu_use_quic, + send_batch_size: TEST_SEND_BATCH_SIZE, }, ); }