diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index a90cd06e3..4f6b60eca 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -9,7 +9,7 @@ homepage = "https://solana.com/" publish = false [dependencies] -clap = "3.1.8" +clap = { version = "3.1.8", features = ["derive"] } crossbeam-channel = "0.5" log = "0.4.14" rand = "0.7.0" diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 0bd919312..056217848 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -1,6 +1,6 @@ #![allow(clippy::integer_arithmetic)] use { - clap::{crate_description, crate_name, Arg, Command}, + clap::{crate_description, crate_name, Arg, ArgEnum, Command}, crossbeam_channel::{unbounded, Receiver}, log::*, rand::{thread_rng, Rng}, @@ -66,23 +66,55 @@ fn check_txs( no_bank } +#[derive(ArgEnum, Clone, Copy, PartialEq)] +enum WriteLockContention { + /// No transactions lock the same accounts. + None, + /// Transactions don't lock the same account, unless they belong to the same batch. + SameBatchOnly, + /// All transactions write lock the same account. + Full, +} + +impl WriteLockContention { + fn possible_values<'a>() -> impl Iterator> { + Self::value_variants() + .iter() + .filter_map(|v| v.to_possible_value()) + } +} + +impl std::str::FromStr for WriteLockContention { + type Err = String; + fn from_str(input: &str) -> Result { + ArgEnum::from_str(input, false) + } +} + fn make_accounts_txs( total_num_transactions: usize, + packets_per_batch: usize, hash: Hash, - same_payer: bool, + contention: WriteLockContention, ) -> Vec { - let to_pubkey = solana_sdk::pubkey::new_rand(); + use solana_sdk::pubkey; + let to_pubkey = pubkey::new_rand(); + let chunk_pubkeys: Vec = (0..total_num_transactions / packets_per_batch) + .map(|_| pubkey::new_rand()) + .collect(); let payer_key = Keypair::new(); let dummy = system_transaction::transfer(&payer_key, &to_pubkey, 1, hash); (0..total_num_transactions) .into_par_iter() - .map(|_| { + .map(|i| { let mut new = dummy.clone(); let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); - if !same_payer { - new.message.account_keys[0] = solana_sdk::pubkey::new_rand(); - } - new.message.account_keys[1] = solana_sdk::pubkey::new_rand(); + new.message.account_keys[0] = pubkey::new_rand(); + new.message.account_keys[1] = match contention { + WriteLockContention::None => pubkey::new_rand(), + WriteLockContention::SameBatchOnly => chunk_pubkeys[i / packets_per_batch], + WriteLockContention::Full => to_pubkey, + }; new.signatures = vec![Signature::new(&sig[0..64])]; new }) @@ -91,13 +123,11 @@ fn make_accounts_txs( struct Config { packets_per_batch: usize, - chunk_len: usize, - num_threads: usize, } impl Config { fn get_transactions_index(&self, chunk_index: usize) -> usize { - chunk_index * (self.chunk_len / self.num_threads) * self.packets_per_batch + chunk_index * self.packets_per_batch } } @@ -120,11 +150,11 @@ fn main() { .help("Number of transaction chunks."), ) .arg( - Arg::new("packets_per_chunk") - .long("packets-per-chunk") + Arg::new("packets_per_batch") + .long("packets-per-batch") .takes_value(true) .value_name("SIZE") - .help("Packets per chunk"), + .help("Packets per batch"), ) .arg( Arg::new("skip_sanity") @@ -133,10 +163,11 @@ fn main() { .help("Skip transaction sanity execution"), ) .arg( - Arg::new("same_payer") - .long("same-payer") - .takes_value(false) - .help("Use the same payer for transfers"), + Arg::new("write_lock_contention") + .long("write-lock-contention") + .takes_value(true) + .possible_values(WriteLockContention::possible_values()) + .help("Accounts that test transactions write lock"), ) .arg( Arg::new("iterations") @@ -145,24 +176,36 @@ fn main() { .help("Number of iterations"), ) .arg( - Arg::new("num_threads") - .long("num-threads") + Arg::new("batches_per_iteration") + .long("batches-per-iteration") .takes_value(true) - .help("Number of iterations"), + .help("Number of batches to send in each iteration"), + ) + .arg( + Arg::new("num_banking_threads") + .long("num-banking-threads") + .takes_value(true) + .help("Number of threads to use in the banking stage"), ) .get_matches(); - let num_threads = matches - .value_of_t::("num_threads") - .unwrap_or(BankingStage::num_threads() as usize); + let num_banking_threads = matches + .value_of_t::("num_banking_threads") + .unwrap_or_else(|_| BankingStage::num_threads()); // a multiple of packet chunk duplicates to avoid races let num_chunks = matches.value_of_t::("num_chunks").unwrap_or(16); - let packets_per_chunk = matches - .value_of_t::("packets_per_chunk") + let packets_per_batch = matches + .value_of_t::("packets_per_batch") .unwrap_or(192); let iterations = matches.value_of_t::("iterations").unwrap_or(1000); + let batches_per_iteration = matches + .value_of_t::("batches_per_iteration") + .unwrap_or(BankingStage::num_threads() as usize); + let write_lock_contention = matches + .value_of_t::("write_lock_contention") + .unwrap_or(WriteLockContention::None); - let total_num_transactions = num_chunks * num_threads * packets_per_chunk; + let total_num_transactions = num_chunks * packets_per_batch * batches_per_iteration; let mint_total = 1_000_000_000_000; let GenesisConfigInfo { genesis_config, @@ -183,11 +226,17 @@ fn main() { .unwrap() .set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX); - info!("threads: {} txs: {}", num_threads, total_num_transactions); + info!( + "threads: {} txs: {}", + num_banking_threads, total_num_transactions + ); - let same_payer = matches.is_present("same_payer"); - let mut transactions = - make_accounts_txs(total_num_transactions, genesis_config.hash(), same_payer); + let mut transactions = make_accounts_txs( + total_num_transactions, + packets_per_batch, + genesis_config.hash(), + write_lock_contention, + ); // fund all the accounts transactions.iter().for_each(|tx| { @@ -212,16 +261,20 @@ fn main() { assert!(res.is_ok(), "sanity test transactions error: {:?}", res); }); bank.clear_signatures(); - //sanity check, make sure all the transactions can execute in parallel - let res = bank.process_transactions(transactions.iter()); - for r in res { - assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); + if write_lock_contention == WriteLockContention::None { + //sanity check, make sure all the transactions can execute in parallel + let res = bank.process_transactions(transactions.iter()); + for r in res { + assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); + } + bank.clear_signatures(); } - bank.clear_signatures(); } - let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_chunk); + let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_batch); + assert_eq!(verified.len(), num_chunks * batches_per_iteration); + let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -240,19 +293,20 @@ fn main() { SocketAddrSpace::Unspecified, ); let cluster_info = Arc::new(cluster_info); - let banking_stage = BankingStage::new( + let banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, verified_receiver, tpu_vote_receiver, vote_receiver, + num_banking_threads, None, replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), ); poh_recorder.lock().unwrap().set_bank(&bank); - let chunk_len = verified.len() / num_chunks; + let chunk_len = batches_per_iteration; let mut start = 0; // This is so that the signal_receiver does not go out of scope after the closure. @@ -265,20 +319,13 @@ fn main() { let mut txs_processed = 0; let mut root = 1; let collector = solana_sdk::pubkey::new_rand(); - let config = Config { - packets_per_batch: packets_per_chunk, - chunk_len, - num_threads, - }; + let config = Config { packets_per_batch }; let mut total_sent = 0; for _ in 0..iterations { let now = Instant::now(); let mut sent = 0; - for (i, v) in verified[start..start + chunk_len] - .chunks(chunk_len / num_threads) - .enumerate() - { + for (i, v) in verified[start..start + chunk_len].chunks(1).enumerate() { let mut byte = 0; let index = config.get_transactions_index(start + i); if index < transactions.len() { @@ -386,7 +433,7 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen::()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packet_batches(&transactions.clone(), packets_per_chunk); + verified = to_packet_batches(&transactions.clone(), packets_per_batch); } start += chunk_len; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 335a929e8..783e8261c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -399,7 +399,7 @@ impl BankingStage { } #[allow(clippy::too_many_arguments)] - fn new_num_threads( + pub fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>,