DoS tool: generate transactions using several threads (#26286)

* add cli arg num_gen_threads

* introduce many generating threads

* add sender thread

* add time measurments

* cleanup

* sort dependencies

* revisit threads termination

* make send_batch_size to be configurable

* update Cargo.lock
This commit is contained in:
kirill lykov 2022-08-01 16:04:19 +02:00 committed by GitHub
parent ac915776c3
commit e74ad90cdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 271 additions and 83 deletions

2
Cargo.lock generated
View File

@ -5104,6 +5104,7 @@ version = "1.11.5"
dependencies = [
"bincode",
"clap 3.1.8",
"crossbeam-channel",
"itertools",
"log",
"rand 0.7.3",
@ -5116,6 +5117,7 @@ dependencies = [
"solana-gossip",
"solana-local-cluster",
"solana-logger 1.11.5",
"solana-measure",
"solana-net-utils",
"solana-perf",
"solana-rpc",

View File

@ -12,6 +12,7 @@ description = "Tool to send various requests to cluster in order to evaluate the
[dependencies]
bincode = "1.3.3"
clap = { version = "3.1.5", features = ["derive", "cargo"] }
crossbeam-channel = "0.5.4"
itertools = "0.10.3"
log = "0.4.17"
rand = "0.7.0"
@ -22,6 +23,7 @@ solana-core = { path = "../core", version = "=1.11.5" }
solana-faucet = { path = "../faucet", version = "=1.11.5" }
solana-gossip = { path = "../gossip", version = "=1.11.5" }
solana-logger = { path = "../logger", version = "=1.11.5" }
solana-measure = { path = "../measure", version = "=1.11.5" }
solana-net-utils = { path = "../net-utils", version = "=1.11.5" }
solana-perf = { path = "../perf", version = "=1.11.5" }
solana-rpc = { path = "../rpc", version = "=1.11.5" }

View File

@ -48,6 +48,13 @@ pub struct DosClientParameters {
#[clap(long, help = "Allow contacting private ip addresses")]
pub allow_private_addr: bool,
#[clap(
long,
default_value = "1",
help = "Number of threads generating transactions"
)]
pub num_gen_threads: usize,
#[clap(flatten)]
pub transaction_params: TransactionParams,
@ -57,9 +64,12 @@ pub struct DosClientParameters {
help = "Submit transactions via QUIC"
)]
pub tpu_use_quic: bool,
#[clap(long, default_value = "16384", help = "Size of the transactions batch")]
pub send_batch_size: usize,
}
#[derive(Args, Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[derive(Args, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[clap(rename_all = "kebab-case")]
pub struct TransactionParams {
#[clap(
@ -219,6 +229,8 @@ mod tests {
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
num_gen_threads: 1,
send_batch_size: 16384,
},
);
}
@ -237,6 +249,8 @@ mod tests {
"--num-signatures",
"8",
"--tpu-use-quic",
"--send-batch-size",
"1",
])
.unwrap();
assert_eq!(
@ -249,6 +263,7 @@ mod tests {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: Some(8),
valid_blockhash: false,
@ -258,6 +273,7 @@ mod tests {
num_instructions: None,
},
tpu_use_quic: true,
send_batch_size: 1,
},
);
}
@ -277,6 +293,8 @@ mod tests {
"transfer",
"--num-instructions",
"1",
"--send-batch-size",
"1",
])
.unwrap();
assert_eq!(
@ -289,6 +307,7 @@ mod tests {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -298,6 +317,7 @@ mod tests {
num_instructions: Some(1),
},
tpu_use_quic: false,
send_batch_size: 1,
},
);
@ -332,6 +352,8 @@ mod tests {
"transfer",
"--num-instructions",
"8",
"--send-batch-size",
"1",
])
.unwrap();
assert_eq!(
@ -344,6 +366,7 @@ mod tests {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -353,6 +376,7 @@ mod tests {
num_instructions: Some(8),
},
tpu_use_quic: false,
send_batch_size: 1,
},
);
}
@ -370,6 +394,8 @@ mod tests {
"--valid-blockhash",
"--transaction-type",
"account-creation",
"--send-batch-size",
"1",
])
.unwrap();
assert_eq!(
@ -382,6 +408,7 @@ mod tests {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -391,6 +418,7 @@ mod tests {
num_instructions: None,
},
tpu_use_quic: false,
send_batch_size: 1,
},
);
}

View File

@ -40,6 +40,7 @@
//!
#![allow(clippy::integer_arithmetic)]
use {
crossbeam_channel::{select, tick, unbounded, Receiver, Sender},
itertools::Itertools,
log::*,
rand::{thread_rng, Rng},
@ -55,6 +56,7 @@ use {
contact_info::ContactInfo,
gossip_service::{discover, get_multi_client},
},
solana_measure::measure::Measure,
solana_sdk::{
hash::Hash,
instruction::CompiledInstruction,
@ -69,17 +71,18 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
cmp::min,
net::{SocketAddr, UdpSocket},
process::exit,
sync::Arc,
thread,
time::{Duration, Instant},
},
};
const SAMPLE_PERIOD_MS: usize = 10_000;
const PROGRESS_TIMEOUT_S: u64 = 120;
const SAMPLE_PERIOD_MS: u64 = 10_000;
fn compute_rate_per_second(count: usize) -> usize {
(count * 1000) / SAMPLE_PERIOD_MS
(count * 1000) / (SAMPLE_PERIOD_MS as usize)
}
/// Provide functionality to generate several types of transactions:
@ -92,6 +95,7 @@ fn compute_rate_per_second(count: usize) -> usize {
/// 2.1 Transfer from 1 payer to multiple destinations (many instructions per transaction)
/// 2.2 Create an account
///
#[derive(Clone)]
struct TransactionGenerator {
blockhash: Hash,
last_generated: Instant,
@ -229,7 +233,182 @@ impl TransactionGenerator {
}
}
const SEND_BATCH_MAX_SIZE: usize = 1 << 10;
// Multithreading-related functions
//
// The most computationally expensive work is signing new transactions.
// Here we generate them in `num_gen_threads` threads.
//
struct TransactionBatchMsg {
batch: Vec<Vec<u8>>,
gen_time: u64,
}
/// Creates thread which receives batches of transactions from tx_receiver
/// and sends them to the target.
/// If `iterations` is 0, it works indefenetely.
/// Otherwise, it sends at least `iterations` number of transactions
fn create_sender_thread(
tx_receiver: Receiver<TransactionBatchMsg>,
iterations: usize,
target: &SocketAddr,
tpu_use_quic: bool,
) -> thread::JoinHandle<()> {
// ConnectionCache is used instead of client because it gives ~6% higher pps
let connection_cache = match tpu_use_quic {
true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE),
false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE),
};
let connection = connection_cache.get_connection(target);
let stats_timer_receiver = tick(Duration::from_millis(SAMPLE_PERIOD_MS));
let progress_timer_receiver = tick(Duration::from_secs(PROGRESS_TIMEOUT_S));
let mut time_send_ns = 0;
let mut time_generate_ns = 0;
// Sender signals to stop Generators by dropping receiver.
// It happens in 2 cases:
// * Sender has sent at least `iterations` number of transactions
// * Sender observes that there is no progress. Since there is no way to use recv_timeout with select,
// a timer is used.
thread::Builder::new().name("Sender".to_string()).spawn(move || {
let mut total_count: usize = 0;
let mut prev_total_count = 0; // to track progress
let mut stats_count: usize = 0;
let mut stats_error_count: usize = 0;
loop {
select! {
recv(tx_receiver) -> msg => {
match msg {
Ok(tx_batch) => {
let len = tx_batch.batch.len();
let mut measure_send_txs = Measure::start("measure_send_txs");
let res = connection.send_wire_transaction_batch_async(tx_batch.batch);
measure_send_txs.stop();
time_send_ns += measure_send_txs.as_ns();
time_generate_ns += tx_batch.gen_time;
if res.is_err() {
stats_error_count += len;
}
stats_count += len;
total_count += len;
if iterations != 0 && total_count >= iterations {
info!("All transactions has been sent");
// dropping receiver to signal generator threads to stop
drop(tx_receiver);
break;
}
}
_ => panic!("Sender panics"),
}
},
recv(stats_timer_receiver) -> _ => {
info!("tx_receiver queue len: {}", tx_receiver.len());
info!("Count: {}, error count: {}, send mean time: {}, generate mean time: {}, rps: {}",
stats_count,
stats_error_count,
time_send_ns.checked_div(stats_count as u64).unwrap_or(0),
time_generate_ns.checked_div(stats_count as u64).unwrap_or(0),
compute_rate_per_second(stats_count),
);
stats_count = 0;
stats_error_count = 0;
time_send_ns = 0;
time_generate_ns = 0;
},
recv(progress_timer_receiver) -> _ => {
if prev_total_count - total_count == 0 {
info!("No progress, stop execution");
// dropping receiver to signal generator threads to stop
drop(tx_receiver);
break;
}
prev_total_count = total_count;
}
}
}
}).unwrap()
}
fn create_generator_thread<T: 'static + BenchTpsClient + Send + Sync>(
tx_sender: &Sender<TransactionBatchMsg>,
send_batch_size: usize,
transaction_generator: &mut TransactionGenerator,
client: Option<Arc<T>>,
payer: Option<Keypair>,
) -> thread::JoinHandle<()> {
let tx_sender = tx_sender.clone();
let mut transaction_generator = transaction_generator.clone();
let transaction_params: &TransactionParams = &transaction_generator.transaction_params;
// Generate n=1000 unique keypairs
// The number of chunks is described by binomial coefficient
// and hence this choice of n provides large enough number of permutations
let mut keypairs_flat: Vec<Keypair> = Vec::new();
// 1000 is arbitrary number. In case of permutation_size > 1,
// this guaranties large enough set of unique permutations
let permutation_size = get_permutation_size(
transaction_params.num_signatures.as_ref(),
transaction_params.num_instructions.as_ref(),
);
let num_keypairs = 1000 * permutation_size;
let generate_keypairs =
transaction_params.valid_signatures || transaction_params.valid_blockhash;
if generate_keypairs {
keypairs_flat = (0..num_keypairs).map(|_| Keypair::new()).collect();
}
thread::Builder::new()
.name("Generator".to_string())
.spawn(move || {
let indexes: Vec<usize> = (0..keypairs_flat.len()).collect();
let mut it = indexes.iter().permutations(permutation_size);
loop {
let mut data = Vec::<Vec<u8>>::with_capacity(send_batch_size);
let mut measure_generate_txs = Measure::start("measure_generate_txs");
for _ in 0..send_batch_size {
let chunk_keypairs = if generate_keypairs {
let mut permutation = it.next();
if permutation.is_none() {
// if ran out of permutations, regenerate keys
keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new());
info!("Regenerate keypairs");
permutation = it.next();
}
let permutation = permutation.unwrap();
Some(apply_permutation(permutation, &keypairs_flat))
} else {
None
};
let tx = transaction_generator.generate(
payer.as_ref(),
chunk_keypairs,
client.as_ref(),
);
data.push(bincode::serialize(&tx).unwrap());
}
measure_generate_txs.stop();
let result = tx_sender.send(TransactionBatchMsg {
batch: data,
gen_time: measure_generate_txs.as_ns(),
});
if result.is_err() {
// means that receiver has been dropped by sender thread
info!("Exit generator thread");
break;
}
}
})
.unwrap()
}
fn get_target(
nodes: &[ContactInfo],
@ -384,90 +563,39 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>(
client: Option<Arc<T>>,
transaction_params: TransactionParams,
tpu_use_quic: bool,
num_gen_threads: usize,
send_batch_size: usize,
) {
// Number of payers is the number of generating threads, for now it is 1
// Number of payers is the number of generating threads
// Later, we will create a new payer for each thread since Keypair is not clonable
let payers: Vec<Option<Keypair>> =
create_payers(transaction_params.valid_blockhash, 1, client.as_ref());
let payer = payers[0].as_ref();
// Generate n=1000 unique keypairs
// The number of chunks is described by binomial coefficient
// and hence this choice of n provides large enough number of permutations
let mut keypairs_flat: Vec<Keypair> = Vec::new();
// 1000 is arbitrary number. In case of permutation_size > 1,
// this guaranties large enough set of unique permutations
let permutation_size = get_permutation_size(
transaction_params.num_signatures.as_ref(),
transaction_params.num_instructions.as_ref(),
let payers: Vec<Option<Keypair>> = create_payers(
transaction_params.valid_blockhash,
num_gen_threads,
client.as_ref(),
);
let num_keypairs = 1000 * permutation_size;
let generate_keypairs =
transaction_params.valid_signatures || transaction_params.valid_blockhash;
if generate_keypairs {
keypairs_flat = (0..num_keypairs).map(|_| Keypair::new()).collect();
}
let indexes: Vec<usize> = (0..keypairs_flat.len()).collect();
let mut it = indexes.iter().permutations(permutation_size);
let mut transaction_generator = TransactionGenerator::new(transaction_params);
let (tx_sender, tx_receiver) = unbounded();
//let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
//let udp_client = UdpTpuConnection::new(target, connection_cache_stats);
let connection_cache = match tpu_use_quic {
true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE),
false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE),
};
let connection = connection_cache.get_connection(&target);
let mut count = 0;
let mut total_count = 0;
let mut error_count = 0;
let mut last_log = Instant::now();
loop {
let send_batch_size = min(iterations - total_count, SEND_BATCH_MAX_SIZE);
let mut data = Vec::<Vec<u8>>::with_capacity(SEND_BATCH_MAX_SIZE);
for _ in 0..send_batch_size {
let chunk_keypairs = if generate_keypairs {
let mut permutation = it.next();
if permutation.is_none() {
// if ran out of permutations, regenerate keys
keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new());
info!("Regenerate keypairs");
permutation = it.next();
}
let permutation = permutation.unwrap();
Some(apply_permutation(permutation, &keypairs_flat))
} else {
None
};
let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref());
data.push(bincode::serialize(&tx).unwrap());
}
let res = connection.send_wire_transaction_batch_async(data);
if res.is_err() {
error_count += send_batch_size;
}
count += send_batch_size;
total_count += send_batch_size;
if last_log.elapsed().as_millis() > SAMPLE_PERIOD_MS as u128 {
info!(
"count: {}, errors: {}, rps: {}",
count,
error_count,
compute_rate_per_second(count)
);
last_log = Instant::now();
count = 0;
}
if iterations != 0 && total_count >= iterations {
break;
let sender_thread = create_sender_thread(tx_receiver, iterations, &target, tpu_use_quic);
let tx_generator_threads: Vec<_> = payers
.into_iter()
.map(|payer| {
create_generator_thread(
&tx_sender,
send_batch_size,
&mut transaction_generator,
client.clone(),
payer,
)
})
.collect();
if let Err(err) = sender_thread.join() {
println!("join() failed with: {:?}", err);
}
for t_generator in tx_generator_threads {
if let Err(err) = t_generator.join() {
println!("join() failed with: {:?}", err);
}
}
}
@ -502,6 +630,8 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
client,
params.transaction_params,
params.tpu_use_quic,
params.num_gen_threads,
params.send_batch_size,
);
} else {
let (target_id, target_addr) = target.expect("should have target");
@ -668,6 +798,8 @@ pub mod test {
solana_sdk::timing::timestamp,
};
const TEST_SEND_BATCH_SIZE: usize = 1;
// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
@ -693,8 +825,10 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -709,8 +843,10 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -725,8 +861,10 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -741,8 +879,10 @@ pub mod test {
data_input: Some(Pubkey::default()),
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
}
@ -772,8 +912,10 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
}
@ -809,6 +951,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: Some(8),
valid_blockhash: false,
@ -818,6 +961,7 @@ pub mod test {
num_instructions: None,
},
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -834,6 +978,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: Some(8),
valid_blockhash: false,
@ -843,6 +988,7 @@ pub mod test {
num_instructions: None,
},
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -859,6 +1005,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: Some(8),
valid_blockhash: false,
@ -868,6 +1015,7 @@ pub mod test {
num_instructions: None,
},
tpu_use_quic: false,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
}
@ -936,6 +1084,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -945,6 +1094,7 @@ pub mod test {
num_instructions: Some(1),
},
tpu_use_quic,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
@ -963,6 +1113,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -972,6 +1123,7 @@ pub mod test {
num_instructions: Some(1),
},
tpu_use_quic,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
// creates and sends unique transactions of type Transfer
@ -989,6 +1141,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -998,6 +1151,7 @@ pub mod test {
num_instructions: Some(8),
},
tpu_use_quic,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
// creates and sends unique transactions of type CreateAccount
@ -1015,6 +1169,7 @@ pub mod test {
data_input: None,
skip_gossip: false,
allow_private_addr: false,
num_gen_threads: 1,
transaction_params: TransactionParams {
num_signatures: None,
valid_blockhash: true,
@ -1024,6 +1179,7 @@ pub mod test {
num_instructions: None,
},
tpu_use_quic,
send_batch_size: TEST_SEND_BATCH_SIZE,
},
);
}