Cleanup bench-exchange messages (#4093)

This commit is contained in:
Jack May 2019-04-30 23:09:33 -07:00 committed by GitHub
parent 9add8d0afc
commit ad27c30623
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 89 additions and 113 deletions

View File

@ -20,7 +20,7 @@ use solana_sdk::client::SyncClient;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction; use solana_sdk::system_instruction;
use solana_sdk::timing::{duration_as_ms, duration_as_ns, duration_as_s}; use solana_sdk::timing::{duration_as_ms, duration_as_s};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::cmp; use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -92,6 +92,18 @@ where
chunk_size, chunk_size,
account_groups, account_groups,
} = config; } = config;
info!(
"Exchange client: threads {} duration {} fund_amount {}",
threads,
duration_as_s(&duration),
fund_amount
);
info!(
"Exchange client: transfer delay {} batch size {} chunk size {}",
transfer_delay, batch_size, chunk_size
);
let accounts_in_groups = batch_size * account_groups; let accounts_in_groups = batch_size * account_groups;
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
let clients: Vec<_> = clients.into_iter().map(Arc::new).collect(); let clients: Vec<_> = clients.into_iter().map(Arc::new).collect();
@ -261,7 +273,7 @@ fn sample_txs<T>(
let mut txs = client.get_transaction_count().expect("transaction count"); let mut txs = client.get_transaction_count().expect("transaction count");
if txs < last_txs { if txs < last_txs {
error!("expected txs({}) >= last_txs({})", txs, last_txs); info!("Expected txs({}) >= last_txs({})", txs, last_txs);
txs = last_txs; txs = last_txs;
} }
total_txs = txs - initial_txs; total_txs = txs - initial_txs;
@ -302,7 +314,6 @@ fn do_tx_transfers<T>(
) where ) where
T: Client, T: Client,
{ {
let mut stats = Stats::default();
loop { loop {
let txs; let txs;
{ {
@ -319,14 +330,6 @@ fn do_tx_transfers<T>(
let duration = now.elapsed(); let duration = now.elapsed();
total_txs_sent_count.fetch_add(n, Ordering::Relaxed); total_txs_sent_count.fetch_add(n, Ordering::Relaxed);
stats.total += n as u64;
stats.sent_ns += duration_as_ns(&duration);
let rate = n as f32 / duration_as_s(&duration);
if rate > stats.sent_peak_rate {
stats.sent_peak_rate = rate;
}
trace!(" tx {:?} sent {:.2}/s", n, rate);
solana_metrics::submit( solana_metrics::submit(
influxdb::Point::new("bench-exchange") influxdb::Point::new("bench-exchange")
.add_tag("op", influxdb::Value::String("do_tx_transfers".to_string())) .add_tag("op", influxdb::Value::String("do_tx_transfers".to_string()))
@ -339,28 +342,11 @@ fn do_tx_transfers<T>(
); );
} }
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
info!(
" Thread Transferred {} Txs, avg {:.2}/s peak {:.2}/s",
stats.total,
(stats.total as f64 / stats.sent_ns as f64) * 1_000_000_000_f64,
stats.sent_peak_rate,
);
return; return;
} }
} }
} }
#[derive(Default)]
struct Stats {
total: u64,
keygen_ns: u64,
keygen_peak_rate: f32,
sign_ns: u64,
sign_peak_rate: f32,
sent_ns: u64,
sent_peak_rate: f32,
}
struct TradeInfo { struct TradeInfo {
trade_account: Pubkey, trade_account: Pubkey,
order_info: TradeOrderInfo, order_info: TradeOrderInfo,
@ -379,9 +365,14 @@ fn swapper<T>(
) where ) where
T: Client, T: Client,
{ {
let mut stats = Stats::default();
let mut order_book = OrderBook::default(); let mut order_book = OrderBook::default();
let mut account_group: usize = 0; let mut account_group: usize = 0;
let mut txs = 0;
let mut total_txs = 0;
let mut now = Instant::now();
let start_time = now;
let mut total_elapsed = start_time.elapsed();
'outer: loop { 'outer: loop {
if let Ok(trade_infos) = receiver.try_recv() { if let Ok(trade_infos) = receiver.try_recv() {
let mut tries = 0; let mut tries = 0;
@ -417,9 +408,6 @@ fn swapper<T>(
} }
} }
let swaps_size = swaps.len(); let swaps_size = swaps.len();
stats.total += swaps_size as u64;
let now = Instant::now();
let mut to_swap = vec![]; let mut to_swap = vec![];
let start = account_group * swaps_size as usize; let start = account_group * swaps_size as usize;
@ -432,15 +420,6 @@ fn swapper<T>(
to_swap.push((signer, swap, profit)); to_swap.push((signer, swap, profit));
} }
account_group = (account_group + 1) % account_groups as usize; account_group = (account_group + 1) % account_groups as usize;
let duration = now.elapsed();
let rate = swaps_size as f32 / duration_as_s(&duration);
stats.keygen_ns += duration_as_ns(&duration);
if rate > stats.keygen_peak_rate {
stats.keygen_peak_rate = rate;
}
trace!("sw {:?} keypairs {:.2} /s", swaps_size, rate);
let now = Instant::now();
let blockhash = client let blockhash = client
.get_recent_blockhash() .get_recent_blockhash()
@ -462,14 +441,23 @@ fn swapper<T>(
) )
}) })
.collect(); .collect();
txs += to_swap_txs.len() as u64;
total_txs += to_swap_txs.len() as u64;
total_elapsed = start_time.elapsed();
let duration = now.elapsed(); let duration = now.elapsed();
let n = to_swap_txs.len(); if duration_as_s(&duration) >= 1_f32 {
let rate = n as f32 / duration_as_s(&duration); now = Instant::now();
stats.sign_ns += duration_as_ns(&duration); let tps = txs as f32 / duration_as_s(&duration);
if rate > stats.sign_peak_rate { info!(
stats.sign_peak_rate = rate; "Swapper {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s",
tps,
txs,
total_txs,
total_elapsed.as_secs(),
);
txs = 0;
} }
trace!(" sw {:?} signed {:.2} /s ", n, rate);
solana_metrics::submit( solana_metrics::submit(
influxdb::Point::new("bench-exchange") influxdb::Point::new("bench-exchange")
@ -492,18 +480,9 @@ fn swapper<T>(
} }
} }
info!( info!(
"{} Swaps, batch size {}, chunk size {}", "Swapper sent {} at {:9.2} TPS",
stats.total, batch_size, chunk_size total_txs,
); total_txs as f32 / duration_as_s(&total_elapsed)
info!(
" Keygen avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64,
stats.keygen_peak_rate
);
info!(
" Signed avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64,
stats.sign_peak_rate
); );
assert_eq!( assert_eq!(
order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1, order_book.get_num_outstanding().0 + order_book.get_num_outstanding().1,
@ -518,7 +497,7 @@ fn trader<T>(
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
signers: &[Arc<Keypair>], signers: &[Arc<Keypair>],
srcs: &[Pubkey], srcs: &[Pubkey],
delay: u64, transfer_delay: u64,
batch_size: usize, batch_size: usize,
chunk_size: usize, chunk_size: usize,
account_groups: usize, account_groups: usize,
@ -526,16 +505,19 @@ fn trader<T>(
) where ) where
T: Client, T: Client,
{ {
let mut stats = Stats::default();
// TODO Hard coded for now // TODO Hard coded for now
let pair = TokenPair::AB; let pair = TokenPair::AB;
let tokens = 1; let tokens = 1;
let price = 1000; let price = 1000;
let mut account_group: usize = 0; let mut account_group: usize = 0;
let mut txs = 0;
let mut total_txs = 0;
let mut now = Instant::now();
let start_time = now;
let mut total_elapsed = start_time.elapsed();
loop { loop {
let now = Instant::now();
let trade_keys = generate_keypairs(batch_size as u64); let trade_keys = generate_keypairs(batch_size as u64);
let mut trades = vec![]; let mut trades = vec![];
@ -569,20 +551,12 @@ fn trader<T>(
trades.push((signer, trade.pubkey(), direction, src)); trades.push((signer, trade.pubkey(), direction, src));
} }
account_group = (account_group + 1) % account_groups as usize; account_group = (account_group + 1) % account_groups as usize;
let duration = now.elapsed();
let rate = batch_size as f32 / duration_as_s(&duration);
stats.keygen_ns += duration_as_ns(&duration);
if rate > stats.keygen_peak_rate {
stats.keygen_peak_rate = rate;
}
trace!("sw {:?} keypairs {:.2} /s", batch_size, rate);
let blockhash = client let blockhash = client
.get_recent_blockhash() .get_recent_blockhash()
.expect("Failed to get blockhash"); .expect("Failed to get blockhash");
trades.chunks(chunk_size).for_each(|chunk| { trades.chunks(chunk_size).for_each(|chunk| {
let now = Instant::now();
let trades_txs: Vec<_> = chunk let trades_txs: Vec<_> = chunk
.par_iter() .par_iter()
.map(|(signer, trade, direction, src)| { .map(|(signer, trade, direction, src)| {
@ -601,14 +575,24 @@ fn trader<T>(
) )
}) })
.collect(); .collect();
{
txs += chunk_size as u64;
total_txs += chunk_size as u64;
total_elapsed = start_time.elapsed();
let duration = now.elapsed(); let duration = now.elapsed();
let n = trades_txs.len(); if duration_as_s(&duration) >= 1_f32 {
let rate = n as f32 / duration_as_s(&duration); now = Instant::now();
stats.sign_ns += duration_as_ns(&duration); let tps = txs as f32 / duration_as_s(&duration);
if rate > stats.sign_peak_rate { info!(
stats.sign_peak_rate = rate; "Trader {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s",
tps,
txs,
total_txs,
total_elapsed.as_secs(),
);
txs = 0;
} }
trace!(" sw {:?} signed {:.2} /s ", n, rate);
solana_metrics::submit( solana_metrics::submit(
influxdb::Point::new("bench-exchange") influxdb::Point::new("bench-exchange")
@ -621,35 +605,27 @@ fn trader<T>(
let mut shared_txs_wl = shared_txs let mut shared_txs_wl = shared_txs
.write() .write()
.expect("Failed to send tx to transfer threads"); .expect("Failed to send tx to transfer threads");
stats.total += chunk_size as u64;
shared_txs_wl.push_back(trades_txs); shared_txs_wl.push_back(trades_txs);
} }
if delay > 0 { }
sleep(Duration::from_millis(delay)); if transfer_delay > 0 {
sleep(Duration::from_millis(transfer_delay));
} }
}); });
if exit_signal.load(Ordering::Relaxed) {
info!(
"Trader sent {} at {:9.2} TPS",
total_txs,
total_txs as f32 / duration_as_s(&total_elapsed)
);
return;
}
// TODO chunk the trade infos and send them when the batch is sent
sender sender
.send(trade_infos) .send(trade_infos)
.expect("Failed to send trades to swapper"); .expect("Failed to send trades to swapper");
if exit_signal.load(Ordering::Relaxed) {
info!(
"{} Trades with batch size {} chunk size {}",
stats.total, batch_size, chunk_size
);
info!(
" Keygen avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.keygen_ns as f64) * 1_000_000_000_f64,
stats.keygen_peak_rate
);
info!(
" Signed avg {:.2}/s peak {:.2}/s",
(stats.total as f64 / stats.sign_ns as f64) * 1_000_000_000_f64,
stats.sign_peak_rate
);
break;
}
} }
} }
@ -1013,9 +989,9 @@ mod tests {
config.threads = 1; config.threads = 1;
config.duration = Duration::from_secs(1); config.duration = Duration::from_secs(1);
config.fund_amount = 100_000; config.fund_amount = 100_000;
config.transfer_delay = 20; config.transfer_delay = 20; // 15
config.batch_size = 100; // 1000; config.batch_size = 100; // 1000;
config.chunk_size = 10; // 250; config.chunk_size = 10; // 200;
config.account_groups = 1; // 10; config.account_groups = 1; // 10;
let Config { let Config {
fund_amount, fund_amount,