From 5eee9e62e5296087e32825b0156fc911ee040c79 Mon Sep 17 00:00:00 2001 From: Jack May Date: Wed, 1 May 2019 14:29:57 -0700 Subject: [PATCH] Add swapper back-off (#4088) * Add swapper back-off * Reset back-off if bench-exchange suspects back-log * nudge * nudge --- bench-exchange/src/bench.rs | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 332134f4c4..4797b839a1 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -181,6 +181,7 @@ where &shared_txs, &swapper_signers, &profit_pubkeys, + transfer_delay, batch_size, chunk_size, account_groups, @@ -358,6 +359,7 @@ fn swapper( shared_txs: &SharedTransactions, signers: &[Arc], profit_pubkeys: &[Pubkey], + transfer_delay: u64, batch_size: usize, chunk_size: usize, account_groups: usize, @@ -373,6 +375,18 @@ fn swapper( let mut now = Instant::now(); let start_time = now; let mut total_elapsed = start_time.elapsed(); + + // Chunks may have been dropped and we don't want to wait a long time + // for each time, Back-off each time we fail to confirm a chunk + const CHECK_TX_TIMEOUT_MAX_MS: u64 = 15000; + const CHECK_TX_DELAY_MS: u64 = 100; + let mut max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS; + + // If we dump too many chunks maybe we are just waiting on a back-log + // rather than a series of dropped packets, reset to max waits + const MAX_DUMPS: u64 = 50; + let mut dumps = 0; + 'outer: loop { if let Ok(trade_infos) = receiver.try_recv() { let mut tries = 0; @@ -383,17 +397,27 @@ fn swapper( == 0 { tries += 1; - if tries > 300 { + if tries >= max_tries { if exit_signal.load(Ordering::Relaxed) { break 'outer; } - error!("Give up waiting, dump batch"); + error!("Give up and dump batch"); + if dumps >= MAX_DUMPS { + error!("Max batches dumped, reset wait back-off"); + max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS; + dumps = 0; + } else { + dumps += 1; + max_tries /= 2; + } continue 'outer; } debug!("{} waiting for trades batch to clear", tries); - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(CHECK_TX_DELAY_MS)); trade_index = thread_rng().gen_range(0, trade_infos.len()); } + max_tries = CHECK_TX_TIMEOUT_MAX_MS / CHECK_TX_DELAY_MS; + dumps = 0; trade_infos.iter().for_each(|info| { order_book @@ -473,6 +497,8 @@ fn swapper( shared_txs_wl.push_back(chunk.to_vec()); } } + // Throttle the swapper so it doesn't try to catchup unbridled + sleep(Duration::from_millis(transfer_delay / 2)); } if exit_signal.load(Ordering::Relaxed) { @@ -986,9 +1012,9 @@ mod tests { let mut config = Config::default(); config.identity = Keypair::new(); - config.threads = 1; config.duration = Duration::from_secs(1); config.fund_amount = 100_000; + config.threads = 1; config.transfer_delay = 20; // 15 config.batch_size = 100; // 1000; config.chunk_size = 10; // 200; @@ -1058,9 +1084,9 @@ mod tests { let mut config = Config::default(); config.identity = identity; - config.threads = 1; config.duration = Duration::from_secs(1); config.fund_amount = 100_000; + config.threads = 1; config.transfer_delay = 20; // 0; config.batch_size = 100; // 1500; config.chunk_size = 10; // 1500;