Refactor bench-tps chunking transactions logic (#26661)

Refactor bench-tps chunking txs
This commit is contained in:
kirill lykov 2022-07-19 13:13:24 +02:00 committed by GitHub
parent 4163dc181e
commit 3929ad67eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 79 additions and 56 deletions

View File

@ -50,6 +50,77 @@ fn split_into_source_destination(
}
(source_keypair_chunks, dest_keypair_chunks)
}
struct TransactionChunkGenerator<'a> {
source_keypair_chunks: Vec<Vec<&'a Keypair>>,
dest_keypair_chunks: Vec<VecDeque<&'a Keypair>>,
chunk_index: usize,
reclaim_lamports_back_to_source_account: bool,
}
impl<'a> TransactionChunkGenerator<'a> {
fn new(gen_keypairs: &'a [Keypair], chunk_size: usize) -> Self {
let (source_keypair_chunks, dest_keypair_chunks) =
split_into_source_destination(gen_keypairs, chunk_size);
TransactionChunkGenerator {
source_keypair_chunks,
dest_keypair_chunks,
chunk_index: 0,
reclaim_lamports_back_to_source_account: false,
}
}
fn generate(&mut self, blockhash: Option<&Hash>) -> Vec<(Transaction, u64)> {
let tx_count = self.source_keypair_chunks.len();
info!(
"Signing transactions... {} (reclaim={}, blockhash={:?})",
tx_count, self.reclaim_lamports_back_to_source_account, blockhash
);
let signing_start = Instant::now();
let source_chunk = &self.source_keypair_chunks[self.chunk_index];
let dest_chunk = &self.dest_keypair_chunks[self.chunk_index];
assert!(blockhash.is_some());
let transactions = generate_system_txs(
source_chunk,
dest_chunk,
self.reclaim_lamports_back_to_source_account,
blockhash.unwrap(),
);
let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = (tx_count) as f64 / ns as f64;
let nsps = ns as f64 / (tx_count) as f64;
info!(
"Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {:?}",
bsps * 1_000_000_f64,
nsps / 1_000_f64,
duration_as_ms(&duration),
blockhash,
);
datapoint_info!(
"bench-tps-generate_txs",
("duration", duration_as_us(&duration), i64)
);
transactions
}
fn advance(&mut self) {
// Rotate destination keypairs so that the next round of transactions will have different
// transaction signatures even when blockhash is reused.
self.dest_keypair_chunks[self.chunk_index].rotate_left(1);
// Move on to next chunk
self.chunk_index = (self.chunk_index + 1) % self.source_keypair_chunks.len();
// Switch directions after transfering for each "chunk"
if self.chunk_index == 0 {
self.reclaim_lamports_back_to_source_account =
!self.reclaim_lamports_back_to_source_account;
}
}
}
fn wait_for_target_slots_per_epoch<T>(target_slots_per_epoch: u64, client: &Arc<T>)
where
@ -101,28 +172,17 @@ fn generate_chunked_transfers(
recent_blockhash: Arc<RwLock<Hash>>,
shared_txs: &SharedTransactions,
shared_tx_active_thread_count: Arc<AtomicIsize>,
source_keypair_chunks: Vec<Vec<&Keypair>>,
dest_keypair_chunks: &mut [VecDeque<&Keypair>],
mut chunk_generator: TransactionChunkGenerator<'_>,
threads: usize,
duration: Duration,
sustained: bool,
) {
// generate and send transactions for the specified duration
let start = Instant::now();
let keypair_chunks = source_keypair_chunks.len();
let mut reclaim_lamports_back_to_source_account = false;
let mut chunk_index = 0;
let mut last_generate_txs_time = Instant::now();
while start.elapsed() < duration {
generate_txs(
shared_txs,
&recent_blockhash,
&source_keypair_chunks[chunk_index],
&dest_keypair_chunks[chunk_index],
threads,
reclaim_lamports_back_to_source_account,
);
generate_txs(shared_txs, &recent_blockhash, &mut chunk_generator, threads);
datapoint_info!(
"blockhash_stats",
@ -149,18 +209,7 @@ fn generate_chunked_transfers(
sleep(Duration::from_millis(1));
}
}
// Rotate destination keypairs so that the next round of transactions will have different
// transaction signatures even when blockhash is reused.
dest_keypair_chunks[chunk_index].rotate_left(1);
// Move on to next chunk
chunk_index = (chunk_index + 1) % keypair_chunks;
// Switch directions after transfering for each "chunk"
if chunk_index == 0 {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
chunk_generator.advance();
}
}
@ -216,8 +265,7 @@ where
} = config;
assert!(gen_keypairs.len() >= 2 * tx_count);
let (source_keypair_chunks, mut dest_keypair_chunks) =
split_into_source_destination(&gen_keypairs, tx_count);
let chunk_generator = TransactionChunkGenerator::new(&gen_keypairs, tx_count);
let first_tx_count = loop {
match client.get_transaction_count() {
@ -275,8 +323,7 @@ where
blockhash,
&shared_txs,
shared_tx_active_thread_count,
source_keypair_chunks,
&mut dest_keypair_chunks,
chunk_generator,
threads,
duration,
sustained,
@ -351,36 +398,12 @@ fn generate_system_txs(
fn generate_txs(
shared_txs: &SharedTransactions,
blockhash: &Arc<RwLock<Hash>>,
source: &[&Keypair],
dest: &VecDeque<&Keypair>,
chunk_generator: &mut TransactionChunkGenerator<'_>,
threads: usize,
reclaim: bool,
) {
let blockhash = *blockhash.read().unwrap();
let tx_count = source.len();
info!(
"Signing transactions... {} (reclaim={}, blockhash={})",
tx_count, reclaim, &blockhash
);
let signing_start = Instant::now();
let blockhash = blockhash.read().map(|x| *x).ok();
let transactions = generate_system_txs(source, dest, reclaim, &blockhash);
let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = (tx_count) as f64 / ns as f64;
let nsps = ns as f64 / (tx_count) as f64;
info!(
"Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {}",
bsps * 1_000_000_f64,
nsps / 1_000_f64,
duration_as_ms(&duration),
blockhash,
);
datapoint_info!(
"bench-tps-generate_txs",
("duration", duration_as_us(&duration), i64)
);
let transactions = chunk_generator.generate(blockhash.as_ref());
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();