diff --git a/bin/liquidator/src/liquidate.rs b/bin/liquidator/src/liquidate.rs index b81227a07..13dd0c91f 100644 --- a/bin/liquidator/src/liquidate.rs +++ b/bin/liquidator/src/liquidate.rs @@ -381,7 +381,6 @@ impl<'a> LiquidateHelper<'a> { price, self.liqor_min_health_ratio, ) - .await } async fn token_liq(&self) -> anyhow::Result> { diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 9cf7b5925..7c1ff3119 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -532,13 +532,13 @@ struct SharedState { } #[derive(Clone)] -struct AccountErrorState { - count: u64, - last_at: std::time::Instant, +pub struct AccountErrorState { + pub count: u64, + pub last_at: std::time::Instant, } #[derive(Default)] -struct ErrorTracking { +pub struct ErrorTracking { accounts: HashMap, skip_threshold: u64, skip_duration: std::time::Duration, @@ -678,18 +678,55 @@ impl LiquidationState { &mut self, accounts_iter: impl Iterator, ) -> anyhow::Result<()> { - use rand::seq::SliceRandom; + let accounts = accounts_iter.collect::>(); - let mut accounts = accounts_iter.collect::>(); - { - let mut rng = rand::thread_rng(); - accounts.shuffle(&mut rng); + let now = std::time::Instant::now(); + let now_ts: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs() + .try_into()?; + + // Find interesting (pubkey, tcsid, volume) + let mut interesting_tcs = Vec::with_capacity(accounts.len()); + for pubkey in accounts.iter() { + if let Ok(mut v) = trigger_tcs::find_interesting_tcs_for_account( + pubkey, + &self.mango_client, + &self.account_fetcher, + &self.token_swap_info, + &self.tcs_errors, + now, + now_ts, + ) { + interesting_tcs.append(&mut v); + } + } + if interesting_tcs.is_empty() { + return Ok(()); } + // Repeatedly pick one randomly (volume-weighted) and try to execute + use rand::distributions::{Distribution, WeightedIndex}; + let weights = interesting_tcs.iter().map(|(_, _, volume)| { + (*volume) + .min(self.trigger_tcs_config.max_trigger_quote_amount) + .max(1) + }); + let mut dist = WeightedIndex::new(weights).unwrap(); let mut took_one = false; - for pubkey in accounts { + for i in 0..interesting_tcs.len() { + let (pubkey, tcs_id, _) = { + let mut rng = rand::thread_rng(); + let sample = dist.sample(&mut rng); + if i != interesting_tcs.len() - 1 { + // Would error if we updated the last weight to 0 + dist.update_weights(&[(sample, &0)])?; + } + &interesting_tcs[sample] + }; + if self - .maybe_take_conditional_swap_and_log_error(pubkey) + .maybe_take_conditional_swap_and_log_error(pubkey, *tcs_id) .await .unwrap_or(false) { @@ -697,6 +734,7 @@ impl LiquidationState { break; } } + if !took_one { return Ok(()); } @@ -710,6 +748,7 @@ impl LiquidationState { async fn maybe_take_conditional_swap_and_log_error( &mut self, pubkey: &Pubkey, + tcs_id: u64, ) -> anyhow::Result { let now = std::time::Instant::now(); let error_tracking = &mut self.tcs_errors; @@ -728,6 +767,7 @@ impl LiquidationState { &self.account_fetcher, &self.token_swap_info, pubkey, + tcs_id, &self.trigger_tcs_config, ) .await; diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index b017bb72a..2509669df 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -1,13 +1,25 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; -use mango_v4::state::{MangoAccountValue, TokenConditionalSwap}; -use mango_v4_client::{chain_data, health_cache, JupiterSwapMode, MangoClient}; +use itertools::Itertools; +use mango_v4::{ + i80f48::ClampToInt, + state::{Bank, MangoAccountValue, TokenConditionalSwap}, +}; +use mango_v4_client::{chain_data, health_cache, JupiterSwapMode, MangoClient, MangoGroupContext}; -use rand::seq::SliceRandom; use tracing::*; use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; -use crate::{token_swap_info, util}; +use crate::{token_swap_info, util, ErrorTracking}; + +// The liqee health ratio to aim for when executing tcs orders that are bigger +// than the liqee can support. +// +// The background here is that the program considers bringing the liqee health ratio +// below 1% as "the tcs was completely fulfilled" and then closes the tcs. +// Choosing a value too close to 0 is problematic, since then small oracle fluctuations +// could bring the final health below 0 and make the triggering invalid! +const TARGET_HEALTH_RATIO: f64 = 0.5; pub struct Config { pub min_health_ratio: f64, @@ -16,12 +28,15 @@ pub struct Config { pub mock_jupiter: bool, } -async fn tcs_is_in_price_range( - mango_client: &MangoClient, +fn tcs_is_in_price_range( + context: &MangoGroupContext, + account_fetcher: &chain_data::AccountFetcher, tcs: &TokenConditionalSwap, ) -> anyhow::Result { - let buy_token_price = mango_client.bank_oracle_price(tcs.buy_token_index).await?; - let sell_token_price = mango_client.bank_oracle_price(tcs.sell_token_index).await?; + let buy_bank = context.mint_info(tcs.buy_token_index).first_bank(); + let sell_bank = context.mint_info(tcs.sell_token_index).first_bank(); + let buy_token_price = account_fetcher.fetch_bank_price(&buy_bank)?; + let sell_token_price = account_fetcher.fetch_bank_price(&sell_bank)?; let base_price = (buy_token_price / sell_token_price).to_num(); if !tcs.price_in_range(base_price) { return Ok(false); @@ -57,15 +72,16 @@ fn tcs_has_plausible_premium( Ok(cost <= premium) } -async fn tcs_is_interesting( - mango_client: &MangoClient, +fn tcs_is_interesting( + context: &MangoGroupContext, + account_fetcher: &chain_data::AccountFetcher, tcs: &TokenConditionalSwap, token_swap_info: &token_swap_info::TokenSwapInfoUpdater, now_ts: u64, ) -> anyhow::Result { - Ok(!tcs.is_expired(now_ts) - && tcs_is_in_price_range(mango_client, tcs).await? - && tcs_has_plausible_premium(tcs, token_swap_info)?) + Ok(tcs.is_expired(now_ts) + || (tcs_is_in_price_range(context, account_fetcher, tcs)? + && tcs_has_plausible_premium(tcs, token_swap_info)?)) } #[allow(clippy::too_many_arguments)] @@ -89,7 +105,15 @@ async fn maybe_execute_token_conditional_swap_inner( // get a fresh account and re-check the tcs and health let liqee = account_fetcher.fetch_fresh_mango_account(pubkey).await?; let (_, tcs) = liqee.token_conditional_swap_by_id(tcs_id)?; - if !tcs_is_interesting(mango_client, tcs, token_swap_info, now_ts).await? { + if tcs.is_expired(now_ts) + || !tcs_is_interesting( + &mango_client.context, + account_fetcher, + tcs, + token_swap_info, + now_ts, + )? + { return Ok(false); } @@ -116,8 +140,16 @@ async fn execute_token_conditional_swap( let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio); // Compute the max viable swap (for liqor and liqee) and min it - let buy_token_price = mango_client.bank_oracle_price(tcs.buy_token_index).await?; - let sell_token_price = mango_client.bank_oracle_price(tcs.sell_token_index).await?; + let buy_bank = mango_client + .context + .mint_info(tcs.buy_token_index) + .first_bank(); + let sell_bank = mango_client + .context + .mint_info(tcs.sell_token_index) + .first_bank(); + let buy_token_price = account_fetcher.fetch_bank_price(&buy_bank)?; + let sell_token_price = account_fetcher.fetch_bank_price(&sell_bank)?; let base_price = buy_token_price / sell_token_price; let premium_price = tcs.premium_price(base_price.to_num()); @@ -126,11 +158,7 @@ async fn execute_token_conditional_swap( let max_take_quote = I80F48::from(config.max_trigger_quote_amount); - // The background here is that the program considers bringing the liqee health ratio - // below 1% as "the tcs was completely fulfilled" and then closes the tcs. - // Choosing a value too close to 0 is problematic, since then small oracle fluctuations - // could bring the final health below 0 and make the triggering invalid! - let liqee_target_health_ratio = I80F48::from_num(0.5); + let liqee_target_health_ratio = I80F48::from_num(TARGET_HEALTH_RATIO); let max_sell_token_to_liqor = util::max_swap_source( mango_client, @@ -140,8 +168,7 @@ async fn execute_token_conditional_swap( tcs.buy_token_index, I80F48::ONE / maker_price, liqee_target_health_ratio, - ) - .await? + )? .min(max_take_quote / sell_token_price) .floor() .to_num::() @@ -155,8 +182,7 @@ async fn execute_token_conditional_swap( tcs.sell_token_index, taker_price, liqor_min_health_ratio, - ) - .await? + )? .min(max_take_quote / buy_token_price) .floor() .to_num::() @@ -265,6 +291,7 @@ pub async fn maybe_execute_token_conditional_swap( account_fetcher: &chain_data::AccountFetcher, token_swap_info: &token_swap_info::TokenSwapInfoUpdater, pubkey: &Pubkey, + tcs_id: u64, config: &Config, ) -> anyhow::Result { let now_ts: u64 = std::time::SystemTime::now() @@ -272,35 +299,111 @@ pub async fn maybe_execute_token_conditional_swap( .as_secs() .try_into()?; let liqee = account_fetcher.fetch_mango_account(pubkey)?; + let tcs = liqee.token_conditional_swap_by_id(tcs_id)?.1; - // Find an interesting triggerable conditional swap - let mut tcs_shuffled = liqee.active_token_conditional_swaps().collect::>(); - { - let mut rng = rand::thread_rng(); - tcs_shuffled.shuffle(&mut rng); + if tcs.is_expired(now_ts) { + remove_expired_token_conditional_swap(mango_client, pubkey, &liqee, tcs.id).await + } else { + maybe_execute_token_conditional_swap_inner( + mango_client, + account_fetcher, + token_swap_info, + pubkey, + &liqee, + tcs.id, + config, + now_ts, + ) + .await } - - for tcs in tcs_shuffled.iter() { - if tcs_is_interesting(mango_client, tcs, token_swap_info, now_ts).await? { - return maybe_execute_token_conditional_swap_inner( - mango_client, - account_fetcher, - token_swap_info, - pubkey, - &liqee, - tcs.id, - config, - now_ts, - ) - .await; - } - } - for tcs in tcs_shuffled { - if tcs.is_expired(now_ts) { - return remove_expired_token_conditional_swap(mango_client, pubkey, &liqee, tcs.id) - .await; - } - } - - Ok(false) +} + +/// Returns the maximum execution size of a tcs order in quote units +fn tcs_max_volume( + account: &MangoAccountValue, + mango_client: &MangoClient, + account_fetcher: &chain_data::AccountFetcher, + tcs: &TokenConditionalSwap, +) -> anyhow::Result { + // Compute the max viable swap (for liqor and liqee) and min it + let buy_bank_pk = mango_client + .context + .mint_info(tcs.buy_token_index) + .first_bank(); + let sell_bank_pk = mango_client + .context + .mint_info(tcs.sell_token_index) + .first_bank(); + let buy_bank: Bank = account_fetcher.fetch(&buy_bank_pk)?; + let sell_bank: Bank = account_fetcher.fetch(&sell_bank_pk)?; + let buy_token_price = account_fetcher.fetch_bank_price(&buy_bank_pk)?; + let sell_token_price = account_fetcher.fetch_bank_price(&sell_bank_pk)?; + + let buy_position = account + .token_position(tcs.buy_token_index) + .map(|p| p.native(&buy_bank)) + .unwrap_or(I80F48::ZERO); + let sell_position = account + .token_position(tcs.sell_token_index) + .map(|p| p.native(&sell_bank)) + .unwrap_or(I80F48::ZERO); + + let base_price = buy_token_price / sell_token_price; + let premium_price = tcs.premium_price(base_price.to_num()); + let maker_price = tcs.maker_price(premium_price); + + let liqee_target_health_ratio = I80F48::from_num(TARGET_HEALTH_RATIO); + + let max_sell = util::max_swap_source( + mango_client, + account_fetcher, + &account, + tcs.sell_token_index, + tcs.buy_token_index, + I80F48::from_num(1.0 / maker_price), + liqee_target_health_ratio, + )? + .floor() + .to_num::() + .min(tcs.max_sell_for_position(sell_position, &sell_bank)); + + let max_buy = tcs.max_buy_for_position(buy_position, &buy_bank); + + let max_quote = + (I80F48::from(max_buy) * buy_token_price).min(I80F48::from(max_sell) * sell_token_price); + + Ok(max_quote.floor().clamp_to_u64()) +} + +pub fn find_interesting_tcs_for_account( + pubkey: &Pubkey, + mango_client: &MangoClient, + account_fetcher: &chain_data::AccountFetcher, + token_swap_info: &token_swap_info::TokenSwapInfoUpdater, + error_tracking: &ErrorTracking, + now: Instant, + now_ts: u64, +) -> anyhow::Result> { + if error_tracking.had_too_many_errors(pubkey, now).is_some() { + anyhow::bail!("too many errors"); + } + let liqee = account_fetcher.fetch_mango_account(pubkey)?; + + let interesting_tcs = liqee.active_token_conditional_swaps().filter_map(|tcs| { + match tcs_is_interesting( + &mango_client.context, + account_fetcher, + tcs, + token_swap_info, + now_ts, + ) { + Ok(false) | Err(_) => None, + Ok(true) => { + let volume = + tcs_max_volume(&liqee, mango_client, account_fetcher, tcs).unwrap_or(1); + Some((*pubkey, tcs.id, volume)) + } + } + }); + Ok(interesting_tcs.collect_vec()) } diff --git a/bin/liquidator/src/util.rs b/bin/liquidator/src/util.rs index c1d611298..ba846a933 100644 --- a/bin/liquidator/src/util.rs +++ b/bin/liquidator/src/util.rs @@ -105,7 +105,7 @@ pub async fn jupiter_route( } /// Convenience wrapper for getting max swap amounts for a token pair -pub async fn max_swap_source( +pub fn max_swap_source( client: &MangoClient, account_fetcher: &chain_data::AccountFetcher, account: &MangoAccountValue, @@ -122,12 +122,13 @@ pub async fn max_swap_source( account.ensure_token_position(target)?; let health_cache = - mango_v4_client::health_cache::new(&client.context, account_fetcher, &account) - .await + mango_v4_client::health_cache::new_sync(&client.context, account_fetcher, &account) .expect("always ok"); - let source_bank = client.first_bank(source).await?; - let target_bank = client.first_bank(target).await?; + let source_bank: Bank = + account_fetcher.fetch(&client.context.mint_info(source).first_bank())?; + let target_bank: Bank = + account_fetcher.fetch(&client.context.mint_info(target).first_bank())?; let source_price = health_cache.token_info(source).unwrap().prices.oracle; diff --git a/lib/client/src/chain_data_fetcher.rs b/lib/client/src/chain_data_fetcher.rs index a0cc210c1..9e38bb096 100644 --- a/lib/client/src/chain_data_fetcher.rs +++ b/lib/client/src/chain_data_fetcher.rs @@ -6,8 +6,9 @@ use crate::chain_data::*; use anchor_lang::Discriminator; -use mango_v4::accounts_zerocopy::LoadZeroCopy; -use mango_v4::state::{MangoAccount, MangoAccountValue}; +use fixed::types::I80F48; +use mango_v4::accounts_zerocopy::{KeyedAccountSharedData, LoadZeroCopy}; +use mango_v4::state::{Bank, MangoAccount, MangoAccountValue}; use anyhow::Context; @@ -17,6 +18,13 @@ use solana_sdk::clock::Slot; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; +/// A complex account fetcher that mostly depends on an external job keeping +/// the chain_data up to date. +/// +/// In addition to the usual async fetching interface, it also has synchronous +/// functions to access some kinds of data with less overhead. +/// +/// Also, there's functions for fetching up to date data via rpc. pub struct AccountFetcher { pub chain_data: Arc>, pub rpc: RpcClientAsync, @@ -54,6 +62,13 @@ impl AccountFetcher { .with_context(|| format!("loading mango account {}", address)) } + pub fn fetch_bank_price(&self, bank: &Pubkey) -> anyhow::Result { + let bank: Bank = self.fetch(bank)?; + let oracle = self.fetch_raw(&bank.oracle)?; + let price = bank.oracle_price(&KeyedAccountSharedData::new(bank.oracle, oracle), None)?; + Ok(price) + } + // fetches via RPC, stores in ChainData, returns new version pub async fn fetch_fresh( &self, diff --git a/lib/client/src/health_cache.rs b/lib/client/src/health_cache.rs index 0e39ce9c7..3b8ebacd6 100644 --- a/lib/client/src/health_cache.rs +++ b/lib/client/src/health_cache.rs @@ -35,3 +35,34 @@ pub async fn new( }; mango_v4::health::new_health_cache(&account.borrow(), &retriever).context("make health cache") } + +pub fn new_sync( + context: &MangoGroupContext, + account_fetcher: &crate::chain_data::AccountFetcher, + account: &MangoAccountValue, +) -> anyhow::Result { + let active_token_len = account.active_token_positions().count(); + let active_perp_len = account.active_perp_positions().count(); + + let metas = + context.derive_health_check_remaining_account_metas(account, vec![], vec![], vec![])?; + let accounts = metas + .iter() + .map(|meta| { + Ok(KeyedAccountSharedData::new( + meta.pubkey, + account_fetcher.fetch_raw(&meta.pubkey)?, + )) + }) + .collect::>>()?; + + let retriever = FixedOrderAccountRetriever { + ais: accounts, + n_banks: active_token_len, + n_perps: active_perp_len, + begin_perp: active_token_len * 2, + begin_serum3: active_token_len * 2 + active_perp_len * 2, + staleness_slot: None, + }; + mango_v4::health::new_health_cache(&account.borrow(), &retriever).context("make health cache") +} diff --git a/programs/mango-v4/src/instructions/token_conditional_swap_trigger.rs b/programs/mango-v4/src/instructions/token_conditional_swap_trigger.rs index 2615bf691..a09576257 100644 --- a/programs/mango-v4/src/instructions/token_conditional_swap_trigger.rs +++ b/programs/mango-v4/src/instructions/token_conditional_swap_trigger.rs @@ -123,15 +123,7 @@ fn trade_amount( sell_bank: &Bank, ) -> (u64, u64) { let max_buy = max_buy_token_to_liqee - .min(tcs.remaining_buy()) - .min( - if tcs.allow_creating_deposits() && !buy_bank.are_deposits_reduce_only() { - u64::MAX - } else { - // ceil() because we're ok reaching 0..1 deposited native tokens - (-liqee_buy_balance).ceil().clamp_to_u64() - }, - ) + .min(tcs.max_buy_for_position(liqee_buy_balance, buy_bank)) .min(if buy_bank.are_borrows_reduce_only() { // floor() so we never go below 0 liqor_buy_balance.floor().clamp_to_u64() @@ -139,15 +131,7 @@ fn trade_amount( u64::MAX }); let max_sell = max_sell_token_to_liqor - .min(tcs.remaining_sell()) - .min( - if tcs.allow_creating_borrows() && !sell_bank.are_borrows_reduce_only() { - u64::MAX - } else { - // floor() so we never go below 0 - liqee_sell_balance.floor().clamp_to_u64() - }, - ) + .min(tcs.max_sell_for_position(liqee_sell_balance, sell_bank)) .min(if sell_bank.are_deposits_reduce_only() { // ceil() because we're ok reaching 0..1 deposited native tokens (-liqor_sell_balance).ceil().clamp_to_u64() diff --git a/programs/mango-v4/src/state/token_conditional_swap.rs b/programs/mango-v4/src/state/token_conditional_swap.rs index 831e92ac4..a2822a7c3 100644 --- a/programs/mango-v4/src/state/token_conditional_swap.rs +++ b/programs/mango-v4/src/state/token_conditional_swap.rs @@ -6,6 +6,7 @@ use num_enum::{IntoPrimitive, TryFromPrimitive}; use static_assertions::const_assert_eq; use std::mem::size_of; +use crate::i80f48::ClampToInt; use crate::state::*; #[derive( @@ -199,4 +200,34 @@ impl TokenConditionalSwap { pub fn price_in_range(&self, price: f64) -> bool { price >= self.price_lower_limit && price <= self.price_upper_limit } + + /// The remaining buy amount, taking the current buy token position and + /// buy bank's reduce-only status into account. + /// + /// Note that the account health might further restrict execution. + pub fn max_buy_for_position(&self, buy_position: I80F48, buy_bank: &Bank) -> u64 { + self.remaining_buy().min( + if self.allow_creating_deposits() && !buy_bank.are_deposits_reduce_only() { + u64::MAX + } else { + // ceil() because we're ok reaching 0..1 deposited native tokens + (-buy_position).ceil().clamp_to_u64() + }, + ) + } + + /// The remaining sell amount, taking the current sell token position and + /// sell bank's reduce-only status into account. + /// + /// Note that the account health might further restrict execution. + pub fn max_sell_for_position(&self, sell_position: I80F48, sell_bank: &Bank) -> u64 { + self.remaining_sell().min( + if self.allow_creating_borrows() && !sell_bank.are_borrows_reduce_only() { + u64::MAX + } else { + // floor() so we never go below 0 + sell_position.floor().clamp_to_u64() + }, + ) + } }