liquidator: rebalance asynchronously

Christian Kamm 2023-12-22 08:41:41 +01:00
parent 0a55184ec7
commit d2327f8d11
2 changed files with 32 additions and 17 deletions
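Summary: rebalancing used to run inline in the liquidation loop, gated by a must_rebalance flag and a 5-second delay; this commit moves it into its own task that wakes either on a 30-second interval tick or on a trigger sent over a bounded(1) channel whenever a liquidation or TCS execution succeeded. A minimal sketch of that wake-up pattern, assuming the tokio, async-channel and anyhow crates; trigger_tx, trigger_rx and do_rebalance are illustrative names, not from this commit:

use std::time::Duration;

// Illustrative stand-in for rebalancer.zero_all_non_quote().
async fn do_rebalance() -> anyhow::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // Capacity 1: pending triggers coalesce, so a burst of liquidations
    // queues at most one extra rebalance.
    let (trigger_tx, trigger_rx) = async_channel::bounded::<()>(1);
    let mut interval = tokio::time::interval(Duration::from_secs(30));

    let rebalance_job = tokio::spawn(async move {
        loop {
            // Wake on whichever fires first: the periodic tick or a trigger.
            tokio::select! {
                _ = interval.tick() => {}
                _ = trigger_rx.recv() => {}
            }
            if let Err(err) = do_rebalance().await {
                eprintln!("rebalance failed: {err:?}");
                // Back off instead of retrying immediately.
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    });

    // From the liquidation path: request an immediate rebalance.
    let _ = trigger_tx.try_send(());

    tokio::time::sleep(Duration::from_millis(100)).await;
    rebalance_job.abort();
}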


@@ -7,6 +7,7 @@ use anchor_client::Cluster;
 use anyhow::Context;
 use clap::Parser;
 use mango_v4::state::{PerpMarketIndex, TokenIndex};
+use mango_v4_client::AsyncChannelSendUnlessFull;
 use mango_v4_client::{
     account_update_stream, chain_data, error_tracking::ErrorTracking, jupiter, keypair_from_cli,
     snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext,
@@ -315,6 +316,8 @@ async fn main() -> anyhow::Result<()> {
         min_buy_fraction: 0.7,
     };
 
+    let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30));
+    let (rebalance_trigger_sender, rebalance_trigger_receiver) = async_channel::bounded::<()>(1);
     let rebalance_config = rebalance::Config {
         enabled: cli.rebalance == BoolArg::True,
         slippage_bps: cli.rebalance_slippage_bps,
@@ -343,7 +346,6 @@ async fn main() -> anyhow::Result<()> {
         account_fetcher,
         liquidation_config: liq_config,
         trigger_tcs_config: tcs_config,
-        rebalancer: rebalancer.clone(),
         token_swap_info: token_swap_info_updater.clone(),
         liq_errors: ErrorTracking {
             skip_threshold: 5,
@@ -441,13 +443,33 @@ async fn main() -> anyhow::Result<()> {
     // Could be refactored to only start the below jobs when the first snapshot is done.
     // But need to take care to abort if the above job aborts beforehand.
 
+    let rebalance_job = tokio::spawn({
+        let shared_state = shared_state.clone();
+        async move {
+            loop {
+                tokio::select! {
+                    _ = rebalance_interval.tick() => {}
+                    _ = rebalance_trigger_receiver.recv() => {}
+                }
+                if !shared_state.read().unwrap().one_snapshot_done {
+                    continue;
+                }
+                if let Err(err) = rebalancer.zero_all_non_quote().await {
+                    error!("failed to rebalance liqor: {:?}", err);
+
+                    // Workaround: We really need a sequence enforcer in the liquidator since we don't want to
+                    // accidentally send a similar tx again when we incorrectly believe an earlier one got forked
+                    // off. For now, hard sleep on error to avoid the most frequent error cases.
+                    tokio::time::sleep(Duration::from_secs(10)).await;
+                }
+            }
+        }
+    });
+
     let liquidation_job = tokio::spawn({
         let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
         let shared_state = shared_state.clone();
         async move {
-            let mut must_rebalance = true;
-            let rebalance_delay = Duration::from_secs(5);
-            let mut last_rebalance = Instant::now();
             loop {
                 interval.tick().await;
@@ -459,14 +481,6 @@ async fn main() -> anyhow::Result<()> {
                     state.mango_accounts.iter().cloned().collect_vec()
                 };
 
-                if must_rebalance || last_rebalance.elapsed() > rebalance_delay {
-                    if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
-                        error!("failed to rebalance liqor: {:?}", err);
-                    }
-                    must_rebalance = false;
-                    last_rebalance = Instant::now();
-                }
-
                 liquidation.log_persistent_errors();
                 let liquidated = liquidation
@@ -487,7 +501,9 @@ async fn main() -> anyhow::Result<()> {
                     }
                 }
 
-                must_rebalance = must_rebalance || liquidated || took_tcs;
+                if liquidated || took_tcs {
+                    rebalance_trigger_sender.send_unless_full(()).unwrap();
+                }
             }
         }
     });
@@ -545,6 +561,7 @@ async fn main() -> anyhow::Result<()> {
     use futures::StreamExt;
     let mut jobs: futures::stream::FuturesUnordered<_> = vec![
         data_job,
+        rebalance_job,
         liquidation_job,
         token_swap_info_job,
         check_changes_for_abort_job,
@@ -570,7 +587,6 @@ struct SharedState {
 struct LiquidationState {
     mango_client: Arc<MangoClient>,
     account_fetcher: Arc<chain_data::AccountFetcher>,
-    rebalancer: Arc<rebalance::Rebalancer>,
     token_swap_info: Arc<token_swap_info::TokenSwapInfoUpdater>,
     liquidation_config: liquidate::Config,
     trigger_tcs_config: trigger_tcs::Config,

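The send_unless_full() call in the liquidation job above comes from the AsyncChannelSendUnlessFull extension trait imported in the first hunk. Its implementation lives in mango_v4_client, not in this diff; a plausible sketch, assuming it wraps async_channel's non-blocking try_send and treats a full channel as success, since a trigger already queued means the rebalance task will wake anyway:

use async_channel::{Sender, TrySendError};

// Hypothetical reconstruction; the real trait ships in mango_v4_client.
pub trait AsyncChannelSendUnlessFull<T> {
    fn send_unless_full(&self, msg: T) -> anyhow::Result<()>;
}

impl<T> AsyncChannelSendUnlessFull<T> for Sender<T> {
    fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
        match self.try_send(msg) {
            Ok(()) => Ok(()),
            // A trigger is already queued; dropping this one is the point.
            Err(TrySendError::Full(_)) => Ok(()),
            // No receiver left: the rebalance job is gone, surface it.
            Err(TrySendError::Closed(_)) => Err(anyhow::anyhow!("channel closed")),
        }
    }
}

With capacity 1, back-to-back liquidations coalesce into a single pending rebalance instead of piling up.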

@@ -408,10 +408,9 @@ impl Rebalancer {
                 return Ok(());
             }
         } else if amount > dust_threshold {
-            anyhow::bail!(
+            warn!(
                 "unexpected {} position after rebalance swap: {} native",
-                token.name,
-                amount
+                token.name, amount
             );
         }
     }
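In the second file (the Rebalancer), the unexpected-position check is downgraded from anyhow::bail! to warn!. That matters under the new design: zero_all_non_quote() now runs in the long-lived rebalance task, where a returned Err only produces a log line and a 10-second backoff, so bailing over a leftover position would also abort the rest of the rebalance pass. A toy contrast, assuming tracing's warn! macro and a hypothetical DUST_THRESHOLD constant:

use anyhow::bail;
use tracing::warn;

// Hypothetical constant for illustration only.
const DUST_THRESHOLD: u64 = 100;

// Old behavior: an unexpected leftover position aborts the whole
// rebalance pass with an Err.
fn check_strict(token_name: &str, amount: u64) -> anyhow::Result<()> {
    if amount > DUST_THRESHOLD {
        bail!(
            "unexpected {} position after rebalance swap: {} native",
            token_name,
            amount
        );
    }
    Ok(())
}

// New behavior: the same condition is only logged, so the caller keeps
// going and the rebalance task does not hit its error backoff.
fn check_lenient(token_name: &str, amount: u64) -> anyhow::Result<()> {
    if amount > DUST_THRESHOLD {
        warn!(
            "unexpected {} position after rebalance swap: {} native",
            token_name, amount
        );
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    check_lenient("SOL", 250)?; // logs a warning, returns Ok
    check_strict("SOL", 250)?; // returns Err and stops here
    Ok(())
}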