Get blockhash every batch and don't wait for tx threads (#3994)

* Get blockhash every batch and don't wait for tx threads

* nudge
This commit is contained in:
Jack May 2019-04-25 11:20:08 -07:00 committed by GitHub
parent 4dc0495a1b
commit a9e63455a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 69 deletions

View File

@ -26,11 +26,11 @@ use std::collections::VecDeque;
use std::mem; use std::mem;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::process::exit; use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder}; use std::thread::{sleep, Builder};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant};
// TODO Chunk length as specified results in a bunch of failures, divide by 10 helps... // TODO Chunk length as specified results in a bunch of failures, divide by 10 helps...
// Assume 4MB network buffers, and 512 byte packets // Assume 4MB network buffers, and 512 byte packets
@ -39,9 +39,6 @@ const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512;
// Maximum system transfers per transaction // Maximum system transfers per transaction
const MAX_TRANSFERS_PER_TX: u64 = 4; const MAX_TRANSFERS_PER_TX: u64 = 4;
// Interval between fetching a new blockhash
const BLOCKHASH_RENEW_PERIOD_S: u64 = 30;
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<Transaction>>>>; pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<Transaction>>>>;
pub struct Config { pub struct Config {
@ -140,25 +137,17 @@ where
); );
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_txs_sent_count = Arc::new(AtomicUsize::new(0)); let total_txs_sent_count = Arc::new(AtomicUsize::new(0));
let s_threads: Vec<_> = (0..threads) let s_threads: Vec<_> = (0..threads)
.map(|_| { .map(|_| {
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_txs_sent_count = total_txs_sent_count.clone(); let total_txs_sent_count = total_txs_sent_count.clone();
let client = clients[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-transfer".to_string()) .name("solana-exchange-transfer".to_string())
.spawn(move || { .spawn(move || {
do_tx_transfers( do_tx_transfers(&exit_signal, &shared_txs, &total_txs_sent_count, &client)
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_txs_sent_count,
&client,
)
}) })
.unwrap() .unwrap()
}) })
@ -169,7 +158,6 @@ where
let swapper_thread = { let swapper_thread = {
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client = clients[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-swapper".to_string()) .name("solana-exchange-swapper".to_string())
@ -178,7 +166,6 @@ where
&exit_signal, &exit_signal,
&swapper_receiver, &swapper_receiver,
&shared_txs, &shared_txs,
&shared_tx_active_thread_count,
&swapper_signers, &swapper_signers,
&profit_pubkeys, &profit_pubkeys,
batch_size, batch_size,
@ -194,7 +181,6 @@ where
let trader_thread = { let trader_thread = {
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client = clients[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-trader".to_string()) .name("solana-exchange-trader".to_string())
@ -203,7 +189,6 @@ where
&exit_signal, &exit_signal,
&swapper_sender, &swapper_sender,
&shared_txs, &shared_txs,
&shared_tx_active_thread_count,
&trader_signers, &trader_signers,
&src_pubkeys, &src_pubkeys,
transfer_delay, transfer_delay,
@ -311,7 +296,6 @@ fn sample_txs<T>(
fn do_tx_transfers<T>( fn do_tx_transfers<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
shared_tx_thread_count: &Arc<AtomicIsize>,
total_txs_sent_count: &Arc<AtomicUsize>, total_txs_sent_count: &Arc<AtomicUsize>,
client: &Arc<T>, client: &Arc<T>,
) where ) where
@ -327,13 +311,11 @@ fn do_tx_transfers<T>(
if let Some(txs0) = txs { if let Some(txs0) = txs {
let n = txs0.len(); let n = txs0.len();
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
let now = Instant::now(); let now = Instant::now();
for tx in txs0 { for tx in txs0 {
client.async_send_transaction(tx).expect("Transfer"); client.async_send_transaction(tx).expect("Transfer");
} }
let duration = now.elapsed(); let duration = now.elapsed();
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_txs_sent_count.fetch_add(n, Ordering::Relaxed); total_txs_sent_count.fetch_add(n, Ordering::Relaxed);
stats.total += n as u64; stats.total += n as u64;
@ -387,7 +369,6 @@ fn swapper<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
receiver: &Receiver<Vec<TradeInfo>>, receiver: &Receiver<Vec<TradeInfo>>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
signers: &[Arc<Keypair>], signers: &[Arc<Keypair>],
profit_pubkeys: &[Pubkey], profit_pubkeys: &[Pubkey],
batch_size: usize, batch_size: usize,
@ -400,10 +381,6 @@ fn swapper<T>(
let mut stats = Stats::default(); let mut stats = Stats::default();
let mut order_book = OrderBook::default(); let mut order_book = OrderBook::default();
let mut account_group: usize = 0; let mut account_group: usize = 0;
let mut blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
let mut blockhash_now = UNIX_EPOCH;
'outer: loop { 'outer: loop {
if let Ok(trade_infos) = receiver.try_recv() { if let Ok(trade_infos) = receiver.try_recv() {
let mut tries = 0; let mut tries = 0;
@ -462,19 +439,9 @@ fn swapper<T>(
let now = Instant::now(); let now = Instant::now();
// Don't get a blockhash every time let blockhash = client
if SystemTime::now() .get_recent_blockhash()
.duration_since(blockhash_now) .expect("Failed to get blockhash");
.unwrap()
.as_secs()
> BLOCKHASH_RENEW_PERIOD_S
{
blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
blockhash_now = SystemTime::now();
}
let to_swap_txs: Vec<_> = to_swap let to_swap_txs: Vec<_> = to_swap
.par_iter() .par_iter()
.map(|(signer, swap, profit)| { .map(|(signer, swap, profit)| {
@ -517,10 +484,6 @@ fn swapper<T>(
} }
} }
while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
sleep(Duration::from_millis(100));
}
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
break 'outer; break 'outer;
} }
@ -550,7 +513,6 @@ fn trader<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
sender: &Sender<Vec<TradeInfo>>, sender: &Sender<Vec<TradeInfo>>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
signers: &[Arc<Keypair>], signers: &[Arc<Keypair>],
srcs: &[Pubkey], srcs: &[Pubkey],
delay: u64, delay: u64,
@ -568,10 +530,6 @@ fn trader<T>(
let tokens = 1; let tokens = 1;
let price = 1000; let price = 1000;
let mut account_group: usize = 0; let mut account_group: usize = 0;
let mut blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
let mut blockhash_now = UNIX_EPOCH;
loop { loop {
let now = Instant::now(); let now = Instant::now();
@ -616,22 +574,12 @@ fn trader<T>(
} }
trace!("sw {:?} keypairs {:.2} /s", batch_size, rate); trace!("sw {:?} keypairs {:.2} /s", batch_size, rate);
let blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
trades.chunks(chunk_size).for_each(|chunk| { trades.chunks(chunk_size).for_each(|chunk| {
let now = Instant::now(); let now = Instant::now();
// Don't get a blockhash every time
if SystemTime::now()
.duration_since(blockhash_now)
.unwrap()
.as_secs()
> BLOCKHASH_RENEW_PERIOD_S
{
blockhash = client
.get_recent_blockhash()
.expect("Failed to get blockhash");
blockhash_now = SystemTime::now();
}
let trades_txs: Vec<_> = chunk let trades_txs: Vec<_> = chunk
.par_iter() .par_iter()
.map(|(signer, trade, direction, src)| { .map(|(signer, trade, direction, src)| {
@ -682,10 +630,6 @@ fn trader<T>(
.send(trade_infos) .send(trade_infos)
.expect("Failed to send trades to swapper"); .expect("Failed to send trades to swapper");
while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
sleep(Duration::from_millis(100));
}
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
info!( info!(
"{} Trades with batch size {} chunk size {}", "{} Trades with batch size {} chunk size {}",
@ -1023,7 +967,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair,
} }
} }
pub fn get_clients(nodes: Vec<ContactInfo>) -> Vec<ThinClient> { pub fn get_clients(nodes: &[ContactInfo]) -> Vec<ThinClient> {
nodes nodes
.iter() .iter()
.filter_map(|node| { .filter_map(|node| {
@ -1104,7 +1048,7 @@ mod tests {
exit(1); exit(1);
}); });
let clients = get_clients(nodes); let clients = get_clients(&nodes);
if clients.len() < NUM_NODES { if clients.len() < NUM_NODES {
error!( error!(

View File

@ -34,7 +34,7 @@ fn main() {
panic!("Failed to discover nodes"); panic!("Failed to discover nodes");
}); });
let clients = get_clients(nodes); let clients = get_clients(&nodes);
info!("{} nodes found", clients.len()); info!("{} nodes found", clients.len());
if clients.len() < num_nodes { if clients.len() < num_nodes {