diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index cda897543..5781c41d3 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -339,7 +339,6 @@ async fn main() -> anyhow::Result<()> { min_buy_fraction: 0.7, }; - let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5)); let rebalance_config = rebalance::Config { enabled: cli.rebalance == BoolArg::True, slippage_bps: cli.rebalance_slippage_bps, @@ -465,30 +464,13 @@ async fn main() -> anyhow::Result<()> { // Could be refactored to only start the below jobs when the first snapshot is done. // But need to take care to abort if the above job aborts beforehand. - let rebalance_job = tokio::spawn({ - let shared_state = shared_state.clone(); - async move { - loop { - rebalance_interval.tick().await; - if !shared_state.read().unwrap().one_snapshot_done { - continue; - } - if let Err(err) = rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); - - // Workaround: We really need a sequence enforcer in the liquidator since we don't want to - // accidentally send a similar tx again when we incorrectly believe an earlier one got forked - // off. For now, hard sleep on error to avoid the most frequent error cases. - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - }); - let liquidation_job = tokio::spawn({ let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms)); let shared_state = shared_state.clone(); async move { + let mut must_rebalance = true; + let rebalance_delay = Duration::from_secs(5); + let mut last_rebalance = Instant::now(); loop { interval.tick().await; @@ -500,6 +482,14 @@ async fn main() -> anyhow::Result<()> { state.mango_accounts.iter().cloned().collect_vec() }; + if must_rebalance || last_rebalance.elapsed() > rebalance_delay { + if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await { + error!("failed to rebalance liqor: {:?}", err); + } + must_rebalance = false; + last_rebalance = Instant::now(); + } + liquidation.log_persistent_errors(); let liquidated = liquidation @@ -515,14 +505,7 @@ async fn main() -> anyhow::Result<()> { .unwrap(); } - if liquidated || took_tcs { - // It's awkward that this rebalance can run in parallel with the one - // from the rebalance_job. Ideally we'd get only one at a time/in quick succession. - // However, we do want to rebalance after a liquidation before liquidating further. - if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); - } - } + must_rebalance = must_rebalance || liquidated || took_tcs; } } }); @@ -580,7 +563,6 @@ async fn main() -> anyhow::Result<()> { use futures::StreamExt; let mut jobs: futures::stream::FuturesUnordered<_> = vec![ data_job, - rebalance_job, liquidation_job, token_swap_info_job, check_changes_for_abort_job,