Bench exchange tweaks (#3957)

Jack May 2019-04-23 16:48:17 -07:00 committed by GitHub
parent 0cbac26591
commit f2e2106f62
3 changed files with 203 additions and 176 deletions

View File

@ -34,7 +34,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
// TODO Chunk length as specified results in a bunch of failures, divide by 10 helps...
// Assume 4MB network buffers, and 512 byte packets
const CHUNK_LEN: usize = 4 * 1024 * 1024 / 512 / 10;
const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512;
// Maximum system transfers per transaction
const MAX_TRANSFERS_PER_TX: u64 = 4;
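The old CHUNK_LEN and the new FUND_CHUNK_LEN both follow from the assumption noted above of 4 MB network buffers and 512-byte packets; a minimal sketch of the arithmetic (the names below are illustrative, not from the diff):

const PACKET_SIZE: usize = 512;                 // assumed packet size
const NETWORK_BUFFER: usize = 4 * 1024 * 1024;  // assumed 4 MB of network buffers
const FUND_CHUNK_LEN: usize = NETWORK_BUFFER / PACKET_SIZE;   // 8192 txs per funding chunk
const CHUNK_LEN: usize = NETWORK_BUFFER / PACKET_SIZE / 10;   // 819, the divide-by-10 workaround from the TODO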
@ -48,9 +48,10 @@ pub struct Config {
pub identity: Keypair,
pub threads: usize,
pub duration: Duration,
pub trade_delay: u64,
pub transfer_delay: u64,
pub fund_amount: u64,
pub batch_size: usize,
pub chunk_size: usize,
pub account_groups: usize,
}
@ -60,9 +61,10 @@ impl Default for Config {
identity: Keypair::new(),
threads: 4,
duration: Duration::new(u64::max_value(), 0),
trade_delay: 0,
transfer_delay: 0,
fund_amount: 100_000,
batch_size: 10,
chunk_size: 10,
account_groups: 100,
}
}
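With trade_delay renamed to transfer_delay and the new chunk_size field, a caller builds the reworked Config roughly like this (a hypothetical sketch, values mirroring the updated bench test further down):

use std::time::Duration;

let config = Config {
    threads: 1,
    duration: Duration::from_secs(30),
    transfer_delay: 40, // ms to sleep between chunks handed to the send threads
    fund_amount: 100_000,
    batch_size: 1_000,  // trades generated per batch
    chunk_size: 250,    // transactions queued to the send threads at a time
    account_groups: 10,
    ..Config::default() // keeps the generated identity keypair
};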
@ -73,9 +75,9 @@ pub struct SampleStats {
/// Maximum TPS reported by this node
pub tps: f32,
/// Total time taken for those txs
pub tx_time: Duration,
pub elapsed: Duration,
/// Total transactions reported by this node
pub tx_count: u64,
pub txs: u64,
}
pub fn do_bench_exchange<T>(clients: Vec<T>, config: Config)
@ -86,9 +88,10 @@ where
identity,
threads,
duration,
trade_delay,
transfer_delay,
fund_amount,
batch_size,
chunk_size,
account_groups,
} = config;
let accounts_in_groups = batch_size * account_groups;
@ -96,7 +99,8 @@ where
let clients: Vec<_> = clients.into_iter().map(Arc::new).collect();
let client = clients[0].as_ref();
let total_keys = accounts_in_groups as u64 * 4;
const NUM_KEYPAIR_GROUPS: u64 = 4;
let total_keys = accounts_in_groups as u64 * NUM_KEYPAIR_GROUPS;
info!("Generating {:?} keys", total_keys);
let mut keypairs = generate_keypairs(total_keys);
let trader_signers: Vec<_> = keypairs
@ -130,29 +134,20 @@ where
let sample_stats = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
info!("Sampling clients for tps every {} s", sample_period);
let sample_threads: Vec<_> = clients
.iter()
.map(|client| {
let exit_signal = exit_signal.clone();
let sample_stats = sample_stats.clone();
let client = client.clone();
Builder::new()
.name("solana-exchange-sample".to_string())
.spawn(move || sample_tx_count(&exit_signal, &sample_stats, sample_period, &client))
.unwrap()
})
.collect();
info!(
"Requesting and swapping trades with {} ms delay per thread...",
transfer_delay
);
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
let total_txs_sent_count = Arc::new(AtomicUsize::new(0));
let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let total_txs_sent_count = total_txs_sent_count.clone();
let client = clients[0].clone();
Builder::new()
.name("solana-exchange-transfer".to_string())
@ -161,7 +156,7 @@ where
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_tx_sent_count,
&total_txs_sent_count,
&client,
)
})
@ -187,6 +182,7 @@ where
&swapper_signers,
&profit_pubkeys,
batch_size,
chunk_size,
account_groups,
&client,
)
@ -210,8 +206,9 @@ where
&shared_tx_active_thread_count,
&trader_signers,
&src_pubkeys,
trade_delay,
transfer_delay,
batch_size,
chunk_size,
account_groups,
&client,
)
@ -219,23 +216,43 @@ where
.unwrap()
};
info!("Requesting and swapping trades");
let sample_threads: Vec<_> = clients
.iter()
.map(|client| {
let exit_signal = exit_signal.clone();
let sample_stats = sample_stats.clone();
let client = client.clone();
Builder::new()
.name("solana-exchange-sample".to_string())
.spawn(move || sample_txs(&exit_signal, &sample_stats, sample_period, &client))
.unwrap()
})
.collect();
sleep(duration);
info!("Stopping threads");
exit_signal.store(true, Ordering::Relaxed);
info!("Wait for trader thread");
let _ = trader_thread.join();
info!("Waiting for swapper thread");
let _ = swapper_thread.join();
info!("Wait for tx threads");
for t in s_threads {
let _ = t.join();
}
info!("Wait for sample threads");
for t in sample_threads {
let _ = t.join();
}
compute_and_report_stats(&sample_stats, total_tx_sent_count.load(Ordering::Relaxed));
compute_and_report_stats(
&sample_stats,
total_txs_sent_count.load(Ordering::Relaxed) as u64,
);
}
fn sample_tx_count<T>(
fn sample_txs<T>(
exit_signal: &Arc<AtomicBool>,
sample_stats: &Arc<RwLock<Vec<SampleStats>>>,
sample_period: u64,
@ -244,49 +261,48 @@ fn sample_tx_count<T>(
T: Client,
{
let mut max_tps = 0.0;
let mut total_tx_time;
let mut total_tx_count;
let mut total_elapsed;
let mut total_txs;
let mut now = Instant::now();
let start_time = now;
let mut initial_tx_count = client.get_transaction_count().expect("transaction count");
let first_tx_count = initial_tx_count;
let initial_txs = client.get_transaction_count().expect("transaction count");
let mut last_txs = initial_txs;
loop {
let mut tx_count = client.get_transaction_count().expect("transaction count");
let duration = now.elapsed();
total_elapsed = start_time.elapsed();
let elapsed = now.elapsed();
now = Instant::now();
if tx_count < initial_tx_count {
println!(
"expected tx_count({}) >= initial_tx_count({})",
tx_count, initial_tx_count
);
tx_count = initial_tx_count;
}
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
let mut txs = client.get_transaction_count().expect("transaction count");
let tps = sample as f32 / duration_as_s(&duration);
if txs < last_txs {
error!("expected txs({}) >= last_txs({})", txs, last_txs);
txs = last_txs;
}
total_txs = txs - initial_txs;
let sample_txs = txs - last_txs;
last_txs = txs;
let tps = sample_txs as f32 / duration_as_s(&elapsed);
if tps > max_tps {
max_tps = tps;
}
total_tx_time = start_time.elapsed();
total_tx_count = tx_count - first_tx_count;
trace!(
info!(
"Sampler {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s",
tps,
sample,
total_tx_count,
total_tx_time.as_secs(),
sample_txs,
total_txs,
total_elapsed.as_secs(),
);
if exit_signal.load(Ordering::Relaxed) {
let stats = SampleStats {
tps: max_tps,
tx_time: total_tx_time,
tx_count: total_tx_count,
elapsed: total_elapsed,
txs: total_txs,
};
sample_stats.write().unwrap().push(stats);
break;
return;
}
sleep(Duration::from_secs(sample_period));
}
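The sampler above polls the node's cumulative transaction count every sample_period seconds and turns the delta into a TPS sample, clamping the count if the node ever reports fewer transactions than before. A minimal standalone sketch of that calculation:

/// One sampling step: returns the TPS for this interval and updates `last_txs`.
fn sample_tps(mut txs: u64, last_txs: &mut u64, elapsed_secs: f32) -> f32 {
    if txs < *last_txs {
        // Guard against a node briefly reporting a lower cumulative count.
        txs = *last_txs;
    }
    let sample = txs - *last_txs;
    *last_txs = txs;
    sample as f32 / elapsed_secs
}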
@ -296,7 +312,7 @@ fn do_tx_transfers<T>(
exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions,
shared_tx_thread_count: &Arc<AtomicIsize>,
total_tx_sent_count: &Arc<AtomicUsize>,
total_txs_sent_count: &Arc<AtomicUsize>,
client: &Arc<T>,
) where
T: Client,
@ -308,49 +324,45 @@ fn do_tx_transfers<T>(
let mut shared_txs_wl = shared_txs.write().unwrap();
txs = shared_txs_wl.pop_front();
}
match txs {
Some(txs0) => {
let n = txs0.len();
if let Some(txs0) = txs {
let n = txs0.len();
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
let now = Instant::now();
for tx in txs0 {
client.async_send_transaction(tx).expect("Transfer");
}
let duration = now.elapsed();
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(n, Ordering::Relaxed);
stats.total += n as u64;
stats.sent_ns += duration_as_ns(&duration);
let rate = n as f32 / duration_as_s(&duration);
if rate > stats.sent_peak_rate {
stats.sent_peak_rate = rate;
}
trace!(" tx {:?} sent {:.2}/s", n, rate);
solana_metrics::submit(
influxdb::Point::new("bench-exchange")
.add_tag("op", influxdb::Value::String("do_tx_transfers".to_string()))
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64),
)
.add_field("count", influxdb::Value::Integer(n as i64))
.to_owned(),
);
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
let now = Instant::now();
for tx in txs0 {
client.async_send_transaction(tx).expect("Transfer");
}
None => {
if exit_signal.load(Ordering::Relaxed) {
info!(
" Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s",
stats.total,
(stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64,
stats.sent_peak_rate,
);
break;
}
let duration = now.elapsed();
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_txs_sent_count.fetch_add(n, Ordering::Relaxed);
stats.total += n as u64;
stats.sent_ns += duration_as_ns(&duration);
let rate = n as f32 / duration_as_s(&duration);
if rate > stats.sent_peak_rate {
stats.sent_peak_rate = rate;
}
trace!(" tx {:?} sent {:.2}/s", n, rate);
solana_metrics::submit(
influxdb::Point::new("bench-exchange")
.add_tag("op", influxdb::Value::String("do_tx_transfers".to_string()))
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64),
)
.add_field("count", influxdb::Value::Integer(n as i64))
.to_owned(),
);
}
if exit_signal.load(Ordering::Relaxed) {
info!(
" Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s",
stats.total,
(stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64,
stats.sent_peak_rate,
);
return;
}
}
}
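do_tx_transfers is the consumer side of a simple shared queue: the trader and swapper threads push whole chunks of signed transactions, and each transfer thread pops one chunk and sends it asynchronously. A minimal sketch of that handoff, assuming Transaction is solana_sdk::transaction::Transaction as in the surrounding module:

use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use solana_sdk::transaction::Transaction;

type SharedTransactions = Arc<RwLock<VecDeque<Vec<Transaction>>>>;

// Producer side (trader/swapper): queue one chunk of chunk_size transactions.
fn push_chunk(shared_txs: &SharedTransactions, chunk: Vec<Transaction>) {
    shared_txs.write().unwrap().push_back(chunk);
}

// Consumer side (transfer thread): take the oldest chunk, if any.
fn pop_chunk(shared_txs: &SharedTransactions) -> Option<Vec<Transaction>> {
    shared_txs.write().unwrap().pop_front()
}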
@ -379,6 +391,7 @@ fn swapper<T>(
signers: &[Arc<Keypair>],
profit_pubkeys: &[Pubkey],
batch_size: usize,
chunk_size: usize,
account_groups: usize,
client: &Arc<T>,
) where
@ -387,7 +400,6 @@ fn swapper<T>(
let mut stats = Stats::default();
let mut order_book = OrderBook::default();
let mut account_group: usize = 0;
let mut one_more_time = true;
let mut blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
@ -401,8 +413,11 @@ fn swapper<T>(
== 0
{
tries += 1;
if tries > 10 {
debug!("Give up waiting, dump batch");
if tries > 30 {
if exit_signal.load(Ordering::Relaxed) {
break 'outer;
}
error!("Give up waiting, dump batch");
continue 'outer;
}
debug!("{} waiting for trades batch to clear", tries);
@ -493,7 +508,7 @@ fn swapper<T>(
.to_owned(),
);
let chunks: Vec<_> = to_swap_txs.chunks(CHUNK_LEN).collect();
let chunks: Vec<_> = to_swap_txs.chunks(chunk_size).collect();
{
let mut shared_txs_wl = shared_txs.write().unwrap();
for chunk in chunks {
@ -507,29 +522,27 @@ fn swapper<T>(
}
if exit_signal.load(Ordering::Relaxed) {
if !one_more_time {
info!("{} Swaps with batch size {}", stats.total, batch_size);
info!(
" Keygen avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64,
stats.keygen_peak_rate
);
info!(
" Signed avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64,
stats.sign_peak_rate
);
assert_eq!(
order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1,
0
);
break;
}
// Grab any outstanding trades
sleep(Duration::from_secs(2));
one_more_time = false;
break 'outer;
}
}
info!(
"{} Swaps, batch size {}, chunk size {}",
stats.total, batch_size, chunk_size
);
info!(
" Keygen avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64,
stats.keygen_peak_rate
);
info!(
" Signed avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64,
stats.sign_peak_rate
);
assert_eq!(
order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1,
0
);
}
#[allow(clippy::too_many_arguments)]
@ -542,6 +555,7 @@ fn trader<T>(
srcs: &[Pubkey],
delay: u64,
batch_size: usize,
chunk_size: usize,
account_groups: usize,
client: &Arc<T>,
) where
@ -563,8 +577,6 @@ fn trader<T>(
let now = Instant::now();
let trade_keys = generate_keypairs(batch_size as u64);
stats.total += batch_size as u64;
let mut trades = vec![];
let mut trade_infos = vec![];
let start = account_group * batch_size as usize;
@ -604,7 +616,7 @@ fn trader<T>(
}
trace!("sw {:?} keypairs {:.2} /s", batch_size, rate);
trades.chunks(CHUNK_LEN).for_each(|chunk| {
trades.chunks(chunk_size).for_each(|chunk| {
let now = Instant::now();
// Don't get a blockhash every time
@ -654,16 +666,13 @@ fn trader<T>(
.to_owned(),
);
let chunks: Vec<_> = trades_txs.chunks(CHUNK_LEN).collect();
{
let mut shared_txs_wl = shared_txs
.write()
.expect("Failed to send tx to transfer threads");
for chunk in chunks {
shared_txs_wl.push_back(chunk.to_vec());
}
stats.total += chunk_size as u64;
shared_txs_wl.push_back(trades_txs);
}
if delay > 0 {
sleep(Duration::from_millis(delay));
}
@ -678,7 +687,10 @@ fn trader<T>(
}
if exit_signal.load(Ordering::Relaxed) {
info!("{} Trades with batch size {}", stats.total, batch_size);
info!(
"{} Trades with batch size {} chunk size {}",
stats.total, batch_size, chunk_size
);
info!(
" Keygen avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64,
@ -754,7 +766,7 @@ pub fn fund_keys(client: &Client, source: &Keypair, dests: &[Arc<Keypair>], lamp
}
}
to_fund.chunks(CHUNK_LEN).for_each(|chunk| {
to_fund.chunks(FUND_CHUNK_LEN).for_each(|chunk| {
#[allow(clippy::clone_double_ref)] // sigh
let mut to_fund_txs: Vec<_> = chunk
.par_iter()
@ -833,7 +845,7 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc<Keypair>], accounts
let mut notfunded: Vec<(&Arc<Keypair>, &Pubkey)> = signers.iter().zip(accounts).collect();
while !notfunded.is_empty() {
notfunded.chunks(CHUNK_LEN).for_each(|chunk| {
notfunded.chunks(FUND_CHUNK_LEN).for_each(|chunk| {
let mut to_create_txs: Vec<_> = chunk
.par_iter()
.map(|(signer, new)| {
@ -912,47 +924,43 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc<Keypair>], accounts
}
}
fn compute_and_report_stats(maxes: &Arc<RwLock<Vec<(SampleStats)>>>, total_tx_send_count: usize) {
let mut max_tx_count = 0;
let mut max_tx_time = Duration::new(0, 0);
fn compute_and_report_stats(maxes: &Arc<RwLock<Vec<(SampleStats)>>>, total_txs_sent: u64) {
let mut max_txs = 0;
let mut max_elapsed = Duration::new(0, 0);
info!("| Max TPS | Total Transactions");
info!("+---------------+--------------------");
for stats in maxes.read().unwrap().iter() {
let maybe_flag = match stats.tx_count {
let maybe_flag = match stats.txs {
0 => "!!!!!",
_ => "",
};
info!("| {:13.2} | {} {}", stats.tps, stats.tx_count, maybe_flag);
info!("| {:13.2} | {} {}", stats.tps, stats.txs, maybe_flag);
if stats.tx_time > max_tx_time {
max_tx_time = stats.tx_time;
if stats.elapsed > max_elapsed {
max_elapsed = stats.elapsed;
}
if stats.tx_count > max_tx_count {
max_tx_count = stats.tx_count;
if stats.txs > max_txs {
max_txs = stats.txs;
}
}
info!("+---------------+--------------------");
if max_tx_count > total_tx_send_count as u64 {
error!(
"{} more transactions sampled ({}) then were sent ({})",
max_tx_count - total_tx_send_count as u64,
max_tx_count,
total_tx_send_count
);
} else {
if max_txs >= total_txs_sent {
info!(
"{} txs dropped ({:.2}%)",
total_tx_send_count as u64 - max_tx_count,
(total_tx_send_count as u64 - max_tx_count) as f64 / total_tx_send_count as f64
* 100_f64
"Warning: Average TPS might be under reported, there were no txs sent for a portion of the duration"
);
max_txs = total_txs_sent;
}
info!(
"\tAverage TPS: {}",
max_tx_count as f32 / max_tx_time.as_secs() as f32
"{} txs outstanding when test ended (lag) ({:.2}%)",
total_txs_sent - max_txs,
(total_txs_sent - max_txs) as f64 / total_txs_sent as f64 * 100_f64
);
info!(
"\tAverage TPS: {:.2}",
max_txs as f32 / max_elapsed.as_secs() as f32
);
}
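A worked example of the report above, with illustrative numbers rather than measured results: "outstanding" transactions were sent but never observed by any sampler before the test ended, and average TPS uses the largest sampled total over its elapsed time.

fn report_example() {
    let total_txs_sent: u64 = 10_000;
    let max_txs: u64 = 9_500;          // highest total any sampler observed
    let max_elapsed_secs: f32 = 30.0;  // elapsed time for that sampler
    let outstanding = total_txs_sent - max_txs;                        // 500 txs
    let lag_pct = outstanding as f64 / total_txs_sent as f64 * 100.0;  // 5.00%
    let avg_tps = max_txs as f32 / max_elapsed_secs;                   // ~316.7 TPS
    println!("{} txs outstanding ({:.2}%), average {:.2} TPS", outstanding, lag_pct, avg_tps);
}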
@ -1055,12 +1063,13 @@ mod tests {
let mut config = Config::default();
config.identity = Keypair::new();
config.threads = 4;
config.duration = Duration::from_secs(5);
config.threads = 1;
config.duration = Duration::from_secs(30);
config.fund_amount = 100_000;
config.trade_delay = 0;
config.batch_size = 10;
config.account_groups = 100;
config.transfer_delay = 40;
config.batch_size = 1000;
config.chunk_size = 250;
config.account_groups = 10;
let Config {
fund_amount,
batch_size,
@ -1105,11 +1114,12 @@ mod tests {
exit(1);
}
const NUM_SIGNERS: u64 = 2;
airdrop_lamports(
&clients[0],
&drone_addr,
&config.identity,
fund_amount * (accounts_in_groups + 1) as u64 * 2,
fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS,
);
do_bench_exchange(clients, config);
@ -1125,12 +1135,13 @@ mod tests {
let mut config = Config::default();
config.identity = identity;
config.threads = 4;
config.duration = Duration::from_secs(5);
config.threads = 1;
config.duration = Duration::from_secs(30);
config.fund_amount = 100_000;
config.trade_delay = 1;
config.batch_size = 10;
config.account_groups = 100;
config.transfer_delay = 0;
config.batch_size = 1000;
config.chunk_size = 500;
config.account_groups = 10;
do_bench_exchange(clients, config);
}

View File

@ -13,9 +13,10 @@ pub struct Config {
pub threads: usize,
pub num_nodes: usize,
pub duration: Duration,
pub trade_delay: u64,
pub transfer_delay: u64,
pub fund_amount: u64,
pub batch_size: usize,
pub chunk_size: usize,
pub account_groups: usize,
}
@ -28,9 +29,10 @@ impl Default for Config {
num_nodes: 1,
threads: 4,
duration: Duration::new(u64::max_value(), 0),
trade_delay: 0,
transfer_delay: 0,
fund_amount: 100_000,
batch_size: 100,
chunk_size: 100,
account_groups: 100,
}
}
@ -74,7 +76,7 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> {
.value_name("<threads>")
.takes_value(true)
.required(false)
.default_value("4")
.default_value("1")
.help("Number of threads submitting transactions"),
)
.arg(
@ -95,13 +97,13 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> {
.help("Seconds to run benchmark, then exit; default is forever"),
)
.arg(
Arg::with_name("trade-delay")
.long("trade-delay")
Arg::with_name("transfer-delay")
.long("transfer-delay")
.value_name("<delay>")
.takes_value(true)
.required(false)
.default_value("0")
.help("Delay between trade requests in milliseconds"),
.help("Delay between each chunk"),
)
.arg(
Arg::with_name("fund-amount")
@ -119,7 +121,16 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.required(false)
.default_value("1000")
.help("Number of bulk trades to submit between trade delays"),
.help("Number of transactions before the signer rolls over"),
)
.arg(
Arg::with_name("chunk-size")
.long("chunk-size")
.value_name("<cunk>")
.takes_value(true)
.required(false)
.default_value("500")
.help("Number of transactions to generate and send at a time"),
)
.arg(
Arg::with_name("account-groups")
@ -127,7 +138,7 @@ pub fn build_args<'a, 'b>() -> App<'a, 'b> {
.value_name("<groups>")
.takes_value(true)
.required(false)
.default_value("100")
.default_value("10")
.help("Number of account groups to cycle for each batch"),
)
}
@ -161,12 +172,14 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config {
value_t!(matches.value_of("num-nodes"), usize).expect("Failed to parse num-nodes");
let duration = value_t!(matches.value_of("duration"), u64).expect("Failed to parse duration");
args.duration = Duration::from_secs(duration);
args.trade_delay =
value_t!(matches.value_of("trade-delay"), u64).expect("Failed to parse trade-delay");
args.transfer_delay =
value_t!(matches.value_of("transfer-delay"), u64).expect("Failed to parse transfer-delay");
args.fund_amount =
value_t!(matches.value_of("fund-amount"), u64).expect("Failed to parse fund-amount");
args.batch_size =
value_t!(matches.value_of("batch-size"), usize).expect("Failed to parse batch-size");
args.chunk_size =
value_t!(matches.value_of("chunk-size"), usize).expect("Failed to parse chunk-size");
args.account_groups = value_t!(matches.value_of("account-groups"), usize)
.expect("Failed to parse account-groups");

View File

@ -21,9 +21,10 @@ fn main() {
threads,
num_nodes,
duration,
trade_delay,
transfer_delay,
fund_amount,
batch_size,
chunk_size,
account_groups,
..
} = cli_config;
@ -43,20 +44,22 @@ fn main() {
info!("Funding keypair: {}", identity.pubkey());
let accounts_in_groups = batch_size * account_groups;
const NUM_SIGNERS: u64 = 2;
airdrop_lamports(
&clients[0],
&drone_addr,
&identity,
fund_amount * (accounts_in_groups + 1) as u64 * 2,
fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS,
);
let config = Config {
identity,
threads,
duration,
trade_delay,
transfer_delay,
fund_amount,
batch_size,
chunk_size,
account_groups,
};