From e8e7e445d335d37c1b1a3f847aabb3ceadb66a57 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Mon, 11 Dec 2023 17:41:11 +0100 Subject: [PATCH] liq: get rid of separate rebalance job (#815) Previously, the separate job and the post-liquidation rebalance could run at the same time and would occasionally perform the same action at the same time, leading to overshooting. Now rebalancing never happens twice. In the future it should potentially just run separately from liquidation, but that needs a review of the assumptions the liquidation job is making first. --- bin/liquidator/src/main.rs | 42 +++++++++++--------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index cda897543..5781c41d3 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -339,7 +339,6 @@ async fn main() -> anyhow::Result<()> { min_buy_fraction: 0.7, }; - let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5)); let rebalance_config = rebalance::Config { enabled: cli.rebalance == BoolArg::True, slippage_bps: cli.rebalance_slippage_bps, @@ -465,30 +464,13 @@ async fn main() -> anyhow::Result<()> { // Could be refactored to only start the below jobs when the first snapshot is done. // But need to take care to abort if the above job aborts beforehand. - let rebalance_job = tokio::spawn({ - let shared_state = shared_state.clone(); - async move { - loop { - rebalance_interval.tick().await; - if !shared_state.read().unwrap().one_snapshot_done { - continue; - } - if let Err(err) = rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); - - // Workaround: We really need a sequence enforcer in the liquidator since we don't want to - // accidentally send a similar tx again when we incorrectly believe an earlier one got forked - // off. For now, hard sleep on error to avoid the most frequent error cases. - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - }); - let liquidation_job = tokio::spawn({ let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms)); let shared_state = shared_state.clone(); async move { + let mut must_rebalance = true; + let rebalance_delay = Duration::from_secs(5); + let mut last_rebalance = Instant::now(); loop { interval.tick().await; @@ -500,6 +482,14 @@ async fn main() -> anyhow::Result<()> { state.mango_accounts.iter().cloned().collect_vec() }; + if must_rebalance || last_rebalance.elapsed() > rebalance_delay { + if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await { + error!("failed to rebalance liqor: {:?}", err); + } + must_rebalance = false; + last_rebalance = Instant::now(); + } + liquidation.log_persistent_errors(); let liquidated = liquidation @@ -515,14 +505,7 @@ async fn main() -> anyhow::Result<()> { .unwrap(); } - if liquidated || took_tcs { - // It's awkward that this rebalance can run in parallel with the one - // from the rebalance_job. Ideally we'd get only one at a time/in quick succession. - // However, we do want to rebalance after a liquidation before liquidating further. - if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); - } - } + must_rebalance = must_rebalance || liquidated || took_tcs; } } }); @@ -580,7 +563,6 @@ async fn main() -> anyhow::Result<()> { use futures::StreamExt; let mut jobs: futures::stream::FuturesUnordered<_> = vec![ data_job, - rebalance_job, liquidation_job, token_swap_info_job, check_changes_for_abort_job,