From 8465a3aa46bb9ee2112734701fb93dcb1f586710 Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Thu, 21 Jul 2022 14:50:23 +0200 Subject: [PATCH] Create nonce keypair chunks (#26670) * extract KeypairChunks structure * introduce durable nonce in TransactionChunkGenerator * Introduce TimestampedTransaction with optional timestamp --- bench-tps/src/bench.rs | 202 +++++++++++++++++++++++++++++++---------- 1 file changed, 152 insertions(+), 50 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 5593d9e94..633168e14 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -7,6 +7,7 @@ use { }, log::*, rayon::prelude::*, + solana_client::nonce_utils, solana_metrics::{self, datapoint_info}, solana_sdk::{ clock::{DEFAULT_MS_PER_SLOT, DEFAULT_S_PER_SLOT, MAX_PROCESSING_AGE}, @@ -35,57 +36,95 @@ use { // The point at which transactions become "too old", in seconds. const MAX_TX_QUEUE_AGE: u64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as u64; -pub type SharedTransactions = Arc>>>; +pub type TimestampedTransaction = (Transaction, Option); +pub type SharedTransactions = Arc>>>; -/// Split input vector of keypairs into two sets of chunks of given size -fn split_into_source_destination( - keypairs: &[Keypair], - chunk_size: usize, -) -> (Vec>, Vec>) { - let mut source_keypair_chunks: Vec> = Vec::new(); - let mut dest_keypair_chunks: Vec> = Vec::new(); - for chunk in keypairs.chunks_exact(2 * chunk_size) { - source_keypair_chunks.push(chunk[..chunk_size].iter().collect()); - dest_keypair_chunks.push(chunk[chunk_size..].iter().collect()); - } - (source_keypair_chunks, dest_keypair_chunks) +/// Keypairs split into source and destination +/// used for transfer transactions +struct KeypairChunks<'a> { + source: Vec>, + dest: Vec>, } -struct TransactionChunkGenerator<'a> { - source_keypair_chunks: Vec>, - dest_keypair_chunks: Vec>, + +impl<'a> KeypairChunks<'a> { + /// Split input vector of keypairs into two sets of chunks of given size + fn new(keypairs: &'a [Keypair], chunk_size: usize) -> Self { + let mut source_keypair_chunks: Vec> = Vec::new(); + let mut dest_keypair_chunks: Vec> = Vec::new(); + for chunk in keypairs.chunks_exact(2 * chunk_size) { + source_keypair_chunks.push(chunk[..chunk_size].iter().collect()); + dest_keypair_chunks.push(chunk[chunk_size..].iter().collect()); + } + KeypairChunks { + source: source_keypair_chunks, + dest: dest_keypair_chunks, + } + } +} + +struct TransactionChunkGenerator<'a, 'b, T> { + client: Arc, + account_chunks: KeypairChunks<'a>, + nonce_chunks: Option>, chunk_index: usize, reclaim_lamports_back_to_source_account: bool, } -impl<'a> TransactionChunkGenerator<'a> { - fn new(gen_keypairs: &'a [Keypair], chunk_size: usize) -> Self { - let (source_keypair_chunks, dest_keypair_chunks) = - split_into_source_destination(gen_keypairs, chunk_size); +impl<'a, 'b, T> TransactionChunkGenerator<'a, 'b, T> +where + T: 'static + BenchTpsClient + Send + Sync, +{ + fn new( + client: Arc, + gen_keypairs: &'a [Keypair], + nonce_keypairs: Option<&'b Vec>, + chunk_size: usize, + ) -> Self { + let account_chunks = KeypairChunks::new(gen_keypairs, chunk_size); + let nonce_chunks = + nonce_keypairs.map(|nonce_keypairs| KeypairChunks::new(nonce_keypairs, chunk_size)); + TransactionChunkGenerator { - source_keypair_chunks, - dest_keypair_chunks, + client, + account_chunks, + nonce_chunks, chunk_index: 0, reclaim_lamports_back_to_source_account: false, } } - fn generate(&mut self, blockhash: Option<&Hash>) -> Vec<(Transaction, u64)> { - let tx_count = self.source_keypair_chunks.len(); + /// generate transactions to transfer lamports from source to destination accounts + /// if durable nonce is used, blockhash is None + fn generate(&mut self, blockhash: Option<&Hash>) -> Vec { + let tx_count = self.account_chunks.source.len(); info!( "Signing transactions... {} (reclaim={}, blockhash={:?})", tx_count, self.reclaim_lamports_back_to_source_account, blockhash ); let signing_start = Instant::now(); - let source_chunk = &self.source_keypair_chunks[self.chunk_index]; - let dest_chunk = &self.dest_keypair_chunks[self.chunk_index]; - assert!(blockhash.is_some()); - let transactions = generate_system_txs( - source_chunk, - dest_chunk, - self.reclaim_lamports_back_to_source_account, - blockhash.unwrap(), - ); + let source_chunk = &self.account_chunks.source[self.chunk_index]; + let dest_chunk = &self.account_chunks.dest[self.chunk_index]; + let transactions = if let Some(nonce_chunks) = &self.nonce_chunks { + let source_nonce_chunk = &nonce_chunks.source[self.chunk_index]; + let dest_nonce_chunk: &VecDeque<&Keypair> = &nonce_chunks.dest[self.chunk_index]; + generate_nonced_system_txs( + self.client.clone(), + source_chunk, + dest_chunk, + source_nonce_chunk, + dest_nonce_chunk, + self.reclaim_lamports_back_to_source_account, + ) + } else { + assert!(blockhash.is_some()); + generate_system_txs( + source_chunk, + dest_chunk, + self.reclaim_lamports_back_to_source_account, + blockhash.unwrap(), + ) + }; let duration = signing_start.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); @@ -109,10 +148,12 @@ impl<'a> TransactionChunkGenerator<'a> { fn advance(&mut self) { // Rotate destination keypairs so that the next round of transactions will have different // transaction signatures even when blockhash is reused. - self.dest_keypair_chunks[self.chunk_index].rotate_left(1); - + self.account_chunks.dest[self.chunk_index].rotate_left(1); + if let Some(nonce_chunks) = &mut self.nonce_chunks { + nonce_chunks.dest[self.chunk_index].rotate_left(1); + } // Move on to next chunk - self.chunk_index = (self.chunk_index + 1) % self.source_keypair_chunks.len(); + self.chunk_index = (self.chunk_index + 1) % self.account_chunks.source.len(); // Switch directions after transfering for each "chunk" if self.chunk_index == 0 { @@ -168,11 +209,11 @@ where .unwrap() } -fn generate_chunked_transfers( +fn generate_chunked_transfers( recent_blockhash: Arc>, shared_txs: &SharedTransactions, shared_tx_active_thread_count: Arc, - mut chunk_generator: TransactionChunkGenerator<'_>, + mut chunk_generator: TransactionChunkGenerator<'_, '_, T>, threads: usize, duration: Duration, sustained: bool, @@ -265,7 +306,12 @@ where } = config; assert!(gen_keypairs.len() >= 2 * tx_count); - let chunk_generator = TransactionChunkGenerator::new(&gen_keypairs, tx_count); + let chunk_generator = TransactionChunkGenerator::new( + client.clone(), + &gen_keypairs, + None, // TODO(klykov): to be added in the follow up PR + tx_count, + ); let first_tx_count = loop { match client.get_transaction_count() { @@ -377,7 +423,7 @@ fn generate_system_txs( dest: &VecDeque<&Keypair>, reclaim: bool, blockhash: &Hash, -) -> Vec<(Transaction, u64)> { +) -> Vec { let pairs: Vec<_> = if !reclaim { source.iter().zip(dest.iter()).collect() } else { @@ -389,16 +435,70 @@ fn generate_system_txs( .map(|(from, to)| { ( system_transaction::transfer(from, &to.pubkey(), 1, *blockhash), - timestamp(), + Some(timestamp()), ) }) .collect() } -fn generate_txs( +fn get_nonce_blockhash( + client: Arc, + nonce_account_pubkey: Pubkey, +) -> Hash { + let nonce_account = client + .get_account(&nonce_account_pubkey) + .unwrap_or_else(|error| panic!("{:?}", error)); + let nonce_data = nonce_utils::data_from_account(&nonce_account) + .unwrap_or_else(|error| panic!("{:?}", error)); + nonce_data.blockhash() +} + +fn generate_nonced_system_txs( + client: Arc, + source: &[&Keypair], + dest: &VecDeque<&Keypair>, + source_nonce: &[&Keypair], + dest_nonce: &VecDeque<&Keypair>, + reclaim: bool, +) -> Vec { + let length = source.len(); + let mut transactions: Vec = Vec::with_capacity(length); + for i in 0..length { + let (from, to, nonce, nonce_blockhash) = if !reclaim { + ( + source[i], + dest[i], + source_nonce[i], + get_nonce_blockhash(client.clone(), source_nonce[i].pubkey()), + ) + } else { + ( + dest[i], + source[i], + dest_nonce[i], + get_nonce_blockhash(client.clone(), dest_nonce[i].pubkey()), + ) + }; + + transactions.push(( + system_transaction::nonced_transfer( + from, + &to.pubkey(), + 1, + &nonce.pubkey(), + from, + nonce_blockhash, + ), + None, + )); + } + transactions +} + +fn generate_txs( shared_txs: &SharedTransactions, blockhash: &Arc>, - chunk_generator: &mut TransactionChunkGenerator<'_>, + chunk_generator: &mut TransactionChunkGenerator<'_, '_, T>, threads: usize, ) { let blockhash = blockhash.read().map(|x| *x).ok(); @@ -508,14 +608,16 @@ fn do_tx_transfers( let mut min_timestamp = u64::MAX; for tx in txs0 { let now = timestamp(); - // Transactions that are too old will be rejected by the cluster Don't bother + // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother // sending them. - if tx.1 < min_timestamp { - min_timestamp = tx.1; - } - if now > tx.1 && now - tx.1 > 1000 * MAX_TX_QUEUE_AGE { - old_transactions = true; - continue; + if let Some(tx_timestamp) = tx.1 { + if tx_timestamp < min_timestamp { + min_timestamp = tx_timestamp; + } + if now > tx_timestamp && now - tx_timestamp > 1000 * MAX_TX_QUEUE_AGE { + old_transactions = true; + continue; + } } transactions.push(tx.0); }