Add swapper back-off (#4088)

* Add swapper back-off

* Reset back-off if bench-exchange suspects back-log

* nudge

* nudge
This commit is contained in:
Jack May 2019-05-01 14:29:57 -07:00 committed by GitHub
parent a7d18125d3
commit 5eee9e62e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 5 deletions

View File

@ -181,6 +181,7 @@ where
&shared_txs, &shared_txs,
&swapper_signers, &swapper_signers,
&profit_pubkeys, &profit_pubkeys,
transfer_delay,
batch_size, batch_size,
chunk_size, chunk_size,
account_groups, account_groups,
@ -358,6 +359,7 @@ fn swapper<T>(
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
signers: &[Arc<Keypair>], signers: &[Arc<Keypair>],
profit_pubkeys: &[Pubkey], profit_pubkeys: &[Pubkey],
transfer_delay: u64,
batch_size: usize, batch_size: usize,
chunk_size: usize, chunk_size: usize,
account_groups: usize, account_groups: usize,
@ -373,6 +375,18 @@ fn swapper<T>(
let mut now = Instant::now(); let mut now = Instant::now();
let start_time = now; let start_time = now;
let mut total_elapsed = start_time.elapsed(); let mut total_elapsed = start_time.elapsed();
// Chunks may have been dropped and we don't want to wait a long time
// for each time, Back-off each time we fail to confirm a chunk
const CHECK_TX_TIMEOUT_MAX_MS: u64 = 15000;
const CHECK_TX_DELAY_MS: u64 = 100;
let mut max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS;
// If we dump too many chunks maybe we are just waiting on a back-log
// rather than a series of dropped packets, reset to max waits
const MAX_DUMPS: u64 = 50;
let mut dumps = 0;
'outer: loop { 'outer: loop {
if let Ok(trade_infos) = receiver.try_recv() { if let Ok(trade_infos) = receiver.try_recv() {
let mut tries = 0; let mut tries = 0;
@ -383,17 +397,27 @@ fn swapper<T>(
== 0 == 0
{ {
tries += 1; tries += 1;
if tries > 300 { if tries >= max_tries {
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
break 'outer; break 'outer;
} }
error!("Give up waiting, dump batch"); error!("Give up and dump batch");
if dumps >= MAX_DUMPS {
error!("Max batches dumped, reset wait back-off");
max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS;
dumps = 0;
} else {
dumps += 1;
max_tries /= 2;
}
continue 'outer; continue 'outer;
} }
debug!("{} waiting for trades batch to clear", tries); debug!("{} waiting for trades batch to clear", tries);
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(CHECK_TX_DELAY_MS));
trade_index = thread_rng().gen_range(0, trade_infos.len()); trade_index = thread_rng().gen_range(0, trade_infos.len());
} }
max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS;
dumps = 0;
trade_infos.iter().for_each(|info| { trade_infos.iter().for_each(|info| {
order_book order_book
@ -473,6 +497,8 @@ fn swapper<T>(
shared_txs_wl.push_back(chunk.to_vec()); shared_txs_wl.push_back(chunk.to_vec());
} }
} }
// Throttle the swapper so it doesn't try to catchup unbridled
sleep(Duration::from_millis(transfer_delay / 2));
} }
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
@ -986,9 +1012,9 @@ mod tests {
let mut config = Config::default(); let mut config = Config::default();
config.identity = Keypair::new(); config.identity = Keypair::new();
config.threads = 1;
config.duration = Duration::from_secs(1); config.duration = Duration::from_secs(1);
config.fund_amount = 100_000; config.fund_amount = 100_000;
config.threads = 1;
config.transfer_delay = 20; // 15 config.transfer_delay = 20; // 15
config.batch_size = 100; // 1000; config.batch_size = 100; // 1000;
config.chunk_size = 10; // 200; config.chunk_size = 10; // 200;
@ -1058,9 +1084,9 @@ mod tests {
let mut config = Config::default(); let mut config = Config::default();
config.identity = identity; config.identity = identity;
config.threads = 1;
config.duration = Duration::from_secs(1); config.duration = Duration::from_secs(1);
config.fund_amount = 100_000; config.fund_amount = 100_000;
config.threads = 1;
config.transfer_delay = 20; // 0; config.transfer_delay = 20; // 0;
config.batch_size = 100; // 1500; config.batch_size = 100; // 1500;
config.chunk_size = 10; // 1500; config.chunk_size = 10; // 1500;