From 10b189535723b634a37f4d86f91445563db0f89e Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 22 May 2020 15:01:01 -0700 Subject: [PATCH] Optimize banking processing of AccountInUse (#10154) * Optimize banking processing of AccountInUse and thread count * Add more options to banking-bench --- Cargo.lock | 3 + banking-bench/Cargo.toml | 7 +- banking-bench/src/main.rs | 142 +++++++++++++++++++++++++++++--------- core/src/banking_stage.rs | 12 +++- 4 files changed, 126 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0059c97a5..bfb4b2857b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3687,10 +3687,12 @@ dependencies = [ name = "solana-banking-bench" version = "1.2.0" dependencies = [ + "clap", "crossbeam-channel", "log 0.4.8", "rand 0.7.3", "rayon", + "solana-clap-utils", "solana-core", "solana-ledger", "solana-logger", @@ -3699,6 +3701,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-streamer", + "solana-version", ] [[package]] diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index db1d4dd919..bfe924c061 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -8,9 +8,13 @@ license = "Apache-2.0" homepage = "https://solana.com/" [dependencies] +clap = "2.33.1" +crossbeam-channel = "0.4" log = "0.4.6" +rand = "0.7.0" rayon = "1.3.0" solana-core = { path = "../core", version = "1.2.0" } +solana-clap-utils = { path = "../clap-utils", version = "1.2.0" } solana-streamer = { path = "../streamer", version = "1.2.0" } solana-perf = { path = "../perf", version = "1.2.0" } solana-ledger = { path = "../ledger", version = "1.2.0" } @@ -18,8 +22,7 @@ solana-logger = { path = "../logger", version = "1.2.0" } solana-runtime = { path = "../runtime", version = "1.2.0" } solana-measure = { path = "../measure", version = "1.2.0" } solana-sdk = { path = "../sdk", version = "1.2.0" } -rand = "0.7.0" -crossbeam-channel = "0.4" +solana-version = { path = "../version", version = "1.2.0" } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index bba9ec24da..824237da88 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -1,3 +1,4 @@ +use clap::{crate_description, crate_name, value_t, App, Arg}; use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; @@ -64,15 +65,22 @@ fn check_txs( no_bank } -fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec { +fn make_accounts_txs( + total_num_transactions: usize, + hash: Hash, + same_payer: bool, +) -> Vec { let to_pubkey = Pubkey::new_rand(); - let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash); - (0..txes) + let payer_key = Keypair::new(); + let dummy = system_transaction::transfer(&payer_key, &to_pubkey, 1, hash); + (0..total_num_transactions) .into_par_iter() .map(|_| { let mut new = dummy.clone(); let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); - new.message.account_keys[0] = Pubkey::new_rand(); + if !same_payer { + new.message.account_keys[0] = Pubkey::new_rand(); + } new.message.account_keys[1] = Pubkey::new_rand(); new.signatures = vec![Signature::new(&sig[0..64])]; new @@ -96,13 +104,61 @@ fn bytes_as_usize(bytes: &[u8]) -> usize { bytes[0] as usize | (bytes[1] as usize) << 8 } +#[allow(clippy::cognitive_complexity)] fn main() { solana_logger::setup(); - let num_threads = BankingStage::num_threads() as usize; + + let matches = App::new(crate_name!()) + .about(crate_description!()) + .version(solana_version::version!()) + .arg( + Arg::with_name("num_chunks") + .long("num-chunks") + .takes_value(true) + .value_name("SIZE") + .help("Number of transaction chunks."), + ) + .arg( + Arg::with_name("packets_per_chunk") + .long("packets-per-chunk") + .takes_value(true) + .value_name("SIZE") + .help("Packets per chunk"), + ) + .arg( + Arg::with_name("skip_sanity") + .long("skip-sanity") + .takes_value(false) + .help("Skip transaction sanity execution"), + ) + .arg( + Arg::with_name("same_payer") + .long("same-payer") + .takes_value(false) + .help("Use the same payer for transfers"), + ) + .arg( + Arg::with_name("iterations") + .long("iterations") + .takes_value(true) + .help("Number of iterations"), + ) + .arg( + Arg::with_name("num_threads") + .long("num-threads") + .takes_value(true) + .help("Number of iterations"), + ) + .get_matches(); + + let num_threads = + value_t!(matches, "num_threads", usize).unwrap_or(BankingStage::num_threads() as usize); // a multiple of packet chunk duplicates to avoid races - const CHUNKS: usize = 8 * 2; - const PACKETS_PER_BATCH: usize = 192; - let txes = PACKETS_PER_BATCH * num_threads * CHUNKS; + let num_chunks = value_t!(matches, "num_chunks", usize).unwrap_or(16); + let packets_per_chunk = value_t!(matches, "packets_per_chunk", usize).unwrap_or(192); + let iterations = value_t!(matches, "iterations", usize).unwrap_or(1000); + + let total_num_transactions = num_chunks * num_threads * packets_per_chunk; let mint_total = 1_000_000_000_000; let GenesisConfigInfo { genesis_config, @@ -116,34 +172,44 @@ fn main() { let mut bank_forks = BankForks::new(0, bank0); let mut bank = bank_forks.working_bank(); - info!("threads: {} txs: {}", num_threads, txes); + info!("threads: {} txs: {}", num_threads, total_num_transactions); - let mut transactions = make_accounts_txs(txes, &mint_keypair, genesis_config.hash()); + let same_payer = matches.is_present("same_payer"); + let mut transactions = + make_accounts_txs(total_num_transactions, genesis_config.hash(), same_payer); // fund all the accounts transactions.iter().for_each(|tx| { - let fund = system_transaction::transfer( + let mut fund = system_transaction::transfer( &mint_keypair, &tx.message.account_keys[0], - mint_total / txes as u64, + mint_total / total_num_transactions as u64, genesis_config.hash(), ); + // Ignore any pesky duplicate signature errors in the case we are using single-payer + let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); + fund.signatures = vec![Signature::new(&sig[0..64])]; let x = bank.process_transaction(&fund); x.unwrap(); }); - //sanity check, make sure all the transactions can execute sequentially - transactions.iter().for_each(|tx| { - let res = bank.process_transaction(&tx); - assert!(res.is_ok(), "sanity test transactions"); - }); - bank.clear_signatures(); - //sanity check, make sure all the transactions can execute in parallel - let res = bank.process_transactions(&transactions); - for r in res { - assert!(r.is_ok(), "sanity parallel execution"); + + let skip_sanity = matches.is_present("skip_sanity"); + if !skip_sanity { + //sanity check, make sure all the transactions can execute sequentially + transactions.iter().for_each(|tx| { + let res = bank.process_transaction(&tx); + assert!(res.is_ok(), "sanity test transactions error: {:?}", res); + }); + bank.clear_signatures(); + //sanity check, make sure all the transactions can execute in parallel + let res = bank.process_transactions(&transactions); + for r in res { + assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); + } + bank.clear_signatures(); } - bank.clear_signatures(); - let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); + + let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), packets_per_chunk); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -162,7 +228,7 @@ fn main() { ); poh_recorder.lock().unwrap().set_bank(&bank); - let chunk_len = verified.len() / CHUNKS; + let chunk_len = verified.len() / num_chunks; let mut start = 0; // This is so that the signal_receiver does not go out of scope after the closure. @@ -171,17 +237,17 @@ fn main() { let signal_receiver = Arc::new(signal_receiver); let mut total_us = 0; let mut tx_total_us = 0; + let base_tx_count = bank.transaction_count(); let mut txs_processed = 0; let mut root = 1; let collector = Pubkey::new_rand(); - const ITERS: usize = 1_000; let config = Config { - packets_per_batch: PACKETS_PER_BATCH, + packets_per_batch: packets_per_chunk, chunk_len, num_threads, }; let mut total_sent = 0; - for _ in 0..ITERS { + for _ in 0..iterations { let now = Instant::now(); let mut sent = 0; @@ -222,7 +288,11 @@ fn main() { sleep(Duration::from_millis(5)); } } - if check_txs(&signal_receiver, txes / CHUNKS, &poh_recorder) { + if check_txs( + &signal_receiver, + total_num_transactions / num_chunks, + &poh_recorder, + ) { debug!( "resetting bank {} tx count: {} txs_proc: {}", bank.slot(), @@ -274,7 +344,7 @@ fn main() { debug!( "time: {} us checked: {} sent: {}", duration_as_us(&now.elapsed()), - txes / CHUNKS, + total_num_transactions / num_chunks, sent, ); total_sent += sent; @@ -285,20 +355,26 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); + verified = to_packets_chunked(&transactions.clone(), packets_per_chunk); } start += chunk_len; start %= verified.len(); } + let txs_processed = bank_forks.working_bank().transaction_count(); + debug!("processed: {} base: {}", txs_processed, base_tx_count); eprintln!( - "{{'name': 'banking_bench_total', 'median': '{}'}}", + "{{'name': 'banking_bench_total', 'median': '{:.2}'}}", (1000.0 * 1000.0 * total_sent as f64) / (total_us as f64), ); eprintln!( - "{{'name': 'banking_bench_tx_total', 'median': '{}'}}", + "{{'name': 'banking_bench_tx_total', 'median': '{:.2}'}}", (1000.0 * 1000.0 * total_sent as f64) / (tx_total_us as f64), ); + eprintln!( + "{{'name': 'banking_bench_success_tx_total', 'median': '{:.2}'}}", + (1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64), + ); drop(verified_sender); drop(vote_sender); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c2ccdff6cd..4feffefbae 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -292,7 +292,7 @@ impl BankingStage { enable_forwarding: bool, batch_limit: usize, transaction_status_sender: Option, - ) { + ) -> BufferedPacketsDecision { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); ( @@ -349,6 +349,7 @@ impl BankingStage { } _ => (), } + decision } pub fn process_loop( @@ -365,8 +366,8 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; loop { - if !buffered_packets.is_empty() { - Self::process_buffered_packets( + while !buffered_packets.is_empty() { + let decision = Self::process_buffered_packets( &my_pubkey, &socket, poh_recorder, @@ -376,6 +377,11 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), ); + if decision == BufferedPacketsDecision::Hold { + // If we are waiting on a new bank, + // check the receiver for more transactions/for exiting + break; + } } let recv_timeout = if !buffered_packets.is_empty() {