From f54bb6f0b00f4674f7177b3b1484ae5d615b0805 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Wed, 20 Mar 2024 15:25:52 +0100 Subject: [PATCH] Serge/liquidator split tcs and liquidation (#914) liquidator: split TCS triggering and liquidation job Concurrent execution of candidate lookup and tx building/sending - Also added an health assertion IX to protect liqor in multi liquidation scenario - And a timeout for jupiter v6 queries (avoid blocking liquidation because of slow TCS) --- Cargo.lock | 1 + bin/liquidator/Cargo.toml | 3 +- bin/liquidator/src/cli_args.rs | 20 + bin/liquidator/src/liquidate.rs | 47 +- bin/liquidator/src/liquidation_state.rs | 238 ++++++++ bin/liquidator/src/main.rs | 545 ++++++------------ bin/liquidator/src/tcs_state.rs | 218 +++++++ bin/liquidator/src/trigger_tcs.rs | 46 +- bin/liquidator/src/tx_sender.rs | 241 ++++++++ lib/client/src/client.rs | 57 +- lib/client/src/jupiter/v6.rs | 4 + lib/client/src/util.rs | 16 +- .../mango-v4/tests/cases/test_health_check.rs | 2 - ts/client/scripts/liqtest/README.md | 1 + 14 files changed, 1046 insertions(+), 393 deletions(-) create mode 100644 bin/liquidator/src/liquidation_state.rs create mode 100644 bin/liquidator/src/tcs_state.rs create mode 100644 bin/liquidator/src/tx_sender.rs diff --git a/Cargo.lock b/Cargo.lock index a54abdb97..b6131418b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3540,6 +3540,7 @@ dependencies = [ "futures-core", "futures-util", "hdrhistogram", + "indexmap 2.0.0", "itertools", "jemallocator", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/bin/liquidator/Cargo.toml b/bin/liquidator/Cargo.toml index d591bd37b..ea254b180 100644 --- a/bin/liquidator/Cargo.toml +++ b/bin/liquidator/Cargo.toml @@ -49,4 +49,5 @@ tokio-stream = { version = "0.1.9"} tokio-tungstenite = "0.16.1" tracing = "0.1" regex = "1.9.5" -hdrhistogram = "7.5.4" \ No newline at end of file +hdrhistogram = "7.5.4" +indexmap = "2.0.0" \ No newline at end of file diff --git a/bin/liquidator/src/cli_args.rs b/bin/liquidator/src/cli_args.rs index 53ea01fad..234fe5abd 100644 --- a/bin/liquidator/src/cli_args.rs +++ b/bin/liquidator/src/cli_args.rs @@ -136,6 +136,12 @@ pub struct Cli { #[clap(long, env, value_enum, default_value = "true")] pub(crate) take_tcs: BoolArg, + #[clap(long, env, default_value = "30")] + pub(crate) tcs_refresh_timeout_secs: u64, + + #[clap(long, env, default_value = "1000")] + pub(crate) tcs_check_interval_ms: u64, + /// profit margin at which to take tcs orders #[clap(long, env, default_value = "0.0005")] pub(crate) tcs_profit_fraction: f64, @@ -178,6 +184,10 @@ pub struct Cli { #[clap(long, env, default_value = "https://quote-api.jup.ag/v6")] pub(crate) jupiter_v6_url: String, + /// override the jupiter http request timeout + #[clap(long, env, default_value = "30")] + pub(crate) jupiter_timeout_secs: u64, + /// provide a jupiter token, currently only for jup v6 #[clap(long, env, default_value = "")] pub(crate) jupiter_token: String, @@ -191,6 +201,12 @@ pub struct Cli { #[clap(long, env, value_enum, default_value = "true")] pub(crate) telemetry: BoolArg, + /// if liquidation is enabled + /// + /// might be used to run an instance of liquidator dedicated to TCS and another one for liquidation + #[clap(long, env, value_enum, default_value = "true")] + pub(crate) liquidation_enabled: BoolArg, + /// liquidation refresh timeout in secs #[clap(long, env, default_value = "30")] pub(crate) liquidation_refresh_timeout_secs: u8, @@ -216,4 +232,8 @@ pub struct Cli { /// how long 
should it wait before logging an oracle error again (for the same token) #[clap(long, env, default_value = "30")] pub(crate) skip_oracle_error_in_logs_duration_secs: u64, + + /// max number of liquidation/tcs to do concurrently + #[clap(long, env, default_value = "5")] + pub(crate) max_parallel_operations: u64, } diff --git a/bin/liquidator/src/liquidate.rs b/bin/liquidator/src/liquidate.rs index c355aaa19..06c3d1f01 100644 --- a/bin/liquidator/src/liquidate.rs +++ b/bin/liquidator/src/liquidate.rs @@ -9,6 +9,7 @@ use mango_v4_client::{chain_data, MangoClient, PreparedInstructions}; use solana_sdk::signature::Signature; use futures::{stream, StreamExt, TryStreamExt}; +use mango_v4::accounts_ix::HealthCheckKind::MaintRatio; use rand::seq::SliceRandom; use tracing::*; use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; @@ -260,7 +261,22 @@ impl<'a> LiquidateHelper<'a> { ) .await .context("creating perp_liq_base_or_positive_pnl_instruction")?; + liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix); + + let liqor = &self.client.mango_account().await?; + liq_ixs.append( + self.client + .health_check_instruction( + liqor, + self.config.min_health_ratio, + vec![], + vec![*perp_market_index], + MaintRatio, + ) + .await?, + ); + let txsig = self .client .send_and_confirm_owner_tx(liq_ixs.to_instructions()) @@ -501,6 +517,20 @@ impl<'a> LiquidateHelper<'a> { .await .context("creating liq_token_with_token ix")?; liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix); + + let liqor = self.client.mango_account().await?; + liq_ixs.append( + self.client + .health_check_instruction( + &liqor, + self.config.min_health_ratio, + vec![asset_token_index, liab_token_index], + vec![], + MaintRatio, + ) + .await?, + ); + let txsig = self .client .send_and_confirm_owner_tx(liq_ixs.to_instructions()) @@ -651,14 +681,11 @@ impl<'a> LiquidateHelper<'a> { } #[allow(clippy::too_many_arguments)] -pub async fn maybe_liquidate_account( +pub async fn can_liquidate_account( mango_client: &MangoClient, account_fetcher: &chain_data::AccountFetcher, pubkey: &Pubkey, - config: &Config, ) -> anyhow::Result { - let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio); - let account = account_fetcher.fetch_mango_account(pubkey)?; let health_cache = mango_client .health_cache(&account) @@ -675,6 +702,18 @@ pub async fn maybe_liquidate_account( "possible candidate", ); + Ok(true) +} + +#[allow(clippy::too_many_arguments)] +pub async fn maybe_liquidate_account( + mango_client: &MangoClient, + account_fetcher: &chain_data::AccountFetcher, + pubkey: &Pubkey, + config: &Config, +) -> anyhow::Result { + let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio); + // Fetch a fresh account and re-compute // This is -- unfortunately -- needed because the websocket streams seem to not // be great at providing timely updates to the account data. 
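The new modules that follow (liquidation_state.rs, tcs_state.rs, tx_sender.rs) implement the split described in the commit message: scanning jobs insert candidates into shared state and signal a channel, and a small worker pool pulls candidates back out and does the expensive tx building/sending. The sketch below shows only that pattern, using the same tokio + async_channel primitives the liquidator depends on; the `Shared` struct and the u64 stand-ins for account keys are illustrative, not the crate's actual types.

```
// Minimal sketch of the scan/execute split, assuming tokio and async_channel
// as dependencies. Names and types here are illustrative only.
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

#[derive(Default)]
struct Shared {
    // stands in for `liquidation_candidates_accounts` / `interesting_tcs`
    candidates: HashSet<u64>,
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(RwLock::new(Shared::default()));
    let (trigger_tx, trigger_rx) = async_channel::unbounded::<()>();

    // Scanner: cheap candidate lookup; publish into shared state, ping a worker.
    let scanner = tokio::spawn({
        let shared = shared.clone();
        async move {
            for account in 0u64..3 {
                let is_new = shared.write().unwrap().candidates.insert(account);
                if is_new {
                    trigger_tx.send(()).await.unwrap();
                }
            }
            // trigger_tx is dropped here, so workers stop once the channel drains
        }
    });

    // Worker pool: each wake-up claims at most one candidate and runs the
    // expensive part (tx building/sending in the real liquidator).
    let workers: Vec<_> = (0..2u64)
        .map(|id| {
            let rx = trigger_rx.clone();
            let shared = shared.clone();
            tokio::spawn(async move {
                while rx.recv().await.is_ok() {
                    let claimed = {
                        let mut state = shared.write().unwrap();
                        let next = state.candidates.iter().next().copied();
                        if let Some(c) = next {
                            state.candidates.remove(&c);
                        }
                        next
                    };
                    if let Some(account) = claimed {
                        println!("worker {id}: processing candidate {account}");
                    }
                    // if None: another worker already claimed it (the "NoWork" case)
                }
            })
        })
        .collect();

    scanner.await.unwrap();
    for worker in workers {
        worker.await.unwrap();
    }
}
```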
diff --git a/bin/liquidator/src/liquidation_state.rs b/bin/liquidator/src/liquidation_state.rs new file mode 100644 index 000000000..aedae7890 --- /dev/null +++ b/bin/liquidator/src/liquidation_state.rs @@ -0,0 +1,238 @@ +use crate::cli_args::Cli; +use crate::metrics::Metrics; +use crate::unwrappable_oracle_error::UnwrappableOracleError; +use crate::{liquidate, LiqErrorType, SharedState}; +use anchor_lang::prelude::Pubkey; +use itertools::Itertools; +use mango_v4::state::TokenIndex; +use mango_v4_client::error_tracking::ErrorTracking; +use mango_v4_client::{chain_data, MangoClient, MangoClientError}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, trace, warn}; + +#[derive(Clone)] +pub struct LiquidationState { + pub mango_client: Arc, + pub account_fetcher: Arc, + pub liquidation_config: liquidate::Config, + + pub errors: Arc>>, + pub oracle_errors: Arc>>, +} + +impl LiquidationState { + async fn find_candidates( + &mut self, + accounts_iter: impl Iterator, + action: impl Fn(Pubkey) -> anyhow::Result<()>, + ) -> anyhow::Result { + let mut found_counter = 0u64; + use rand::seq::SliceRandom; + + let mut accounts = accounts_iter.collect::>(); + { + let mut rng = rand::thread_rng(); + accounts.shuffle(&mut rng); + } + + for pubkey in accounts { + if self.should_skip_execution(pubkey) { + continue; + } + + let result = + liquidate::can_liquidate_account(&self.mango_client, &self.account_fetcher, pubkey) + .await; + + self.log_or_ignore_error(&result, pubkey); + + if result.unwrap_or(false) { + action(*pubkey)?; + found_counter = found_counter + 1; + } + } + + Ok(found_counter) + } + + fn should_skip_execution(&mut self, pubkey: &Pubkey) -> bool { + let now = Instant::now(); + let error_tracking = &mut self.errors; + + // Skip a pubkey if there've been too many errors recently + if let Some(error_entry) = + error_tracking + .read() + .unwrap() + .had_too_many_errors(LiqErrorType::Liq, pubkey, now) + { + trace!( + %pubkey, + error_entry.count, + "skip checking account for liquidation, had errors recently", + ); + return true; + } + + false + } + + fn log_or_ignore_error(&mut self, result: &anyhow::Result, pubkey: &Pubkey) { + let error_tracking = &mut self.errors; + + if let Err(err) = result.as_ref() { + if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() { + if self + .oracle_errors + .read() + .unwrap() + .had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now()) + .is_none() + { + warn!( + "{:?} recording oracle error for token {} {}", + chrono::offset::Utc::now(), + ti_name, + ti + ); + } + + self.oracle_errors + .write() + .unwrap() + .record(LiqErrorType::Liq, &ti, err.to_string()); + return; + } + + // Keep track of pubkeys that had errors + error_tracking + .write() + .unwrap() + .record(LiqErrorType::Liq, pubkey, err.to_string()); + + // Not all errors need to be raised to the user's attention. + let mut is_error = true; + + // Simulation errors due to liqee precondition failures on the liquidation instructions + // will commonly happen if our liquidator is late or if there are chain forks. + match err.downcast_ref::() { + Some(MangoClientError::SendTransactionPreflightFailure { logs, .. 
}) => { + if logs.iter().any(|line| { + line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt") + }) { + is_error = false; + } + } + _ => {} + }; + if is_error { + error!("liquidating account {}: {:?}", pubkey, err); + } else { + trace!("liquidating account {}: {:?}", pubkey, err); + } + } else { + error_tracking + .write() + .unwrap() + .clear(LiqErrorType::Liq, pubkey); + } + } + + pub async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { + if self.should_skip_execution(pubkey) { + return Ok(false); + } + + let result = liquidate::maybe_liquidate_account( + &self.mango_client, + &self.account_fetcher, + pubkey, + &self.liquidation_config, + ) + .await; + + self.log_or_ignore_error(&result, pubkey); + return result; + } +} + +pub fn spawn_liquidation_job( + cli: &Cli, + shared_state: &Arc>, + tx_trigger_sender: async_channel::Sender<()>, + mut liquidation: Box, + metrics: &Metrics, +) -> JoinHandle<()> { + tokio::spawn({ + let mut interval = + mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms)); + let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into()); + let mut metric_liquidation_start_end = + metrics.register_latency("liquidation_start_end".into()); + + let mut liquidation_start_time = None; + + let shared_state = shared_state.clone(); + async move { + loop { + interval.tick().await; + + let account_addresses = { + let mut state = shared_state.write().unwrap(); + if !state.one_snapshot_done { + // discard first latency info as it will skew data too much + state.oldest_chain_event_reception_time = None; + continue; + } + if state.oldest_chain_event_reception_time.is_none() + && liquidation_start_time.is_none() + { + // no new update, skip computing + continue; + } + + state.mango_accounts.iter().cloned().collect_vec() + }; + + liquidation.errors.write().unwrap().update(); + liquidation.oracle_errors.write().unwrap().update(); + + if liquidation_start_time.is_none() { + liquidation_start_time = Some(Instant::now()); + } + + let found_candidates = liquidation + .find_candidates(account_addresses.iter(), |p| { + if shared_state + .write() + .unwrap() + .liquidation_candidates_accounts + .insert(p) + { + tx_trigger_sender.try_send(())?; + } + + Ok(()) + }) + .await + .unwrap(); + + if found_candidates > 0 { + tracing::debug!("found {} candidates for liquidation", found_candidates); + } + + let mut state = shared_state.write().unwrap(); + let reception_time = state.oldest_chain_event_reception_time.unwrap(); + let current_time = Instant::now(); + + state.oldest_chain_event_reception_time = None; + + metric_liquidation_check.push(current_time - reception_time); + metric_liquidation_start_end.push(current_time - liquidation_start_time.unwrap()); + liquidation_start_time = None; + } + } + }) +} diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 758736936..1c62c9ad4 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -4,33 +4,40 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use anchor_client::Cluster; -use anyhow::Context; use clap::Parser; +use futures_util::StreamExt; use mango_v4::state::{PerpMarketIndex, TokenIndex}; -use mango_v4_client::AsyncChannelSendUnlessFull; use mango_v4_client::{ account_update_stream, chain_data, error_tracking::ErrorTracking, keypair_from_cli, - snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext, + snapshot_source, websocket_source, Client, 
MangoClient, MangoGroupContext, TransactionBuilderConfig, }; +use crate::cli_args::{BoolArg, Cli, CliDotenv}; +use crate::liquidation_state::LiquidationState; +use crate::rebalance::Rebalancer; +use crate::tcs_state::TcsState; +use crate::token_swap_info::TokenSwapInfoUpdater; use itertools::Itertools; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::signer::Signer; +use tokio::task::JoinHandle; use tracing::*; pub mod cli_args; pub mod liquidate; +mod liquidation_state; pub mod metrics; pub mod rebalance; +mod tcs_state; pub mod telemetry; pub mod token_swap_info; pub mod trigger_tcs; +mod tx_sender; mod unwrappable_oracle_error; pub mod util; -use crate::unwrappable_oracle_error::UnwrappableOracleError; use crate::util::{is_mango_account, is_mint_info, is_perp_market}; // jemalloc seems to be better at keeping the memory footprint reasonable over @@ -69,7 +76,7 @@ async fn main() -> anyhow::Result<()> { // Client setup // let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner)); - let rpc_url = cli.rpc_url; + let rpc_url = cli.rpc_url.clone(); let ws_url = rpc_url.replace("https", "wss"); let rpc_timeout = Duration::from_secs(10); let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone()); @@ -79,8 +86,9 @@ async fn main() -> anyhow::Result<()> { .commitment(commitment) .fee_payer(Some(liqor_owner.clone())) .timeout(rpc_timeout) - .jupiter_v6_url(cli.jupiter_v6_url) - .jupiter_token(cli.jupiter_token) + .jupiter_timeout(Duration::from_secs(cli.jupiter_timeout_secs)) + .jupiter_v6_url(cli.jupiter_v6_url.clone()) + .jupiter_token(cli.jupiter_token.clone()) .transaction_builder_config( TransactionBuilderConfig::builder() .priority_fee_provider(prio_provider) @@ -89,7 +97,7 @@ async fn main() -> anyhow::Result<()> { .build() .unwrap(), ) - .override_send_transaction_urls(cli.override_send_transaction_url) + .override_send_transaction_urls(cli.override_send_transaction_url.clone()) .build() .unwrap(); @@ -207,17 +215,18 @@ async fn main() -> anyhow::Result<()> { compute_limit_for_liq_ix: cli.compute_limit_for_liquidation, max_cu_per_transaction: 1_000_000, refresh_timeout: Duration::from_secs(cli.liquidation_refresh_timeout_secs as u64), - only_allowed_tokens: cli_args::cli_to_hashset::(cli.only_allow_tokens), - forbidden_tokens: cli_args::cli_to_hashset::(cli.forbidden_tokens), + only_allowed_tokens: cli_args::cli_to_hashset::(cli.only_allow_tokens.clone()), + forbidden_tokens: cli_args::cli_to_hashset::(cli.forbidden_tokens.clone()), only_allowed_perp_markets: cli_args::cli_to_hashset::( - cli.liquidation_only_allow_perp_markets, + cli.liquidation_only_allow_perp_markets.clone(), ), forbidden_perp_markets: cli_args::cli_to_hashset::( - cli.liquidation_forbidden_perp_markets, + cli.liquidation_forbidden_perp_markets.clone(), ), }; let tcs_config = trigger_tcs::Config { + refresh_timeout: Duration::from_secs(cli.tcs_refresh_timeout_secs), min_health_ratio: cli.min_health_ratio, max_trigger_quote_amount: (cli.tcs_max_trigger_amount * 1e6) as u64, compute_limit_for_trigger: cli.compute_limit_for_tcs, @@ -234,17 +243,19 @@ async fn main() -> anyhow::Result<()> { forbidden_tokens: liq_config.forbidden_tokens.clone(), }; - let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30)); let (rebalance_trigger_sender, rebalance_trigger_receiver) = async_channel::bounded::<()>(1); + let (tx_tcs_trigger_sender, tx_tcs_trigger_receiver) = async_channel::unbounded::<()>(); + let (tx_liq_trigger_sender, tx_liq_trigger_receiver) = 
async_channel::unbounded::<()>(); let rebalance_config = rebalance::Config { enabled: cli.rebalance == BoolArg::True, slippage_bps: cli.rebalance_slippage_bps, borrow_settle_excess: (1f64 + cli.rebalance_borrow_settle_excess).max(1f64), refresh_timeout: Duration::from_secs(cli.rebalance_refresh_timeout_secs), jupiter_version: cli.jupiter_version.into(), - skip_tokens: cli.rebalance_skip_tokens.unwrap_or_default(), + skip_tokens: cli.rebalance_skip_tokens.clone().unwrap_or(Vec::new()), alternate_jupiter_route_tokens: cli .rebalance_alternate_jupiter_route_tokens + .clone() .unwrap_or_default(), allow_withdraws: signer_is_owner, }; @@ -257,23 +268,39 @@ async fn main() -> anyhow::Result<()> { config: rebalance_config, }); - let mut liquidation = Box::new(LiquidationState { + let liquidation = Box::new(LiquidationState { + mango_client: mango_client.clone(), + account_fetcher: account_fetcher.clone(), + liquidation_config: liq_config, + errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(2) + .skip_threshold_for_type(LiqErrorType::Liq, 5) + .skip_duration(Duration::from_secs(120)) + .build()?, + )), + oracle_errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(1) + .skip_duration(Duration::from_secs( + cli.skip_oracle_error_in_logs_duration_secs, + )) + .build()?, + )), + }); + + let tcs = Box::new(TcsState { mango_client: mango_client.clone(), account_fetcher, - liquidation_config: liq_config, trigger_tcs_config: tcs_config, token_swap_info: token_swap_info_updater.clone(), - errors: ErrorTracking::builder() - .skip_threshold(2) - .skip_threshold_for_type(LiqErrorType::Liq, 5) - .skip_duration(Duration::from_secs(120)) - .build()?, - oracle_errors: ErrorTracking::builder() - .skip_threshold(1) - .skip_duration(Duration::from_secs( - cli.skip_oracle_error_in_logs_duration_secs, - )) - .build()?, + errors: Arc::new(RwLock::new( + ErrorTracking::builder() + .skip_threshold(2) + .skip_threshold_for_type(LiqErrorType::Liq, 5) + .skip_duration(Duration::from_secs(120)) + .build()?, + )), }); info!("main loop"); @@ -374,126 +401,83 @@ async fn main() -> anyhow::Result<()> { } }); + let mut optional_jobs = vec![]; + // Could be refactored to only start the below jobs when the first snapshot is done. // But need to take care to abort if the above job aborts beforehand. + if cli.rebalance == BoolArg::True { + let rebalance_job = + spawn_rebalance_job(&shared_state, rebalance_trigger_receiver, rebalancer); + optional_jobs.push(rebalance_job); + } - let rebalance_job = tokio::spawn({ - let shared_state = shared_state.clone(); - async move { - loop { - tokio::select! { - _ = rebalance_interval.tick() => {} - _ = rebalance_trigger_receiver.recv() => {} - } - if !shared_state.read().unwrap().one_snapshot_done { - continue; - } - if let Err(err) = rebalancer.zero_all_non_quote().await { - error!("failed to rebalance liqor: {:?}", err); + if cli.liquidation_enabled == BoolArg::True { + let liquidation_job = liquidation_state::spawn_liquidation_job( + &cli, + &shared_state, + tx_liq_trigger_sender.clone(), + liquidation.clone(), + &metrics, + ); + optional_jobs.push(liquidation_job); + } - // Workaround: We really need a sequence enforcer in the liquidator since we don't want to - // accidentally send a similar tx again when we incorrectly believe an earlier one got forked - // off. For now, hard sleep on error to avoid the most frequent error cases. 
- tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - }); + if cli.take_tcs == BoolArg::True { + let tcs_job = tcs_state::spawn_tcs_job( + &cli, + &shared_state, + tx_tcs_trigger_sender.clone(), + tcs.clone(), + &metrics, + ); + optional_jobs.push(tcs_job); + } - let liquidation_job = tokio::spawn({ - let mut interval = - mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms)); - let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into()); - let mut metric_liquidation_start_end = - metrics.register_latency("liquidation_start_end".into()); + if cli.liquidation_enabled == BoolArg::True || cli.take_tcs == BoolArg::True { + let mut tx_sender_jobs = tx_sender::spawn_tx_senders_job( + cli.max_parallel_operations, + cli.liquidation_enabled == BoolArg::True, + tx_liq_trigger_receiver, + tx_tcs_trigger_receiver, + tx_tcs_trigger_sender, + rebalance_trigger_sender, + shared_state.clone(), + liquidation, + tcs, + ); + optional_jobs.append(&mut tx_sender_jobs); + } - let mut liquidation_start_time = None; - let mut tcs_start_time = None; + if cli.telemetry == BoolArg::True { + optional_jobs.push(spawn_telemetry_job(&cli, mango_client.clone())); + } - let shared_state = shared_state.clone(); - async move { - loop { - interval.tick().await; + let token_swap_info_job = + spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater); + let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone()); - let account_addresses = { - let mut state = shared_state.write().unwrap(); - if !state.one_snapshot_done { - // discard first latency info as it will skew data too much - state.oldest_chain_event_reception_time = None; - continue; - } - if state.oldest_chain_event_reception_time.is_none() - && liquidation_start_time.is_none() - { - // no new update, skip computing - continue; - } + let mut jobs: futures::stream::FuturesUnordered<_> = + vec![data_job, token_swap_info_job, check_changes_for_abort_job] + .into_iter() + .chain(optional_jobs) + .chain(prio_jobs.into_iter()) + .collect(); + jobs.next().await; - state.mango_accounts.iter().cloned().collect_vec() - }; + error!("a critical job aborted, exiting"); + Ok(()) +} - liquidation.errors.update(); - liquidation.oracle_errors.update(); - - if liquidation_start_time.is_none() { - liquidation_start_time = Some(Instant::now()); - } - - let liquidated = liquidation - .maybe_liquidate_one(account_addresses.iter()) - .await; - - if !liquidated { - // This will be incorrect if we liquidate the last checked account - // (We will wait for next full run, skewing latency metrics) - // Probability is very low, might not need to be fixed - - let mut state = shared_state.write().unwrap(); - let reception_time = state.oldest_chain_event_reception_time.unwrap(); - let current_time = Instant::now(); - - state.oldest_chain_event_reception_time = None; - - metric_liquidation_check.push(current_time - reception_time); - metric_liquidation_start_end - .push(current_time - liquidation_start_time.unwrap()); - liquidation_start_time = None; - } - - let mut took_tcs = false; - if !liquidated && cli.take_tcs == BoolArg::True { - tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now())); - - took_tcs = liquidation - .maybe_take_token_conditional_swap(account_addresses.iter()) - .await - .unwrap_or_else(|err| { - error!("error during maybe_take_token_conditional_swap: {err}"); - false - }); - - if !took_tcs { - let current_time = Instant::now(); - let mut metric_tcs_start_end = - 
metrics.register_latency("tcs_start_end".into()); - metric_tcs_start_end.push(current_time - tcs_start_time.unwrap()); - tcs_start_time = None; - } - } - - if liquidated || took_tcs { - rebalance_trigger_sender.send_unless_full(()).unwrap(); - } - } - } - }); - - let token_swap_info_job = tokio::spawn({ +fn spawn_token_swap_refresh_job( + cli: &Cli, + shared_state: Arc>, + token_swap_info_updater: Arc, +) -> JoinHandle<()> { + tokio::spawn({ let mut interval = mango_v4_client::delay_interval(Duration::from_secs( cli.token_swap_refresh_interval_secs, )); let mut startup_wait = mango_v4_client::delay_interval(Duration::from_secs(1)); - let shared_state = shared_state.clone(); async move { loop { if !shared_state.read().unwrap().one_snapshot_done { @@ -517,41 +501,56 @@ async fn main() -> anyhow::Result<()> { token_swap_info_updater.log_all(); } } - }); + }) +} - let check_changes_for_abort_job = - tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort( - mango_client.clone(), - Duration::from_secs(300), - )); +fn spawn_context_change_watchdog_job(mango_client: Arc) -> JoinHandle<()> { + tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort( + mango_client, + Duration::from_secs(300), + )) +} - if cli.telemetry == BoolArg::True { - tokio::spawn(telemetry::report_regularly( - mango_client, - cli.min_health_ratio, - )); - } +fn spawn_telemetry_job(cli: &Cli, mango_client: Arc) -> JoinHandle<()> { + tokio::spawn(telemetry::report_regularly( + mango_client, + cli.min_health_ratio, + )) +} - use cli_args::{BoolArg, Cli, CliDotenv}; - use futures::StreamExt; - let mut jobs: futures::stream::FuturesUnordered<_> = vec![ - data_job, - rebalance_job, - liquidation_job, - token_swap_info_job, - check_changes_for_abort_job, - ] - .into_iter() - .chain(prio_jobs.into_iter()) - .collect(); - jobs.next().await; +fn spawn_rebalance_job( + shared_state: &Arc>, + rebalance_trigger_receiver: async_channel::Receiver<()>, + rebalancer: Arc, +) -> JoinHandle<()> { + let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30)); - error!("a critical job aborted, exiting"); - Ok(()) + tokio::spawn({ + let shared_state = shared_state.clone(); + async move { + loop { + tokio::select! { + _ = rebalance_interval.tick() => {} + _ = rebalance_trigger_receiver.recv() => {} + } + if !shared_state.read().unwrap().one_snapshot_done { + continue; + } + if let Err(err) = rebalancer.zero_all_non_quote().await { + error!("failed to rebalance liqor: {:?}", err); + + // Workaround: We really need a sequence enforcer in the liquidator since we don't want to + // accidentally send a similar tx again when we incorrectly believe an earlier one got forked + // off. For now, hard sleep on error to avoid the most frequent error cases. + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + } + }) } #[derive(Default)] -struct SharedState { +pub struct SharedState { /// Addresses of the MangoAccounts belonging to the mango program. /// Needed to check health of them all when the cache updates. 
mango_accounts: HashSet, @@ -561,6 +560,18 @@ struct SharedState { /// Oldest chain event not processed yet oldest_chain_event_reception_time: Option, + + /// Liquidation candidates (locally identified as liquidatable) + liquidation_candidates_accounts: indexmap::set::IndexSet, + + /// Interesting TCS that should be triggered + interesting_tcs: indexmap::set::IndexSet<(Pubkey, u64, u64)>, + + /// Liquidation currently being processed by a worker + processing_liquidation: HashSet, + + // TCS currently being processed by a worker + processing_tcs: HashSet<(Pubkey, u64, u64)>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -584,218 +595,6 @@ impl std::fmt::Display for LiqErrorType { } } -struct LiquidationState { - mango_client: Arc, - account_fetcher: Arc, - token_swap_info: Arc, - liquidation_config: liquidate::Config, - trigger_tcs_config: trigger_tcs::Config, - - errors: ErrorTracking, - oracle_errors: ErrorTracking, -} - -impl LiquidationState { - async fn maybe_liquidate_one<'b>( - &mut self, - accounts_iter: impl Iterator, - ) -> bool { - use rand::seq::SliceRandom; - - let mut accounts = accounts_iter.collect::>(); - { - let mut rng = rand::thread_rng(); - accounts.shuffle(&mut rng); - } - - for pubkey in accounts { - if self - .maybe_liquidate_and_log_error(pubkey) - .await - .unwrap_or(false) - { - return true; - } - } - - false - } - - async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { - let now = Instant::now(); - let error_tracking = &mut self.errors; - - // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = - error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now) - { - trace!( - %pubkey, - error_entry.count, - "skip checking account for liquidation, had errors recently", - ); - return Ok(false); - } - - let result = liquidate::maybe_liquidate_account( - &self.mango_client, - &self.account_fetcher, - pubkey, - &self.liquidation_config, - ) - .await; - - if let Err(err) = result.as_ref() { - if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() { - if self - .oracle_errors - .had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now()) - .is_none() - { - warn!( - "{:?} recording oracle error for token {} {}", - chrono::offset::Utc::now(), - ti_name, - ti - ); - } - - self.oracle_errors - .record(LiqErrorType::Liq, &ti, err.to_string()); - return result; - } - - // Keep track of pubkeys that had errors - error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string()); - - // Not all errors need to be raised to the user's attention. - let mut is_error = true; - - // Simulation errors due to liqee precondition failures on the liquidation instructions - // will commonly happen if our liquidator is late or if there are chain forks. - match err.downcast_ref::() { - Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => { - if logs.iter().any(|line| { - line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt") - }) { - is_error = false; - } - } - _ => {} - }; - if is_error { - error!("liquidating account {}: {:?}", pubkey, err); - } else { - trace!("liquidating account {}: {:?}", pubkey, err); - } - } else { - error_tracking.clear(LiqErrorType::Liq, pubkey); - } - - result - } - - async fn maybe_take_token_conditional_swap( - &mut self, - accounts_iter: impl Iterator, - ) -> anyhow::Result { - let accounts = accounts_iter.collect::>(); - - let now = Instant::now(); - let now_ts: u64 = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH)? 
- .as_secs(); - - let tcs_context = trigger_tcs::Context { - mango_client: self.mango_client.clone(), - account_fetcher: self.account_fetcher.clone(), - token_swap_info: self.token_swap_info.clone(), - config: self.trigger_tcs_config.clone(), - jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), - now_ts, - }; - - // Find interesting (pubkey, tcsid, volume) - let mut interesting_tcs = Vec::with_capacity(accounts.len()); - for pubkey in accounts.iter() { - if let Some(error_entry) = - self.errors - .had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now) - { - trace!( - %pubkey, - error_entry.count, - "skip checking account for tcs, had errors recently", - ); - continue; - } - - match tcs_context.find_interesting_tcs_for_account(pubkey) { - Ok(v) => { - self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey); - if v.is_empty() { - self.errors - .clear(LiqErrorType::TcsCollectionPartial, pubkey); - self.errors.clear(LiqErrorType::TcsExecution, pubkey); - } else if v.iter().all(|it| it.is_ok()) { - self.errors - .clear(LiqErrorType::TcsCollectionPartial, pubkey); - } else { - for it in v.iter() { - if let Err(e) = it { - self.errors.record( - LiqErrorType::TcsCollectionPartial, - pubkey, - e.to_string(), - ); - } - } - } - interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok())); - } - Err(e) => { - self.errors - .record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string()); - } - } - } - if interesting_tcs.is_empty() { - return Ok(false); - } - - let (txsigs, mut changed_pubkeys) = tcs_context - .execute_tcs(&mut interesting_tcs, &mut self.errors) - .await?; - for pubkey in changed_pubkeys.iter() { - self.errors.clear(LiqErrorType::TcsExecution, pubkey); - } - if txsigs.is_empty() { - return Ok(false); - } - changed_pubkeys.push(self.mango_client.mango_account_address); - - // Force a refresh of affected accounts - let slot = self - .account_fetcher - .transaction_max_slot(&txsigs) - .await - .context("transaction_max_slot")?; - if let Err(e) = self - .account_fetcher - .refresh_accounts_via_rpc_until_slot( - &changed_pubkeys, - slot, - self.liquidation_config.refresh_timeout, - ) - .await - { - info!(slot, "could not refresh after tcs execution: {}", e); - } - - Ok(true) - } -} - fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600)); diff --git a/bin/liquidator/src/tcs_state.rs b/bin/liquidator/src/tcs_state.rs new file mode 100644 index 000000000..6434a212e --- /dev/null +++ b/bin/liquidator/src/tcs_state.rs @@ -0,0 +1,218 @@ +use crate::cli_args::Cli; +use crate::metrics::Metrics; +use crate::token_swap_info::TokenSwapInfoUpdater; +use crate::{trigger_tcs, LiqErrorType, SharedState}; +use anchor_lang::prelude::Pubkey; +use anyhow::Context; +use itertools::Itertools; +use mango_v4_client::error_tracking::ErrorTracking; +use mango_v4_client::{chain_data, MangoClient}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, info, trace}; + +pub fn spawn_tcs_job( + cli: &Cli, + shared_state: &Arc>, + tx_trigger_sender: async_channel::Sender<()>, + mut tcs: Box, + metrics: &Metrics, +) -> JoinHandle<()> { + tokio::spawn({ + let mut interval = + mango_v4_client::delay_interval(Duration::from_millis(cli.tcs_check_interval_ms)); + let mut tcs_start_time = None; + let mut metric_tcs_start_end = metrics.register_latency("tcs_start_end".into()); + let shared_state = shared_state.clone(); + + async 
move { + loop { + interval.tick().await; + + let account_addresses = { + let state = shared_state.write().unwrap(); + if !state.one_snapshot_done { + continue; + } + state.mango_accounts.iter().cloned().collect_vec() + }; + + tcs.errors.write().unwrap().update(); + + tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now())); + + let found_candidates = tcs + .find_candidates(account_addresses.iter(), |candidate| { + if shared_state + .write() + .unwrap() + .interesting_tcs + .insert(candidate) + { + tx_trigger_sender.try_send(())?; + } + + Ok(()) + }) + .await + .unwrap_or_else(|err| { + error!("error during find_candidate: {err}"); + 0 + }); + + if found_candidates > 0 { + tracing::debug!("found {} candidates for triggering", found_candidates); + } + + let current_time = Instant::now(); + metric_tcs_start_end.push(current_time - tcs_start_time.unwrap()); + tcs_start_time = None; + } + } + }) +} + +#[derive(Clone)] +pub struct TcsState { + pub mango_client: Arc, + pub account_fetcher: Arc, + pub token_swap_info: Arc, + pub trigger_tcs_config: trigger_tcs::Config, + + pub errors: Arc>>, +} + +impl TcsState { + async fn find_candidates( + &mut self, + accounts_iter: impl Iterator, + action: impl Fn((Pubkey, u64, u64)) -> anyhow::Result<()>, + ) -> anyhow::Result { + let accounts = accounts_iter.collect::>(); + + let now = Instant::now(); + let now_ts: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + let tcs_context = trigger_tcs::Context { + mango_client: self.mango_client.clone(), + account_fetcher: self.account_fetcher.clone(), + token_swap_info: self.token_swap_info.clone(), + config: self.trigger_tcs_config.clone(), + jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), + now_ts, + }; + + let mut found_counter = 0; + + // Find interesting (pubkey, tcsid, volume) + for pubkey in accounts.iter() { + if let Some(error_entry) = self.errors.read().unwrap().had_too_many_errors( + LiqErrorType::TcsCollectionHard, + pubkey, + now, + ) { + trace!( + %pubkey, + error_entry.count, + "skip checking account for tcs, had errors recently", + ); + continue; + } + + let candidates = tcs_context.find_interesting_tcs_for_account(pubkey); + let mut error_guard = self.errors.write().unwrap(); + + match candidates { + Ok(v) => { + error_guard.clear(LiqErrorType::TcsCollectionHard, pubkey); + if v.is_empty() { + error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey); + error_guard.clear(LiqErrorType::TcsExecution, pubkey); + } else if v.iter().all(|it| it.is_ok()) { + error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey); + } else { + for it in v.iter() { + if let Err(e) = it { + error_guard.record( + LiqErrorType::TcsCollectionPartial, + pubkey, + e.to_string(), + ); + } + } + } + for interesting_candidate_res in v.iter() { + if let Ok(interesting_candidate) = interesting_candidate_res { + action(*interesting_candidate).expect("failed to send TCS candidate"); + found_counter += 1; + } + } + } + Err(e) => { + error_guard.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string()); + } + } + } + + return Ok(found_counter); + } + + pub async fn maybe_take_token_conditional_swap( + &mut self, + mut interesting_tcs: Vec<(Pubkey, u64, u64)>, + ) -> anyhow::Result { + let now_ts: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? 
+ .as_secs(); + + let tcs_context = trigger_tcs::Context { + mango_client: self.mango_client.clone(), + account_fetcher: self.account_fetcher.clone(), + token_swap_info: self.token_swap_info.clone(), + config: self.trigger_tcs_config.clone(), + jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()), + now_ts, + }; + + if interesting_tcs.is_empty() { + return Ok(false); + } + + let (txsigs, mut changed_pubkeys) = tcs_context + .execute_tcs(&mut interesting_tcs, self.errors.clone()) + .await?; + for pubkey in changed_pubkeys.iter() { + self.errors + .write() + .unwrap() + .clear(LiqErrorType::TcsExecution, pubkey); + } + if txsigs.is_empty() { + return Ok(false); + } + changed_pubkeys.push(self.mango_client.mango_account_address); + + // Force a refresh of affected accounts + let slot = self + .account_fetcher + .transaction_max_slot(&txsigs) + .await + .context("transaction_max_slot")?; + if let Err(e) = self + .account_fetcher + .refresh_accounts_via_rpc_until_slot( + &changed_pubkeys, + slot, + self.trigger_tcs_config.refresh_timeout, + ) + .await + { + info!(slot, "could not refresh after tcs execution: {}", e); + } + + Ok(true) + } +} diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index d42104846..b5346d9af 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use std::{ collections::HashMap, pin::Pin, @@ -15,6 +16,7 @@ use mango_v4::{ use mango_v4_client::{chain_data, jupiter, MangoClient, TransactionBuilder}; use anyhow::Context as AnyhowContext; +use mango_v4::accounts_ix::HealthCheckKind::MaintRatio; use solana_sdk::signature::Signature; use tracing::*; use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; @@ -55,6 +57,7 @@ pub enum Mode { #[derive(Clone)] pub struct Config { + pub refresh_timeout: Duration, pub min_health_ratio: f64, pub max_trigger_quote_amount: u64, pub compute_limit_for_trigger: u32, @@ -1000,7 +1003,7 @@ impl Context { pub async fn execute_tcs( &self, tcs: &mut [(Pubkey, u64, u64)], - error_tracking: &mut ErrorTracking, + error_tracking: Arc>>, ) -> anyhow::Result<(Vec, Vec)> { use rand::distributions::{Distribution, WeightedError, WeightedIndex}; @@ -1049,7 +1052,7 @@ impl Context { } Err(e) => { trace!(%result.pubkey, "preparation error {:?}", e); - error_tracking.record( + error_tracking.write().unwrap().record( LiqErrorType::TcsExecution, &result.pubkey, e.to_string(), @@ -1093,7 +1096,7 @@ impl Context { }; // start the new one - if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking) { + if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking.clone()) { pending_volume += volume; pending.push(job); } @@ -1130,7 +1133,11 @@ impl Context { Ok(v) => Some((pubkey, v)), Err(err) => { trace!(%pubkey, "execution error {:?}", err); - error_tracking.record(LiqErrorType::TcsExecution, &pubkey, err.to_string()); + error_tracking.write().unwrap().record( + LiqErrorType::TcsExecution, + &pubkey, + err.to_string(), + ); None } }); @@ -1145,12 +1152,14 @@ impl Context { pubkey: &Pubkey, tcs_id: u64, volume: u64, - error_tracking: &ErrorTracking, + error_tracking: Arc>>, ) -> Option + Send>>> { // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = - error_tracking.had_too_many_errors(LiqErrorType::TcsExecution, pubkey, Instant::now()) - { + if let Some(error_entry) = error_tracking.read().unwrap().had_too_many_errors( + 
LiqErrorType::TcsExecution, + pubkey, + Instant::now(), + ) { trace!( "skip checking for tcs on account {pubkey}, had {} errors recently", error_entry.count @@ -1225,6 +1234,27 @@ impl Context { .instructions .append(&mut trigger_ixs.instructions); + let (_, tcs) = liqee.token_conditional_swap_by_id(pending.tcs_id)?; + let affected_tokens = allowed_tokens + .iter() + .chain(&[tcs.buy_token_index, tcs.sell_token_index]) + .copied() + .collect_vec(); + let liqor = &self.mango_client.mango_account().await?; + tx_builder.instructions.append( + &mut self + .mango_client + .health_check_instruction( + liqor, + self.config.min_health_ratio, + affected_tokens, + vec![], + MaintRatio, + ) + .await? + .instructions, + ); + let txsig = tx_builder .send_and_confirm(&self.mango_client.client) .await?; diff --git a/bin/liquidator/src/tx_sender.rs b/bin/liquidator/src/tx_sender.rs new file mode 100644 index 000000000..05027f678 --- /dev/null +++ b/bin/liquidator/src/tx_sender.rs @@ -0,0 +1,241 @@ +use crate::liquidation_state::LiquidationState; +use crate::tcs_state::TcsState; +use crate::SharedState; +use anchor_lang::prelude::Pubkey; +use async_channel::{Receiver, Sender}; +use mango_v4_client::AsyncChannelSendUnlessFull; +use std::sync::{Arc, RwLock}; +use tokio::task::JoinHandle; +use tracing::{debug, error, trace}; + +enum WorkerTask { + Liquidation(Pubkey), + Tcs(Vec<(Pubkey, u64, u64)>), + + // Given two workers: #0=LIQ_only, #1=LIQ+TCS + // If they are both busy, and the scanning jobs find a new TCS and a new LIQ candidates and enqueue them in the channel + // Then if #1 wake up first, it will consume the LIQ candidate (LIQ always have priority) + // Then when #0 wake up, it will not find any LIQ candidate, and would not do anything (it won't take a TCS) + // But if we do nothing, #1 would never wake up again (no new task in channel) + // So we use this `GiveUpTcs` that will be handled by #0 by queuing a new signal the channel and will wake up #1 again + GiveUpTcs, + + // Can happen if TCS is batched (2 TCS enqueued, 2 workers waken, but first one take both tasks) + NoWork, +} + +pub fn spawn_tx_senders_job( + max_parallel_operations: u64, + enable_liquidation: bool, + tx_liq_trigger_receiver: Receiver<()>, + tx_tcs_trigger_receiver: Receiver<()>, + tx_tcs_trigger_sender: Sender<()>, + rebalance_trigger_sender: Sender<()>, + shared_state: Arc>, + liquidation: Box, + tcs: Box, +) -> Vec> { + if max_parallel_operations < 1 { + error!("max_parallel_operations must be >= 1"); + std::process::exit(1) + } + + let reserve_one_worker_for_liquidation = max_parallel_operations > 1 && enable_liquidation; + + let workers: Vec> = (0..max_parallel_operations) + .map(|worker_id| { + tokio::spawn({ + let shared_state = shared_state.clone(); + let receiver_liq = tx_liq_trigger_receiver.clone(); + let receiver_tcs = tx_tcs_trigger_receiver.clone(); + let sender_tcs = tx_tcs_trigger_sender.clone(); + let rebalance_trigger_sender = rebalance_trigger_sender.clone(); + let liquidation = liquidation.clone(); + let tcs = tcs.clone(); + async move { + worker_loop( + shared_state, + receiver_liq, + receiver_tcs, + sender_tcs, + rebalance_trigger_sender, + liquidation, + tcs, + worker_id, + reserve_one_worker_for_liquidation && worker_id == 0, + ) + .await; + } + }) + }) + .collect(); + + workers +} + +async fn worker_loop( + shared_state: Arc>, + liq_receiver: Receiver<()>, + tcs_receiver: Receiver<()>, + tcs_sender: Sender<()>, + rebalance_trigger_sender: Sender<()>, + mut liquidation: Box, + mut tcs: Box, + id: u64, + 
only_liquidation: bool, +) { + loop { + debug!( + "Worker #{} waiting for task (only_liq={})", + id, only_liquidation + ); + + let _ = if only_liquidation { + liq_receiver.recv().await.expect("receive failed") + } else { + tokio::select!( + _ = liq_receiver.recv() => {}, + _ = tcs_receiver.recv() => {}, + ) + }; + + // a task must be available to process + // find it in global shared state, and mark it as processing + let task = worker_pull_task(&shared_state, id, only_liquidation) + .expect("Worker woke up but has nothing to do"); + + // execute the task + let need_rebalancing = match &task { + WorkerTask::Liquidation(l) => worker_execute_liquidation(&mut liquidation, *l).await, + WorkerTask::Tcs(t) => worker_execute_tcs(&mut tcs, t.clone()).await, + WorkerTask::GiveUpTcs => worker_give_up_tcs(&tcs_sender).await, + WorkerTask::NoWork => false, + }; + + if need_rebalancing { + rebalance_trigger_sender.send_unless_full(()).unwrap(); + } + + // remove from shared state + worker_finalize_task(&shared_state, id, task, need_rebalancing); + } +} + +async fn worker_give_up_tcs(sender: &Sender<()>) -> bool { + sender.send(()).await.expect("sending task failed"); + false +} + +async fn worker_execute_tcs(tcs: &mut Box, candidates: Vec<(Pubkey, u64, u64)>) -> bool { + tcs.maybe_take_token_conditional_swap(candidates) + .await + .unwrap_or(false) +} + +async fn worker_execute_liquidation( + liquidation: &mut Box, + candidate: Pubkey, +) -> bool { + liquidation + .maybe_liquidate_and_log_error(&candidate) + .await + .unwrap_or(false) +} + +fn worker_pull_task( + shared_state: &Arc>, + id: u64, + only_liquidation: bool, +) -> anyhow::Result { + let mut writer = shared_state.write().unwrap(); + + // print out list of all task for debugging + for x in &writer.liquidation_candidates_accounts { + if !writer.processing_liquidation.contains(x) { + trace!(" - LIQ {:?}", x); + } + } + + // next liq task to execute + if let Some(liq_candidate) = writer + .liquidation_candidates_accounts + .iter() + .find(|x| !writer.processing_liquidation.contains(x)) + .copied() + { + debug!("worker #{} got a liq candidate -> {}", id, liq_candidate); + writer.processing_liquidation.insert(liq_candidate); + return Ok(WorkerTask::Liquidation(liq_candidate)); + } + + let tcs_todo = writer.interesting_tcs.len() - writer.processing_tcs.len(); + + if only_liquidation { + debug!("worker #{} giving up TCS (todo count: {})", id, tcs_todo); + return Ok(WorkerTask::GiveUpTcs); + } + + for x in &writer.interesting_tcs { + if !writer.processing_tcs.contains(x) { + trace!(" - TCS {:?}", x); + } + } + + // next tcs task to execute + let max_tcs_batch_size = 20; + let tcs_candidates: Vec<(Pubkey, u64, u64)> = writer + .interesting_tcs + .iter() + .filter(|x| !writer.processing_tcs.contains(x)) + .take(max_tcs_batch_size) + .copied() + .collect(); + + for tcs_candidate in &tcs_candidates { + debug!( + "worker #{} got a tcs candidate -> {:?} (out of {})", + id, + tcs_candidate, + writer.interesting_tcs.len() + ); + writer.processing_tcs.insert(tcs_candidate.clone()); + } + + if tcs_candidates.len() > 0 { + Ok(WorkerTask::Tcs(tcs_candidates)) + } else { + debug!("worker #{} got nothing", id); + Ok(WorkerTask::NoWork) + } +} + +fn worker_finalize_task( + shared_state: &Arc>, + id: u64, + task: WorkerTask, + done: bool, +) { + let mut writer = shared_state.write().unwrap(); + match task { + WorkerTask::Liquidation(liq) => { + debug!( + "worker #{} - checked liq {:?} with success ? 
{}", + id, liq, done + ); + writer.liquidation_candidates_accounts.shift_remove(&liq); + writer.processing_liquidation.remove(&liq); + } + WorkerTask::Tcs(tcs_list) => { + for tcs in tcs_list { + debug!( + "worker #{} - checked tcs {:?} with success ? {}", + id, tcs, done + ); + writer.interesting_tcs.shift_remove(&tcs); + writer.processing_tcs.remove(&tcs); + } + } + WorkerTask::GiveUpTcs => {} + WorkerTask::NoWork => {} + } +} diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index b6e3243a8..65670a1c5 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -17,7 +17,9 @@ use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use tracing::*; -use mango_v4::accounts_ix::{Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side}; +use mango_v4::accounts_ix::{ + HealthCheckKind, Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side, +}; use mango_v4::accounts_zerocopy::KeyedAccountSharedData; use mango_v4::health::HealthCache; use mango_v4::state::{ @@ -80,6 +82,12 @@ pub struct ClientConfig { #[builder(default = "Duration::from_secs(60)")] pub timeout: Duration, + /// Jupiter Timeout, defaults to 30s + /// + /// This timeout applies to jupiter requests. + #[builder(default = "Duration::from_secs(30)")] + pub jupiter_timeout: Duration, + #[builder(default)] pub transaction_builder_config: TransactionBuilderConfig, @@ -560,6 +568,48 @@ impl MangoClient { self.send_and_confirm_owner_tx(ixs.to_instructions()).await } + /// Assert that health of account is > N + pub async fn health_check_instruction( + &self, + account: &MangoAccountValue, + min_health_value: f64, + affected_tokens: Vec, + affected_perp_markets: Vec, + check_kind: HealthCheckKind, + ) -> anyhow::Result { + let (health_check_metas, health_cu) = self + .derive_health_check_remaining_account_metas( + account, + affected_tokens, + vec![], + affected_perp_markets, + ) + .await?; + + let ixs = PreparedInstructions::from_vec( + vec![Instruction { + program_id: mango_v4::id(), + accounts: { + let mut ams = anchor_lang::ToAccountMetas::to_account_metas( + &mango_v4::accounts::HealthCheck { + group: self.group(), + account: self.mango_account_address, + }, + None, + ); + ams.extend(health_check_metas.into_iter()); + ams + }, + data: anchor_lang::InstructionData::data(&mango_v4::instruction::HealthCheck { + min_health_value, + check_kind, + }), + }], + self.instruction_cu(health_cu), + ); + Ok(ixs) + } + /// Creates token withdraw instructions for the MangoClient's account/owner. /// The `account` state is passed in separately so changes during the tx can be /// accounted for when deriving health accounts. 
@@ -2094,7 +2144,10 @@ impl MangoClient { // jupiter pub fn jupiter_v6(&self) -> jupiter::v6::JupiterV6 { - jupiter::v6::JupiterV6 { mango_client: self } + jupiter::v6::JupiterV6 { + mango_client: self, + timeout_duration: self.client.config.jupiter_timeout, + } } pub fn jupiter(&self) -> jupiter::Jupiter { diff --git a/lib/client/src/jupiter/v6.rs b/lib/client/src/jupiter/v6.rs index 6c73fc741..ff92af5a0 100644 --- a/lib/client/src/jupiter/v6.rs +++ b/lib/client/src/jupiter/v6.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::time::Duration; use anchor_lang::prelude::Pubkey; use serde::{Deserialize, Serialize}; @@ -139,6 +140,7 @@ impl TryFrom<&AccountMeta> for solana_sdk::instruction::AccountMeta { pub struct JupiterV6<'a> { pub mango_client: &'a MangoClient, + pub timeout_duration: Duration, } impl<'a> JupiterV6<'a> { @@ -204,6 +206,7 @@ impl<'a> JupiterV6<'a> { .http_client .get(format!("{}/quote", config.jupiter_v6_url)) .query(&query_args) + .timeout(self.timeout_duration) .send() .await .context("quote request to jupiter")?; @@ -290,6 +293,7 @@ impl<'a> JupiterV6<'a> { destination_token_account: None, // default to user ata quote_response: quote.clone(), }) + .timeout(self.timeout_duration) .send() .await .context("swap transaction request to jupiter")?; diff --git a/lib/client/src/util.rs b/lib/client/src/util.rs index f54d6cac9..cd562e33c 100644 --- a/lib/client/src/util.rs +++ b/lib/client/src/util.rs @@ -20,19 +20,29 @@ impl AnyhowWrap for Result { /// Push to an async_channel::Sender and ignore if the channel is full pub trait AsyncChannelSendUnlessFull { /// Send a message if the channel isn't full - fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError>; + fn send_unless_full(&self, msg: T) -> anyhow::Result<()>; } impl AsyncChannelSendUnlessFull for async_channel::Sender { - fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError> { + fn send_unless_full(&self, msg: T) -> anyhow::Result<()> { use async_channel::*; match self.try_send(msg) { Ok(()) => Ok(()), - Err(TrySendError::Closed(msg)) => Err(async_channel::SendError(msg)), + Err(TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")), Err(TrySendError::Full(_)) => Ok(()), } } } +impl AsyncChannelSendUnlessFull for tokio::sync::mpsc::Sender { + fn send_unless_full(&self, msg: T) -> anyhow::Result<()> { + use tokio::sync::mpsc::*; + match self.try_send(msg) { + Ok(()) => Ok(()), + Err(error::TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")), + Err(error::TrySendError::Full(_)) => Ok(()), + } + } +} /// Like tokio::time::interval(), but with Delay as default MissedTickBehavior /// diff --git a/programs/mango-v4/tests/cases/test_health_check.rs b/programs/mango-v4/tests/cases/test_health_check.rs index b4624218b..bb0c792a3 100644 --- a/programs/mango-v4/tests/cases/test_health_check.rs +++ b/programs/mango-v4/tests/cases/test_health_check.rs @@ -7,8 +7,6 @@ use mango_v4::accounts_ix::{HealthCheck, HealthCheckKind}; use mango_v4::error::MangoError; use solana_sdk::transport::TransportError; -// TODO FAS - #[tokio::test] async fn test_health_check() -> Result<(), TransportError> { let context = TestContext::new().await; diff --git a/ts/client/scripts/liqtest/README.md b/ts/client/scripts/liqtest/README.md index 847fcfdaf..f1889404d 100644 --- a/ts/client/scripts/liqtest/README.md +++ b/ts/client/scripts/liqtest/README.md @@ -51,6 +51,7 @@ This creates a bunch of to-be-liquidated accounts as well as a LIQOR account. 
Run the liquidator on the group with the liqor account. Since devnet doesn't have any jupiter, run with + ``` JUPITER_VERSION=mock TCS_MODE=borrow-buy