liquidator: improve jupiter quote cache (#748)

This commit is contained in:
Christian Kamm 2023-10-09 19:56:45 +02:00 committed by GitHub
parent 81f4929648
commit a4745dae27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 88 deletions

View File

@ -81,16 +81,36 @@ pub enum JupiterQuoteCacheResult<T> {
BadPrice(f64), BadPrice(f64),
} }
#[derive(Clone)]
pub struct JupiterQuoteCacheEntry {
// The lowest seen price (starting at f64::MAX) in input-per-output tokens
//
// A separate mutex is necessary since we want to wait for the first jupiter quote for
// each pair to return before initiating any further ones. Later on there can be multiple
// jupiter quotes for a pair at the same time if the initial price check passes.
price: Arc<tokio::sync::Mutex<f64>>,
}
// While preparing tcs executions, there may be a lot of jupiter queries for the same // While preparing tcs executions, there may be a lot of jupiter queries for the same
// pairs. This caches results to avoid hitting jupiter with too many requests by allowing // pairs. This caches results to avoid hitting jupiter with too many requests by allowing
// a cheap-early out. // a cheap-early out.
#[derive(Default)] #[derive(Default)]
pub struct JupiterQuoteCache { pub struct JupiterQuoteCache {
// cache lowest price for a in-out mint pair, in input-per-output tokens // cache lowest price for each in-out mint pair
pub quote_cache: RwLock<HashMap<(Pubkey, Pubkey), f64>>, pub quote_cache: RwLock<HashMap<(Pubkey, Pubkey), JupiterQuoteCacheEntry>>,
} }
impl JupiterQuoteCache { impl JupiterQuoteCache {
fn cache_entry(&self, input_mint: Pubkey, output_mint: Pubkey) -> JupiterQuoteCacheEntry {
let mut quote_cache = self.quote_cache.write().unwrap();
quote_cache
.entry((input_mint, output_mint))
.or_insert_with(|| JupiterQuoteCacheEntry {
price: Arc::new(tokio::sync::Mutex::new(f64::MAX)),
})
.clone()
}
/// Quotes. Returns BadPrice if the cache or quote returns a price above max_in_per_out_price. /// Quotes. Returns BadPrice if the cache or quote returns a price above max_in_per_out_price.
pub async fn quote( pub async fn quote(
&self, &self,
@ -102,18 +122,29 @@ impl JupiterQuoteCache {
version: jupiter::Version, version: jupiter::Version,
max_in_per_out_price: f64, max_in_per_out_price: f64,
) -> anyhow::Result<JupiterQuoteCacheResult<(f64, jupiter::Quote)>> { ) -> anyhow::Result<JupiterQuoteCacheResult<(f64, jupiter::Quote)>> {
let cache_key = (input_mint, output_mint); let cache_entry = self.cache_entry(input_mint, output_mint);
{
let quote_cache = self.quote_cache.read().unwrap(); let held_lock = {
if let Some(&cached_price) = quote_cache.get(&cache_key) { let cached_price_lock = cache_entry.price.lock().await;
if cached_price > max_in_per_out_price {
return Ok(JupiterQuoteCacheResult::BadPrice(cached_price)); if *cached_price_lock == f64::MAX {
// If we're the first quote for this pair, run the quote while holding the lock:
// we don't want multiple parallel requests to go out that will all potentially
// tell us about the same poor price.
Some(cached_price_lock)
} else {
// If a cached price exists, check it against the max
if *cached_price_lock > max_in_per_out_price {
return Ok(JupiterQuoteCacheResult::BadPrice(*cached_price_lock));
} }
// Don't hold the lock, parallel requests are ok!
None
} }
} };
let (price, quote) = self let (price, quote) = self
.unchecked_quote( .quote_inner(
client, client,
input_mint, input_mint,
output_mint, output_mint,
@ -122,14 +153,26 @@ impl JupiterQuoteCache {
version, version,
) )
.await?; .await?;
if price > max_in_per_out_price {
return Ok(JupiterQuoteCacheResult::BadPrice(price)); {
let mut cached_price_lock = if let Some(lock) = held_lock {
lock
} else {
cache_entry.price.lock().await
};
if price < *cached_price_lock {
*cached_price_lock = price;
}
} }
Ok(JupiterQuoteCacheResult::Quote((price, quote))) Ok(if price > max_in_per_out_price {
JupiterQuoteCacheResult::BadPrice(price)
} else {
JupiterQuoteCacheResult::Quote((price, quote))
})
} }
async fn unchecked_quote( async fn quote_inner(
&self, &self,
client: &MangoClient, client: &MangoClient,
input_mint: Pubkey, input_mint: Pubkey,
@ -150,24 +193,47 @@ impl JupiterQuoteCache {
) )
.await?; .await?;
let quote_price = quote.in_amount as f64 / quote.out_amount as f64; let quote_price = quote.in_amount as f64 / quote.out_amount as f64;
Ok((quote_price, quote))
let cache_key = (input_mint, output_mint);
let mut quote_cache = self.quote_cache.write().unwrap();
let cached_price = quote_cache.get(&cache_key).copied().unwrap_or(f64::MAX);
if quote_price < cached_price {
quote_cache.insert(cache_key, quote_price);
}
return Ok((quote_price, quote));
} }
fn cached_price(&self, input_mint: Pubkey, output_mint: Pubkey) -> Option<f64> { async fn unchecked_quote(
&self,
client: &MangoClient,
input_mint: Pubkey,
output_mint: Pubkey,
input_amount: u64,
slippage_bps: u64,
version: jupiter::Version,
) -> anyhow::Result<(f64, jupiter::Quote)> {
match self
.quote(
client,
input_mint,
output_mint,
input_amount,
slippage_bps,
version,
f64::MAX,
)
.await?
{
JupiterQuoteCacheResult::Quote(v) => Ok(v),
_ => anyhow::bail!("unreachable case in unchecked_quote"),
}
}
async fn cached_price(&self, input_mint: Pubkey, output_mint: Pubkey) -> Option<f64> {
if input_mint == output_mint { if input_mint == output_mint {
return Some(1.0); return Some(1.0);
} }
let quote_cache = self.quote_cache.read().unwrap(); let cache_entry = self.cache_entry(input_mint, output_mint);
quote_cache.get(&(input_mint, output_mint)).copied() let cached_price = *cache_entry.price.lock().await;
if cached_price != f64::MAX {
Some(cached_price)
} else {
None
}
} }
/// Quotes collateral -> buy and sell -> collateral swaps /// Quotes collateral -> buy and sell -> collateral swaps
@ -191,8 +257,8 @@ impl JupiterQuoteCache {
> { > {
// First check if we have cached prices for both legs and // First check if we have cached prices for both legs and
// if those break the specified limit // if those break the specified limit
let cached_collateral_to_buy = self.cached_price(collateral_mint, buy_mint); let cached_collateral_to_buy = self.cached_price(collateral_mint, buy_mint).await;
let cached_sell_to_collateral = self.cached_price(sell_mint, collateral_mint); let cached_sell_to_collateral = self.cached_price(sell_mint, collateral_mint).await;
match (cached_collateral_to_buy, cached_sell_to_collateral) { match (cached_collateral_to_buy, cached_sell_to_collateral) {
(Some(c_to_b), Some(s_to_c)) => { (Some(c_to_b), Some(s_to_c)) => {
let s_to_b = s_to_c * c_to_b; let s_to_b = s_to_c * c_to_b;
@ -467,16 +533,6 @@ impl Context {
let max_buy = let max_buy =
max_buy_ignoring_net_borrows - buy_borrows + buy_borrows.min(available_buy_borrows); max_buy_ignoring_net_borrows - buy_borrows + buy_borrows.min(available_buy_borrows);
info!(
buy_borrows,
available_buy_borrows,
max_buy,
sell_borrows,
available_sell_borrows,
max_sell,
"borrows?"
);
let tiny_due_to_net_borrows = { let tiny_due_to_net_borrows = {
let buy_threshold = I80F48::from(NET_BORROW_EXECUTION_THRESHOLD) / buy_token_price; let buy_threshold = I80F48::from(NET_BORROW_EXECUTION_THRESHOLD) / buy_token_price;
let sell_threshold = I80F48::from(NET_BORROW_EXECUTION_THRESHOLD) / sell_token_price; let sell_threshold = I80F48::from(NET_BORROW_EXECUTION_THRESHOLD) / sell_token_price;
@ -496,24 +552,31 @@ impl Context {
) -> anyhow::Result<Vec<anyhow::Result<(Pubkey, u64, u64)>>> { ) -> anyhow::Result<Vec<anyhow::Result<(Pubkey, u64, u64)>>> {
let liqee = self.account_fetcher.fetch_mango_account(pubkey)?; let liqee = self.account_fetcher.fetch_mango_account(pubkey)?;
let interesting_tcs = liqee.active_token_conditional_swaps().filter_map(|tcs| { let interesting_tcs = liqee
match self.tcs_is_interesting(tcs) { .active_token_conditional_swaps()
Ok(true) => { .filter_map(|tcs| {
// Filter out Ok(None) resuts of tcs that shouldn't be executed right now match self.tcs_is_interesting(tcs) {
match self.tcs_max_volume(&liqee, tcs) { Ok(true) => {
Ok(Some(v)) => Some(Ok((*pubkey, tcs.id, v))), // Filter out Ok(None) resuts of tcs that shouldn't be executed right now
Ok(None) => None, match self.tcs_max_volume(&liqee, tcs) {
Err(e) => Some(Err(e)), Ok(Some(v)) => Some(Ok((*pubkey, tcs.id, v))),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
} }
Ok(false) => None,
Err(e) => Some(Err(e)),
} }
Ok(false) => None, })
Err(e) => Some(Err(e)), .collect_vec();
} if !interesting_tcs.is_empty() {
}); trace!(%pubkey, interesting_tcs_count=interesting_tcs.len(), "found interesting tcs");
Ok(interesting_tcs.collect_vec()) }
Ok(interesting_tcs)
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(%pubkey, %tcs_id))]
async fn prepare_token_conditional_swap( async fn prepare_token_conditional_swap(
&self, &self,
pubkey: &Pubkey, pubkey: &Pubkey,
@ -527,6 +590,7 @@ impl Context {
let tcs = liqee.token_conditional_swap_by_id(tcs_id)?.1; let tcs = liqee.token_conditional_swap_by_id(tcs_id)?.1;
if tcs.is_expired(now_ts) { if tcs.is_expired(now_ts) {
trace!("tcs is expired");
// Triggering like this will close the expired tcs and not affect the liqor // Triggering like this will close the expired tcs and not affect the liqor
Ok(Some(PreparedExecution { Ok(Some(PreparedExecution {
pubkey: *pubkey, pubkey: *pubkey,
@ -557,6 +621,7 @@ impl Context {
.await .await
.context("creating health cache 1")?; .context("creating health cache 1")?;
if health_cache.is_liquidatable() { if health_cache.is_liquidatable() {
trace!("account is liquidatable (pre-fetch)");
return Ok(None); return Ok(None);
} }
@ -567,6 +632,7 @@ impl Context {
.await?; .await?;
let (_, tcs) = liqee.token_conditional_swap_by_id(tcs_id)?; let (_, tcs) = liqee.token_conditional_swap_by_id(tcs_id)?;
if tcs.is_expired(self.now_ts) || !self.tcs_is_interesting(tcs)? { if tcs.is_expired(self.now_ts) || !self.tcs_is_interesting(tcs)? {
trace!("tcs is expired or uninteresting");
return Ok(None); return Ok(None);
} }
@ -574,6 +640,7 @@ impl Context {
.await .await
.context("creating health cache 2")?; .context("creating health cache 2")?;
if health_cache.is_liquidatable() { if health_cache.is_liquidatable() {
trace!("account is liquidatable (post-fetch)");
return Ok(None); return Ok(None);
} }
@ -582,7 +649,6 @@ impl Context {
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(%pubkey, tcs_id = tcs.id))]
async fn prepare_token_conditional_swap_inner2( async fn prepare_token_conditional_swap_inner2(
&self, &self,
pubkey: &Pubkey, pubkey: &Pubkey,
@ -605,7 +671,10 @@ impl Context {
let (liqee_max_buy, liqee_max_sell) = match self.tcs_max_liqee_execution(liqee, tcs)? { let (liqee_max_buy, liqee_max_sell) = match self.tcs_max_liqee_execution(liqee, tcs)? {
Some(v) => v, Some(v) => v,
None => return Ok(None), None => {
trace!("no liqee execution possible");
return Ok(None);
}
}; };
let max_sell_token_to_liqor = liqee_max_sell; let max_sell_token_to_liqor = liqee_max_sell;
@ -703,6 +772,13 @@ impl Context {
.min(liqee_max_buy); .min(liqee_max_buy);
if max_sell_token_to_liqor == 0 || max_buy_token_to_liqee == 0 { if max_sell_token_to_liqor == 0 || max_buy_token_to_liqee == 0 {
trace!(
liqee_max_buy,
liqee_max_sell,
max_buy = max_buy_token_to_liqee,
max_sell = max_sell_token_to_liqor,
"no execution possible"
);
return Ok(None); return Ok(None);
} }
@ -886,6 +962,7 @@ impl Context {
// maybe the tcs isn't executable after the account was updated // maybe the tcs isn't executable after the account was updated
} }
Err(e) => { Err(e) => {
trace!(%result.pubkey, "preparation error {:?}", e);
error_tracking.record_error(&result.pubkey, now, e.to_string()); error_tracking.record_error(&result.pubkey, now, e.to_string());
} }
} }
@ -963,6 +1040,7 @@ impl Context {
.filter_map(|(pubkey, result)| match result { .filter_map(|(pubkey, result)| match result {
Ok(v) => Some((pubkey, v)), Ok(v) => Some((pubkey, v)),
Err(err) => { Err(err) => {
trace!(%pubkey, "execution error {:?}", err);
error_tracking.record_error(&pubkey, Instant::now(), err.to_string()); error_tracking.record_error(&pubkey, Instant::now(), err.to_string());
None None
} }

View File

@ -407,55 +407,57 @@ async function placeAuction(): Promise<void> {
await client.tokenConditionalSwapCancelAll(group, mangoAccount!); await client.tokenConditionalSwapCancelAll(group, mangoAccount!);
// await client.tokenConditionalSwapCreateLinearAuction( for (let i = 0; i < 3; i += 1) {
// group, await client.tokenConditionalSwapCreateLinearAuction(
// mangoAccount!, group,
// usdcBank, mangoAccount!,
// solBank, usdcBank,
// 18.0, solBank,
// 22.0, 20.0,
// 2.0, 24.0,
// Number.MAX_SAFE_INTEGER, 0.1,
// true, Number.MAX_SAFE_INTEGER,
// false, true,
// true, false,
// Math.floor(Date.now() / 1000) + 30, true,
// 180, Math.floor(Date.now() / 1000) + 10,
// null, 180000,
// ); null,
);
}
// await client.tokenConditionalSwapCreateLinearAuction( // await client.tokenConditionalSwapCreateLinearAuction(
// group, // group,
// mangoAccount!, // mangoAccount!,
// solBank, // solBank,
// usdcBank, // usdcBank,
// 1 / 20.0, // 1 / 24.0,
// 1 / 19.0, // 1 / 19.0,
// 2.0, // 2.0,
// Number.MAX_SAFE_INTEGER, // Number.MAX_SAFE_INTEGER,
// true, // true,
// false, // false,
// true, // true,
// Math.floor(Date.now() / 1000) + 30, // Math.floor(Date.now() / 1000) + 300,
// 180, // 180,
// null, // null,
// ); // );
await client.tokenConditionalSwapCreatePremiumAuction( // await client.tokenConditionalSwapCreatePremiumAuction(
group, // group,
mangoAccount!, // mangoAccount!,
usdcBank, // usdcBank,
solBank, // solBank,
22.0, // 20.0,
26.0, // 26.0,
2.0, // 2.0,
Number.MAX_SAFE_INTEGER, // Number.MAX_SAFE_INTEGER,
null, // null,
1, // in percent, eww // 1, // in percent, eww
true, // true,
false, // false,
null, // null,
true, // true,
300, // 300,
); // );
} }
async function main(): Promise<void> { async function main(): Promise<void> {