Optimize banking processing of AccountInUse (#10154)
* Optimize banking processing of AccountInUse and thread count * Add more options to banking-bench
This commit is contained in:
parent
f1e932c90a
commit
10b1895357
|
@ -3687,10 +3687,12 @@ dependencies = [
|
|||
name = "solana-banking-bench"
|
||||
version = "1.2.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"log 0.4.8",
|
||||
"rand 0.7.3",
|
||||
"rayon",
|
||||
"solana-clap-utils",
|
||||
"solana-core",
|
||||
"solana-ledger",
|
||||
"solana-logger",
|
||||
|
@ -3699,6 +3701,7 @@ dependencies = [
|
|||
"solana-runtime",
|
||||
"solana-sdk",
|
||||
"solana-streamer",
|
||||
"solana-version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -8,9 +8,13 @@ license = "Apache-2.0"
|
|||
homepage = "https://solana.com/"
|
||||
|
||||
[dependencies]
|
||||
clap = "2.33.1"
|
||||
crossbeam-channel = "0.4"
|
||||
log = "0.4.6"
|
||||
rand = "0.7.0"
|
||||
rayon = "1.3.0"
|
||||
solana-core = { path = "../core", version = "1.2.0" }
|
||||
solana-clap-utils = { path = "../clap-utils", version = "1.2.0" }
|
||||
solana-streamer = { path = "../streamer", version = "1.2.0" }
|
||||
solana-perf = { path = "../perf", version = "1.2.0" }
|
||||
solana-ledger = { path = "../ledger", version = "1.2.0" }
|
||||
|
@ -18,8 +22,7 @@ solana-logger = { path = "../logger", version = "1.2.0" }
|
|||
solana-runtime = { path = "../runtime", version = "1.2.0" }
|
||||
solana-measure = { path = "../measure", version = "1.2.0" }
|
||||
solana-sdk = { path = "../sdk", version = "1.2.0" }
|
||||
rand = "0.7.0"
|
||||
crossbeam-channel = "0.4"
|
||||
solana-version = { path = "../version", version = "1.2.0" }
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use clap::{crate_description, crate_name, value_t, App, Arg};
|
||||
use crossbeam_channel::unbounded;
|
||||
use log::*;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
@ -64,15 +65,22 @@ fn check_txs(
|
|||
no_bank
|
||||
}
|
||||
|
||||
fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
|
||||
fn make_accounts_txs(
|
||||
total_num_transactions: usize,
|
||||
hash: Hash,
|
||||
same_payer: bool,
|
||||
) -> Vec<Transaction> {
|
||||
let to_pubkey = Pubkey::new_rand();
|
||||
let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash);
|
||||
(0..txes)
|
||||
let payer_key = Keypair::new();
|
||||
let dummy = system_transaction::transfer(&payer_key, &to_pubkey, 1, hash);
|
||||
(0..total_num_transactions)
|
||||
.into_par_iter()
|
||||
.map(|_| {
|
||||
let mut new = dummy.clone();
|
||||
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
|
||||
new.message.account_keys[0] = Pubkey::new_rand();
|
||||
if !same_payer {
|
||||
new.message.account_keys[0] = Pubkey::new_rand();
|
||||
}
|
||||
new.message.account_keys[1] = Pubkey::new_rand();
|
||||
new.signatures = vec![Signature::new(&sig[0..64])];
|
||||
new
|
||||
|
@ -96,13 +104,61 @@ fn bytes_as_usize(bytes: &[u8]) -> usize {
|
|||
bytes[0] as usize | (bytes[1] as usize) << 8
|
||||
}
|
||||
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
fn main() {
|
||||
solana_logger::setup();
|
||||
let num_threads = BankingStage::num_threads() as usize;
|
||||
|
||||
let matches = App::new(crate_name!())
|
||||
.about(crate_description!())
|
||||
.version(solana_version::version!())
|
||||
.arg(
|
||||
Arg::with_name("num_chunks")
|
||||
.long("num-chunks")
|
||||
.takes_value(true)
|
||||
.value_name("SIZE")
|
||||
.help("Number of transaction chunks."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("packets_per_chunk")
|
||||
.long("packets-per-chunk")
|
||||
.takes_value(true)
|
||||
.value_name("SIZE")
|
||||
.help("Packets per chunk"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("skip_sanity")
|
||||
.long("skip-sanity")
|
||||
.takes_value(false)
|
||||
.help("Skip transaction sanity execution"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("same_payer")
|
||||
.long("same-payer")
|
||||
.takes_value(false)
|
||||
.help("Use the same payer for transfers"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("iterations")
|
||||
.long("iterations")
|
||||
.takes_value(true)
|
||||
.help("Number of iterations"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("num_threads")
|
||||
.long("num-threads")
|
||||
.takes_value(true)
|
||||
.help("Number of iterations"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let num_threads =
|
||||
value_t!(matches, "num_threads", usize).unwrap_or(BankingStage::num_threads() as usize);
|
||||
// a multiple of packet chunk duplicates to avoid races
|
||||
const CHUNKS: usize = 8 * 2;
|
||||
const PACKETS_PER_BATCH: usize = 192;
|
||||
let txes = PACKETS_PER_BATCH * num_threads * CHUNKS;
|
||||
let num_chunks = value_t!(matches, "num_chunks", usize).unwrap_or(16);
|
||||
let packets_per_chunk = value_t!(matches, "packets_per_chunk", usize).unwrap_or(192);
|
||||
let iterations = value_t!(matches, "iterations", usize).unwrap_or(1000);
|
||||
|
||||
let total_num_transactions = num_chunks * num_threads * packets_per_chunk;
|
||||
let mint_total = 1_000_000_000_000;
|
||||
let GenesisConfigInfo {
|
||||
genesis_config,
|
||||
|
@ -116,34 +172,44 @@ fn main() {
|
|||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
let mut bank = bank_forks.working_bank();
|
||||
|
||||
info!("threads: {} txs: {}", num_threads, txes);
|
||||
info!("threads: {} txs: {}", num_threads, total_num_transactions);
|
||||
|
||||
let mut transactions = make_accounts_txs(txes, &mint_keypair, genesis_config.hash());
|
||||
let same_payer = matches.is_present("same_payer");
|
||||
let mut transactions =
|
||||
make_accounts_txs(total_num_transactions, genesis_config.hash(), same_payer);
|
||||
|
||||
// fund all the accounts
|
||||
transactions.iter().for_each(|tx| {
|
||||
let fund = system_transaction::transfer(
|
||||
let mut fund = system_transaction::transfer(
|
||||
&mint_keypair,
|
||||
&tx.message.account_keys[0],
|
||||
mint_total / txes as u64,
|
||||
mint_total / total_num_transactions as u64,
|
||||
genesis_config.hash(),
|
||||
);
|
||||
// Ignore any pesky duplicate signature errors in the case we are using single-payer
|
||||
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
|
||||
fund.signatures = vec![Signature::new(&sig[0..64])];
|
||||
let x = bank.process_transaction(&fund);
|
||||
x.unwrap();
|
||||
});
|
||||
//sanity check, make sure all the transactions can execute sequentially
|
||||
transactions.iter().for_each(|tx| {
|
||||
let res = bank.process_transaction(&tx);
|
||||
assert!(res.is_ok(), "sanity test transactions");
|
||||
});
|
||||
bank.clear_signatures();
|
||||
//sanity check, make sure all the transactions can execute in parallel
|
||||
let res = bank.process_transactions(&transactions);
|
||||
for r in res {
|
||||
assert!(r.is_ok(), "sanity parallel execution");
|
||||
|
||||
let skip_sanity = matches.is_present("skip_sanity");
|
||||
if !skip_sanity {
|
||||
//sanity check, make sure all the transactions can execute sequentially
|
||||
transactions.iter().for_each(|tx| {
|
||||
let res = bank.process_transaction(&tx);
|
||||
assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
|
||||
});
|
||||
bank.clear_signatures();
|
||||
//sanity check, make sure all the transactions can execute in parallel
|
||||
let res = bank.process_transactions(&transactions);
|
||||
for r in res {
|
||||
assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
|
||||
}
|
||||
bank.clear_signatures();
|
||||
}
|
||||
bank.clear_signatures();
|
||||
let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH);
|
||||
|
||||
let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), packets_per_chunk);
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(
|
||||
|
@ -162,7 +228,7 @@ fn main() {
|
|||
);
|
||||
poh_recorder.lock().unwrap().set_bank(&bank);
|
||||
|
||||
let chunk_len = verified.len() / CHUNKS;
|
||||
let chunk_len = verified.len() / num_chunks;
|
||||
let mut start = 0;
|
||||
|
||||
// This is so that the signal_receiver does not go out of scope after the closure.
|
||||
|
@ -171,17 +237,17 @@ fn main() {
|
|||
let signal_receiver = Arc::new(signal_receiver);
|
||||
let mut total_us = 0;
|
||||
let mut tx_total_us = 0;
|
||||
let base_tx_count = bank.transaction_count();
|
||||
let mut txs_processed = 0;
|
||||
let mut root = 1;
|
||||
let collector = Pubkey::new_rand();
|
||||
const ITERS: usize = 1_000;
|
||||
let config = Config {
|
||||
packets_per_batch: PACKETS_PER_BATCH,
|
||||
packets_per_batch: packets_per_chunk,
|
||||
chunk_len,
|
||||
num_threads,
|
||||
};
|
||||
let mut total_sent = 0;
|
||||
for _ in 0..ITERS {
|
||||
for _ in 0..iterations {
|
||||
let now = Instant::now();
|
||||
let mut sent = 0;
|
||||
|
||||
|
@ -222,7 +288,11 @@ fn main() {
|
|||
sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
if check_txs(&signal_receiver, txes / CHUNKS, &poh_recorder) {
|
||||
if check_txs(
|
||||
&signal_receiver,
|
||||
total_num_transactions / num_chunks,
|
||||
&poh_recorder,
|
||||
) {
|
||||
debug!(
|
||||
"resetting bank {} tx count: {} txs_proc: {}",
|
||||
bank.slot(),
|
||||
|
@ -274,7 +344,7 @@ fn main() {
|
|||
debug!(
|
||||
"time: {} us checked: {} sent: {}",
|
||||
duration_as_us(&now.elapsed()),
|
||||
txes / CHUNKS,
|
||||
total_num_transactions / num_chunks,
|
||||
sent,
|
||||
);
|
||||
total_sent += sent;
|
||||
|
@ -285,20 +355,26 @@ fn main() {
|
|||
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
|
||||
tx.signatures[0] = Signature::new(&sig[0..64]);
|
||||
}
|
||||
verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH);
|
||||
verified = to_packets_chunked(&transactions.clone(), packets_per_chunk);
|
||||
}
|
||||
|
||||
start += chunk_len;
|
||||
start %= verified.len();
|
||||
}
|
||||
let txs_processed = bank_forks.working_bank().transaction_count();
|
||||
debug!("processed: {} base: {}", txs_processed, base_tx_count);
|
||||
eprintln!(
|
||||
"{{'name': 'banking_bench_total', 'median': '{}'}}",
|
||||
"{{'name': 'banking_bench_total', 'median': '{:.2}'}}",
|
||||
(1000.0 * 1000.0 * total_sent as f64) / (total_us as f64),
|
||||
);
|
||||
eprintln!(
|
||||
"{{'name': 'banking_bench_tx_total', 'median': '{}'}}",
|
||||
"{{'name': 'banking_bench_tx_total', 'median': '{:.2}'}}",
|
||||
(1000.0 * 1000.0 * total_sent as f64) / (tx_total_us as f64),
|
||||
);
|
||||
eprintln!(
|
||||
"{{'name': 'banking_bench_success_tx_total', 'median': '{:.2}'}}",
|
||||
(1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64),
|
||||
);
|
||||
|
||||
drop(verified_sender);
|
||||
drop(vote_sender);
|
||||
|
|
|
@ -292,7 +292,7 @@ impl BankingStage {
|
|||
enable_forwarding: bool,
|
||||
batch_limit: usize,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
) {
|
||||
) -> BufferedPacketsDecision {
|
||||
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
|
||||
let poh = poh_recorder.lock().unwrap();
|
||||
(
|
||||
|
@ -349,6 +349,7 @@ impl BankingStage {
|
|||
}
|
||||
_ => (),
|
||||
}
|
||||
decision
|
||||
}
|
||||
|
||||
pub fn process_loop(
|
||||
|
@ -365,8 +366,8 @@ impl BankingStage {
|
|||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let mut buffered_packets = vec![];
|
||||
loop {
|
||||
if !buffered_packets.is_empty() {
|
||||
Self::process_buffered_packets(
|
||||
while !buffered_packets.is_empty() {
|
||||
let decision = Self::process_buffered_packets(
|
||||
&my_pubkey,
|
||||
&socket,
|
||||
poh_recorder,
|
||||
|
@ -376,6 +377,11 @@ impl BankingStage {
|
|||
batch_limit,
|
||||
transaction_status_sender.clone(),
|
||||
);
|
||||
if decision == BufferedPacketsDecision::Hold {
|
||||
// If we are waiting on a new bank,
|
||||
// check the receiver for more transactions/for exiting
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let recv_timeout = if !buffered_packets.is_empty() {
|
||||
|
|
Loading…
Reference in New Issue