diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 0562178480..e108b70af8 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -14,7 +14,7 @@ use { leader_schedule_cache::LeaderScheduleCache, }, solana_measure::measure::Measure, - solana_perf::packet::to_packet_batches, + solana_perf::packet::{to_packet_batches, PacketBatch}, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, @@ -55,7 +55,6 @@ fn check_txs( break; } if poh_recorder.lock().unwrap().bank().is_none() { - trace!("no bank"); no_bank = true; break; } @@ -121,18 +120,44 @@ fn make_accounts_txs( .collect() } -struct Config { +struct PacketsPerIteration { + packet_batches: Vec, + transactions: Vec, packets_per_batch: usize, } -impl Config { - fn get_transactions_index(&self, chunk_index: usize) -> usize { - chunk_index * self.packets_per_batch - } -} +impl PacketsPerIteration { + fn new( + packets_per_batch: usize, + batches_per_iteration: usize, + genesis_hash: Hash, + write_lock_contention: WriteLockContention, + ) -> Self { + let total_num_transactions = packets_per_batch * batches_per_iteration; + let transactions = make_accounts_txs( + total_num_transactions, + packets_per_batch, + genesis_hash, + write_lock_contention, + ); -fn bytes_as_usize(bytes: &[u8]) -> usize { - bytes[0] as usize | (bytes[1] as usize) << 8 + let packet_batches: Vec = to_packet_batches(&transactions, packets_per_batch); + assert_eq!(packet_batches.len(), batches_per_iteration); + Self { + packet_batches, + transactions, + packets_per_batch, + } + } + + fn refresh_blockhash(&mut self, new_blockhash: Hash) { + for tx in self.transactions.iter_mut() { + tx.message.recent_blockhash = new_blockhash; + let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); + tx.signatures[0] = Signature::new(&sig[0..64]); + } + self.packet_batches = to_packet_batches(&self.transactions, self.packets_per_batch); + } } #[allow(clippy::cognitive_complexity)] @@ -142,6 +167,12 @@ fn main() { let matches = Command::new(crate_name!()) .about(crate_description!()) .version(solana_version::version!()) + .arg( + Arg::new("iterations") + .long("iterations") + .takes_value(true) + .help("Number of test iterations"), + ) .arg( Arg::new("num_chunks") .long("num-chunks") @@ -169,12 +200,6 @@ fn main() { .possible_values(WriteLockContention::possible_values()) .help("Accounts that test transactions write lock"), ) - .arg( - Arg::new("iterations") - .long("iterations") - .takes_value(true) - .help("Number of iterations"), - ) .arg( Arg::new("batches_per_iteration") .long("batches-per-iteration") @@ -205,7 +230,6 @@ fn main() { .value_of_t::("write_lock_contention") .unwrap_or(WriteLockContention::None); - let total_num_transactions = num_chunks * packets_per_batch * batches_per_iteration; let mint_total = 1_000_000_000_000; let GenesisConfigInfo { genesis_config, @@ -226,55 +250,72 @@ fn main() { .unwrap() .set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX); + let mut all_packets: Vec = std::iter::from_fn(|| { + Some(PacketsPerIteration::new( + packets_per_batch, + batches_per_iteration, + genesis_config.hash(), + write_lock_contention, + )) + }) + .take(num_chunks) + .collect(); + + // fund all the accounts + let total_num_transactions: u64 = all_packets + .iter() + .map(|packets_for_single_iteration| packets_for_single_iteration.transactions.len() as u64) + .sum(); info!( "threads: {} txs: {}", num_banking_threads, total_num_transactions ); - let mut transactions = make_accounts_txs( - total_num_transactions, - packets_per_batch, - genesis_config.hash(), - write_lock_contention, - ); - - // fund all the accounts - transactions.iter().for_each(|tx| { - let mut fund = system_transaction::transfer( - &mint_keypair, - &tx.message.account_keys[0], - mint_total / total_num_transactions as u64, - genesis_config.hash(), - ); - // Ignore any pesky duplicate signature errors in the case we are using single-payer - let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); - fund.signatures = vec![Signature::new(&sig[0..64])]; - let x = bank.process_transaction(&fund); - x.unwrap(); + all_packets.iter().for_each(|packets_for_single_iteration| { + packets_for_single_iteration + .transactions + .iter() + .for_each(|tx| { + let mut fund = system_transaction::transfer( + &mint_keypair, + &tx.message.account_keys[0], + mint_total / total_num_transactions, + genesis_config.hash(), + ); + // Ignore any pesky duplicate signature errors in the case we are using single-payer + let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); + fund.signatures = vec![Signature::new(&sig[0..64])]; + bank.process_transaction(&fund).unwrap(); + }); }); let skip_sanity = matches.is_present("skip_sanity"); if !skip_sanity { - //sanity check, make sure all the transactions can execute sequentially - transactions.iter().for_each(|tx| { - let res = bank.process_transaction(tx); - assert!(res.is_ok(), "sanity test transactions error: {:?}", res); + all_packets.iter().for_each(|packets_for_single_iteration| { + //sanity check, make sure all the transactions can execute sequentially + packets_for_single_iteration + .transactions + .iter() + .for_each(|tx| { + let res = bank.process_transaction(tx); + assert!(res.is_ok(), "sanity test transactions error: {:?}", res); + }); }); bank.clear_signatures(); if write_lock_contention == WriteLockContention::None { - //sanity check, make sure all the transactions can execute in parallel - let res = bank.process_transactions(transactions.iter()); - for r in res { - assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); - } - bank.clear_signatures(); + all_packets.iter().for_each(|packets_for_single_iteration| { + //sanity check, make sure all the transactions can execute in parallel + let res = + bank.process_transactions(packets_for_single_iteration.transactions.iter()); + for r in res { + assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); + } + bank.clear_signatures(); + }); } } - let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_batch); - assert_eq!(verified.len(), num_chunks * batches_per_iteration); - let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -306,9 +347,6 @@ fn main() { ); poh_recorder.lock().unwrap().set_bank(&bank); - let chunk_len = batches_per_iteration; - let mut start = 0; - // This is so that the signal_receiver does not go out of scope after the closure. // If it is dropped before poh_service, then poh_service will error when // calling send() on the channel. @@ -319,36 +357,26 @@ fn main() { let mut txs_processed = 0; let mut root = 1; let collector = solana_sdk::pubkey::new_rand(); - let config = Config { packets_per_batch }; let mut total_sent = 0; - for _ in 0..iterations { + for current_iteration_index in 0..iterations { + trace!("RUNNING ITERATION {}", current_iteration_index); let now = Instant::now(); let mut sent = 0; - for (i, v) in verified[start..start + chunk_len].chunks(1).enumerate() { - let mut byte = 0; - let index = config.get_transactions_index(start + i); - if index < transactions.len() { - byte = bytes_as_usize(transactions[index].signatures[0].as_ref()); - } + let packets_for_this_iteration = &all_packets[current_iteration_index % num_chunks]; + for (packet_batch_index, packet_batch) in + packets_for_this_iteration.packet_batches.iter().enumerate() + { + sent += packet_batch.packets.len(); trace!( - "sending... {}..{} {} v.len: {} sig: {} transactions.len: {} index: {}", - start + i, - start + chunk_len, + "Sending PacketBatch index {}, {}", + packet_batch_index, timestamp(), - v.len(), - byte, - transactions.len(), - index, ); - for xv in v { - sent += xv.packets.len(); - } - verified_sender.send(v.to_vec()).unwrap(); + verified_sender.send(vec![packet_batch.clone()]).unwrap(); } - let start_tx_index = config.get_transactions_index(start); - let end_tx_index = config.get_transactions_index(start + chunk_len); - for tx in &transactions[start_tx_index..end_tx_index] { + + for tx in &packets_for_this_iteration.transactions { loop { if bank.get_signature_status(&tx.signatures[0]).is_some() { break; @@ -361,7 +389,7 @@ fn main() { } if check_txs( &signal_receiver, - total_num_transactions / num_chunks, + packets_for_this_iteration.transactions.len(), &poh_recorder, ) { debug!( @@ -370,7 +398,6 @@ fn main() { bank.transaction_count(), txs_processed ); - assert!(txs_processed < bank.transaction_count()); txs_processed = bank.transaction_count(); tx_total_us += duration_as_us(&now.elapsed()); @@ -422,22 +449,17 @@ fn main() { debug!( "time: {} us checked: {} sent: {}", duration_as_us(&now.elapsed()), - total_num_transactions / num_chunks, + total_num_transactions / num_chunks as u64, sent, ); total_sent += sent; - if bank.slot() > 0 && bank.slot() % 16 == 0 { - for tx in transactions.iter_mut() { - tx.message.recent_blockhash = bank.last_blockhash(); - let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); - tx.signatures[0] = Signature::new(&sig[0..64]); + if current_iteration_index % 16 == 0 { + let last_blockhash = bank.last_blockhash(); + for packets_for_single_iteration in all_packets.iter_mut() { + packets_for_single_iteration.refresh_blockhash(last_blockhash); } - verified = to_packet_batches(&transactions.clone(), packets_per_batch); } - - start += chunk_len; - start %= verified.len(); } let txs_processed = bank_forks.working_bank().transaction_count(); debug!("processed: {} base: {}", txs_processed, base_tx_count);