replace send with sendmmsg (#25585)

* replace send with sendmmsg

* address PR comments
This commit is contained in:
kirill lykov 2022-06-21 09:51:48 +01:00 committed by GitHub
parent 4f0cc6224e
commit a9069244f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 25 deletions

View File

@ -47,6 +47,7 @@ use {
solana_client::{
connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
rpc_client::RpcClient,
tpu_connection::TpuConnection,
},
solana_core::serve_repair::RepairProtocol,
solana_dos::cli::*,
@ -67,6 +68,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
cmp::min,
net::{SocketAddr, UdpSocket},
process::exit,
sync::Arc,
@ -233,6 +235,8 @@ impl TransactionGenerator {
}
}
const SEND_BATCH_MAX_SIZE: usize = 1 << 10;
fn get_target(
nodes: &[ContactInfo],
mode: Mode,
@ -385,9 +389,8 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>(
iterations: usize,
client: Option<Arc<T>>,
transaction_params: TransactionParams,
tpu_use_quic: bool,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
// Number of payers is the number of generating threads, for now it is 1
// Later, we will create a new payer for each thread since Keypair is not clonable
let payers: Vec<Option<Keypair>> =
@ -417,34 +420,45 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>(
let mut transaction_generator = TransactionGenerator::new(transaction_params);
//let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
//let udp_client = UdpTpuConnection::new(target, connection_cache_stats);
let connection_cache = ConnectionCache::new(tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE);
let connection = connection_cache.get_connection(&target);
let mut count = 0;
let mut total_count = 0;
let mut error_count = 0;
let mut last_log = Instant::now();
loop {
let chunk_keypairs = if generate_keypairs {
let permutation = it.next();
if permutation.is_none() {
// if ran out of permutations, regenerate keys
keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new());
info!("Regenerate keypairs");
continue;
}
let permutation = permutation.unwrap();
Some(apply_permutation(permutation, &keypairs_flat))
} else {
None
};
let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref());
let data = bincode::serialize(&tx).unwrap();
let res = socket.send_to(&data, target);
if res.is_err() {
error_count += 1;
let send_batch_size = min(iterations - total_count, SEND_BATCH_MAX_SIZE);
let mut data = Vec::<Vec<u8>>::with_capacity(SEND_BATCH_MAX_SIZE);
for _ in 0..send_batch_size {
let chunk_keypairs = if generate_keypairs {
let mut permutation = it.next();
if permutation.is_none() {
// if ran out of permutations, regenerate keys
keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new());
info!("Regenerate keypairs");
permutation = it.next();
}
let permutation = permutation.unwrap();
Some(apply_permutation(permutation, &keypairs_flat))
} else {
None
};
let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref());
data.push(bincode::serialize(&tx).unwrap());
}
count += 1;
total_count += 1;
let res = connection.send_wire_transaction_batch_async(data);
if res.is_err() {
error_count += send_batch_size;
}
count += send_batch_size;
total_count += send_batch_size;
if last_log.elapsed().as_millis() > SAMPLE_PERIOD_MS as u128 {
info!(
"count: {}, errors: {}, rps: {}",
@ -485,7 +499,13 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
{
let target = target.expect("should have target");
info!("Targeting {}", target);
run_dos_transactions(target, iterations, client, params.transaction_params);
run_dos_transactions(
target,
iterations,
client,
params.transaction_params,
params.tpu_use_quic,
);
} else {
let target = target.expect("should have target");
info!("Targeting {}", target);