liq: get rid of separate rebalance job (#815)

Previously, the separate job and the post-liquidation rebalance could
run at the same time and would occasionally perform the same action at
the same time, leading to overshooting.

Now rebalancing never happens twice. In the future it should potentially
just run separately from liquidation, but that needs a review of the
assumptions the liquidation job is making first.

(cherry picked from commit e8e7e445d3)
This commit is contained in:
Christian Kamm 2023-12-11 17:41:11 +01:00
parent 9ba0004760
commit 47faa8a7f1
1 changed files with 12 additions and 30 deletions

View File

@ -308,7 +308,6 @@ async fn main() -> anyhow::Result<()> {
min_buy_fraction: 0.7,
};
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5));
let rebalance_config = rebalance::Config {
enabled: cli.rebalance == BoolArg::True,
slippage_bps: cli.rebalance_slippage_bps,
@ -434,30 +433,13 @@ async fn main() -> anyhow::Result<()> {
// Could be refactored to only start the below jobs when the first snapshot is done.
// But need to take care to abort if the above job aborts beforehand.
let rebalance_job = tokio::spawn({
let shared_state = shared_state.clone();
async move {
loop {
rebalance_interval.tick().await;
if !shared_state.read().unwrap().one_snapshot_done {
continue;
}
if let Err(err) = rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
// off. For now, hard sleep on error to avoid the most frequent error cases.
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
});
let liquidation_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
let shared_state = shared_state.clone();
async move {
let mut must_rebalance = true;
let rebalance_delay = Duration::from_secs(5);
let mut last_rebalance = Instant::now();
loop {
interval.tick().await;
@ -469,6 +451,14 @@ async fn main() -> anyhow::Result<()> {
state.mango_accounts.iter().cloned().collect_vec()
};
if must_rebalance || last_rebalance.elapsed() > rebalance_delay {
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
must_rebalance = false;
last_rebalance = Instant::now();
}
liquidation.log_persistent_errors();
let liquidated = liquidation
@ -484,14 +474,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap();
}
if liquidated || took_tcs {
// It's awkward that this rebalance can run in parallel with the one
// from the rebalance_job. Ideally we'd get only one at a time/in quick succession.
// However, we do want to rebalance after a liquidation before liquidating further.
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
}
must_rebalance = must_rebalance || liquidated || took_tcs;
}
}
});
@ -549,7 +532,6 @@ async fn main() -> anyhow::Result<()> {
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
data_job,
rebalance_job,
liquidation_job,
token_swap_info_job,
check_changes_for_abort_job,