diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index be540731ac..332134f4c4 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -20,7 +20,7 @@ use solana_sdk::client::SyncClient; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction; -use solana_sdk::timing::{duration_as_ms, duration_as_ns, duration_as_s}; +use solana_sdk::timing::{duration_as_ms, duration_as_s}; use solana_sdk::transaction::Transaction; use std::cmp; use std::collections::VecDeque; @@ -92,6 +92,18 @@ where chunk_size, account_groups, } = config; + + info!( + "Exchange client: threads {} duration {} fund_amount {}", + threads, + duration_as_s(&duration), + fund_amount + ); + info!( + "Exchange client: transfer delay {} batch size {} chunk size {}", + transfer_delay, batch_size, chunk_size + ); + let accounts_in_groups = batch_size * account_groups; let exit_signal = Arc::new(AtomicBool::new(false)); let clients: Vec<_> = clients.into_iter().map(Arc::new).collect(); @@ -261,7 +273,7 @@ fn sample_txs( let mut txs = client.get_transaction_count().expect("transaction count"); if txs < last_txs { - error!("expected txs({}) >= last_txs({})", txs, last_txs); + info!("Expected txs({}) >= last_txs({})", txs, last_txs); txs = last_txs; } total_txs = txs - initial_txs; @@ -302,7 +314,6 @@ fn do_tx_transfers( ) where T: Client, { - let mut stats = Stats::default(); loop { let txs; { @@ -319,14 +330,6 @@ fn do_tx_transfers( let duration = now.elapsed(); total_txs_sent_count.fetch_add(n, Ordering::Relaxed); - stats.total += n as u64; - stats.sent_ns += duration_as_ns(&duration); - let rate = n as f32 / duration_as_s(&duration); - if rate > stats.sent_peak_rate { - stats.sent_peak_rate = rate; - } - trace!(" tx {:?} sent {:.2}/s", n, rate); - solana_metrics::submit( influxdb::Point::new("bench-exchange") .add_tag("op", influxdb::Value::String("do_tx_transfers".to_string())) @@ -339,28 +342,11 @@ fn do_tx_transfers( ); } if exit_signal.load(Ordering::Relaxed) { - info!( - " Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s", - stats.total, - (stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64, - stats.sent_peak_rate, - ); return; } } } -#[derive(Default)] -struct Stats { - total: u64, - keygen_ns: u64, - keygen_peak_rate: f32, - sign_ns: u64, - sign_peak_rate: f32, - sent_ns: u64, - sent_peak_rate: f32, -} - struct TradeInfo { trade_account: Pubkey, order_info: TradeOrderInfo, @@ -379,9 +365,14 @@ fn swapper( ) where T: Client, { - let mut stats = Stats::default(); let mut order_book = OrderBook::default(); let mut account_group: usize = 0; + + let mut txs = 0; + let mut total_txs = 0; + let mut now = Instant::now(); + let start_time = now; + let mut total_elapsed = start_time.elapsed(); 'outer: loop { if let Ok(trade_infos) = receiver.try_recv() { let mut tries = 0; @@ -417,9 +408,6 @@ fn swapper( } } let swaps_size = swaps.len(); - stats.total += swaps_size as u64; - - let now = Instant::now(); let mut to_swap = vec![]; let start = account_group * swaps_size as usize; @@ -432,15 +420,6 @@ fn swapper( to_swap.push((signer, swap, profit)); } account_group = (account_group + 1) % account_groups as usize; - let duration = now.elapsed(); - let rate = swaps_size as f32 / duration_as_s(&duration); - stats.keygen_ns += duration_as_ns(&duration); - if rate > stats.keygen_peak_rate { - stats.keygen_peak_rate = rate; - } - trace!("sw {:?} keypairs {:.2} /s", swaps_size, rate); - - let now = Instant::now(); let blockhash = client .get_recent_blockhash() @@ -462,14 +441,23 @@ fn swapper( ) }) .collect(); + + txs += to_swap_txs.len() as u64; + total_txs += to_swap_txs.len() as u64; + total_elapsed = start_time.elapsed(); let duration = now.elapsed(); - let n = to_swap_txs.len(); - let rate = n as f32 / duration_as_s(&duration); - stats.sign_ns += duration_as_ns(&duration); - if rate > stats.sign_peak_rate { - stats.sign_peak_rate = rate; + if duration_as_s(&duration) >= 1_f32 { + now = Instant::now(); + let tps = txs as f32 / duration_as_s(&duration); + info!( + "Swapper {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s", + tps, + txs, + total_txs, + total_elapsed.as_secs(), + ); + txs = 0; } - trace!(" sw {:?} signed {:.2} /s ", n, rate); solana_metrics::submit( influxdb::Point::new("bench-exchange") @@ -492,18 +480,9 @@ fn swapper( } } info!( - "{} Swaps, batch size {}, chunk size {}", - stats.total, batch_size, chunk_size - ); - info!( - " Keygen avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, - stats.keygen_peak_rate - ); - info!( - " Signed avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, - stats.sign_peak_rate + "Swapper sent {} at {:9.2} TPS", + total_txs, + total_txs as f32 / duration_as_s(&total_elapsed) ); assert_eq!( order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1, @@ -518,7 +497,7 @@ fn trader( shared_txs: &SharedTransactions, signers: &[Arc], srcs: &[Pubkey], - delay: u64, + transfer_delay: u64, batch_size: usize, chunk_size: usize, account_groups: usize, @@ -526,16 +505,19 @@ fn trader( ) where T: Client, { - let mut stats = Stats::default(); - // TODO Hard coded for now let pair = TokenPair::AB; let tokens = 1; let price = 1000; let mut account_group: usize = 0; + let mut txs = 0; + let mut total_txs = 0; + let mut now = Instant::now(); + let start_time = now; + let mut total_elapsed = start_time.elapsed(); + loop { - let now = Instant::now(); let trade_keys = generate_keypairs(batch_size as u64); let mut trades = vec![]; @@ -569,20 +551,12 @@ fn trader( trades.push((signer, trade.pubkey(), direction, src)); } account_group = (account_group + 1) % account_groups as usize; - let duration = now.elapsed(); - let rate = batch_size as f32 / duration_as_s(&duration); - stats.keygen_ns += duration_as_ns(&duration); - if rate > stats.keygen_peak_rate { - stats.keygen_peak_rate = rate; - } - trace!("sw {:?} keypairs {:.2} /s", batch_size, rate); let blockhash = client .get_recent_blockhash() .expect("Failed to get blockhash"); trades.chunks(chunk_size).for_each(|chunk| { - let now = Instant::now(); let trades_txs: Vec<_> = chunk .par_iter() .map(|(signer, trade, direction, src)| { @@ -601,55 +575,57 @@ fn trader( ) }) .collect(); - let duration = now.elapsed(); - let n = trades_txs.len(); - let rate = n as f32 / duration_as_s(&duration); - stats.sign_ns += duration_as_ns(&duration); - if rate > stats.sign_peak_rate { - stats.sign_peak_rate = rate; - } - trace!(" sw {:?} signed {:.2} /s ", n, rate); - - solana_metrics::submit( - influxdb::Point::new("bench-exchange") - .add_tag("op", influxdb::Value::String("trades".to_string())) - .add_field("count", influxdb::Value::Integer(trades_txs.len() as i64)) - .to_owned(), - ); { - let mut shared_txs_wl = shared_txs - .write() - .expect("Failed to send tx to transfer threads"); - stats.total += chunk_size as u64; - shared_txs_wl.push_back(trades_txs); + txs += chunk_size as u64; + total_txs += chunk_size as u64; + total_elapsed = start_time.elapsed(); + let duration = now.elapsed(); + if duration_as_s(&duration) >= 1_f32 { + now = Instant::now(); + let tps = txs as f32 / duration_as_s(&duration); + info!( + "Trader {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s", + tps, + txs, + total_txs, + total_elapsed.as_secs(), + ); + txs = 0; + } + + solana_metrics::submit( + influxdb::Point::new("bench-exchange") + .add_tag("op", influxdb::Value::String("trades".to_string())) + .add_field("count", influxdb::Value::Integer(trades_txs.len() as i64)) + .to_owned(), + ); + + { + let mut shared_txs_wl = shared_txs + .write() + .expect("Failed to send tx to transfer threads"); + shared_txs_wl.push_back(trades_txs); + } } - if delay > 0 { - sleep(Duration::from_millis(delay)); + if transfer_delay > 0 { + sleep(Duration::from_millis(transfer_delay)); } }); + if exit_signal.load(Ordering::Relaxed) { + info!( + "Trader sent {} at {:9.2} TPS", + total_txs, + total_txs as f32 / duration_as_s(&total_elapsed) + ); + return; + } + + // TODO chunk the trade infos and send them when the batch is sent sender .send(trade_infos) .expect("Failed to send trades to swapper"); - - if exit_signal.load(Ordering::Relaxed) { - info!( - "{} Trades with batch size {} chunk size {}", - stats.total, batch_size, chunk_size - ); - info!( - " Keygen avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64, - stats.keygen_peak_rate - ); - info!( - " Signed avg {:.2}/s peak {:.2}/s", - (stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64, - stats.sign_peak_rate - ); - break; - } } } @@ -1013,9 +989,9 @@ mod tests { config.threads = 1; config.duration = Duration::from_secs(1); config.fund_amount = 100_000; - config.transfer_delay = 20; + config.transfer_delay = 20; // 15 config.batch_size = 100; // 1000; - config.chunk_size = 10; // 250; + config.chunk_size = 10; // 200; config.account_groups = 1; // 10; let Config { fund_amount,