diff --git a/client/src/chain_data_fetcher.rs b/client/src/chain_data_fetcher.rs index 68ccec7ec..1def80c58 100644 --- a/client/src/chain_data_fetcher.rs +++ b/client/src/chain_data_fetcher.rs @@ -1,4 +1,6 @@ use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::{Duration, Instant}; use crate::chain_data::*; @@ -11,7 +13,9 @@ use anyhow::Context; use solana_client::rpc_client::RpcClient; use solana_sdk::account::{AccountSharedData, ReadableAccount}; +use solana_sdk::clock::Slot; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; pub struct AccountFetcher { pub chain_data: Arc>, @@ -66,11 +70,12 @@ impl AccountFetcher { .clone()) } - pub fn refresh_account_via_rpc(&self, address: &Pubkey) -> anyhow::Result<()> { + pub fn refresh_account_via_rpc(&self, address: &Pubkey) -> anyhow::Result { let response = self .rpc .get_account_with_commitment(&address, self.rpc.commitment()) .with_context(|| format!("refresh account {} via rpc", address))?; + let slot = response.context.slot; let account = response .value .ok_or(anchor_client::ClientError::AccountNotFound) @@ -85,6 +90,43 @@ impl AccountFetcher { }, ); + Ok(slot) + } + + /// Return the maximum slot reported for the processing of the signatures + pub fn transaction_max_slot(&self, signatures: &[Signature]) -> anyhow::Result { + let statuses = self.rpc.get_signature_statuses(signatures)?.value; + Ok(statuses + .iter() + .map(|status_opt| status_opt.as_ref().map(|status| status.slot).unwrap_or(0)) + .max() + .unwrap_or(0)) + } + + /// Return success once all addresses have data >= min_slot + pub fn refresh_accounts_via_rpc_until_slot( + &self, + addresses: &[Pubkey], + min_slot: Slot, + timeout: Duration, + ) -> anyhow::Result<()> { + let start = Instant::now(); + for address in addresses { + loop { + if start.elapsed() > timeout { + anyhow::bail!( + "timeout while waiting for data for {} that's newer than slot {}", + address, + min_slot + ); + } + let data_slot = self.refresh_account_via_rpc(address)?; + if data_slot >= min_slot { + break; + } + thread::sleep(Duration::from_millis(500)); + } + } Ok(()) } } diff --git a/client/src/util.rs b/client/src/util.rs index 86a3d69c7..728d5eb0e 100644 --- a/client/src/util.rs +++ b/client/src/util.rs @@ -1,4 +1,15 @@ -use solana_sdk::signature::Keypair; +use solana_client::{ + client_error::Result as ClientResult, rpc_client::RpcClient, rpc_request::RpcError, +}; +use solana_sdk::transaction::Transaction; +use solana_sdk::{ + clock::Slot, + commitment_config::CommitmentConfig, + signature::{Keypair, Signature}, + transaction::uses_durable_nonce, +}; + +use std::{thread, time}; // #[allow(dead_code)] // pub fn retry(request: impl Fn() -> Result) -> anyhow::Result { @@ -24,3 +35,64 @@ impl MyClone for Keypair { Self::from_bytes(&self.to_bytes()).unwrap() } } + +/// A copy of RpcClient::send_and_confirm_transaction that returns the slot the +/// transaction confirmed in. +pub fn send_and_confirm_transaction( + rpc_client: &RpcClient, + transaction: &Transaction, +) -> ClientResult<(Signature, Slot)> { + const SEND_RETRIES: usize = 1; + const GET_STATUS_RETRIES: usize = usize::MAX; + + 'sending: for _ in 0..SEND_RETRIES { + let signature = rpc_client.send_transaction(transaction)?; + + let recent_blockhash = if uses_durable_nonce(transaction).is_some() { + let (recent_blockhash, ..) = + rpc_client.get_latest_blockhash_with_commitment(CommitmentConfig::processed())?; + recent_blockhash + } else { + transaction.message.recent_blockhash + }; + + for status_retry in 0..GET_STATUS_RETRIES { + let response = rpc_client.get_signature_statuses(&[signature])?.value; + match response[0] + .clone() + .filter(|result| result.satisfies_commitment(rpc_client.commitment())) + { + Some(tx_status) => { + return if let Some(e) = tx_status.err { + Err(e.into()) + } else { + Ok((signature, tx_status.slot)) + }; + } + None => { + if !rpc_client + .is_blockhash_valid(&recent_blockhash, CommitmentConfig::processed())? + { + // Block hash is not found by some reason + break 'sending; + } else if cfg!(not(test)) + // Ignore sleep at last step. + && status_retry < GET_STATUS_RETRIES + { + // Retry twice a second + thread::sleep(time::Duration::from_millis(500)); + continue; + } + } + } + } + } + + Err(RpcError::ForUser( + "unable to confirm transaction. \ + This can happen in situations such as transaction expiration \ + and insufficient fee-payer funds" + .to_string(), + ) + .into()) +} diff --git a/liquidator/src/liquidate.rs b/liquidator/src/liquidate.rs index db9769caf..1b9aca720 100644 --- a/liquidator/src/liquidate.rs +++ b/liquidator/src/liquidate.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::account_shared_data::KeyedAccountSharedData; use client::{chain_data, AccountFetcher, MangoClient, MangoClientError, MangoGroupContext}; @@ -10,6 +12,7 @@ use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; pub struct Config { pub min_health_ratio: f64, + pub refresh_timeout: Duration, } pub fn new_health_cache_( @@ -169,7 +172,7 @@ pub fn maybe_liquidate_account( }; // try liquidating - if account.is_bankrupt() { + let txsig = if account.is_bankrupt() { if tokens.is_empty() { anyhow::bail!("mango account {}, is bankrupt has no active tokens", pubkey); } @@ -199,7 +202,7 @@ pub fn maybe_liquidate_account( maint_health, sig ); - return Ok(true); + sig } else if maint_health.is_negative() { let asset_token_index = tokens .iter() @@ -253,9 +256,21 @@ pub fn maybe_liquidate_account( maint_health, sig ); - return Ok(true); + sig + } else { + return Ok(false); + }; + + let slot = account_fetcher.transaction_max_slot(&[txsig])?; + if let Err(e) = account_fetcher.refresh_accounts_via_rpc_until_slot( + &[*pubkey, mango_client.mango_account_address], + slot, + config.refresh_timeout, + ) { + log::info!("could not refresh after liquidation: {}", e); } - Ok(false) + + Ok(true) } #[allow(clippy::too_many_arguments)] diff --git a/liquidator/src/main.rs b/liquidator/src/main.rs index aad6814ed..7c1e5741a 100644 --- a/liquidator/src/main.rs +++ b/liquidator/src/main.rs @@ -8,7 +8,6 @@ use client::{chain_data, keypair_from_cli, Client, MangoClient, MangoGroupContex use log::*; use mango_v4::state::{PerpMarketIndex, TokenIndex}; -use anyhow::Context; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use std::collections::HashSet; @@ -213,11 +212,15 @@ async fn main() -> anyhow::Result<()> { let liq_config = liquidate::Config { min_health_ratio: cli.min_health_ratio, + // TODO: config + refresh_timeout: Duration::from_secs(30), }; let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5)); let rebalance_config = rebalance::Config { slippage: cli.rebalance_slippage, + // TODO: config + refresh_timeout: Duration::from_secs(30), }; info!("main loop"); @@ -312,8 +315,11 @@ async fn main() -> anyhow::Result<()> { }, _ = rebalance_interval.tick() => { - rebalance::zero_all_non_quote(&mango_client, &account_fetcher, &cli.liqor_mango_account, &rebalance_config) - .context("rebalancing liqor account")?; + if one_snapshot_done { + if let Err(err) = rebalance::zero_all_non_quote(&mango_client, &account_fetcher, &cli.liqor_mango_account, &rebalance_config) { + log::error!("failed to rebalance liqor: {:?}", err); + } + } } } } @@ -331,10 +337,11 @@ fn liquidate<'a>( } let liqor = &mango_client.mango_account_address; - account_fetcher.refresh_account_via_rpc(liqor)?; - - rebalance::zero_all_non_quote(mango_client, account_fetcher, liqor, &rebalance_config) - .context("rebalancing liqor account after liquidation")?; + if let Err(err) = + rebalance::zero_all_non_quote(mango_client, account_fetcher, liqor, &rebalance_config) + { + log::error!("failed to rebalance liqor: {:?}", err); + } Ok(()) } diff --git a/liquidator/src/rebalance.rs b/liquidator/src/rebalance.rs index 7a2cdf374..9801b5878 100644 --- a/liquidator/src/rebalance.rs +++ b/liquidator/src/rebalance.rs @@ -5,12 +5,14 @@ use mango_v4::state::{oracle_price, Bank, TokenIndex, TokenPosition, QUOTE_TOKEN use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; pub struct Config { pub slippage: f64, + pub refresh_timeout: Duration, } +#[derive(Debug)] struct TokenState { _price: I80F48, native_position: I80F48, @@ -68,7 +70,9 @@ pub fn zero_all_non_quote( )) }) .collect::>>()?; + log::trace!("account tokens: {:?}", tokens); + let mut txsigs = vec![]; for (token_index, token_state) in tokens { let token = mango_client.context.token(token_index); if token_index == quote_token.token_index { @@ -76,23 +80,52 @@ pub fn zero_all_non_quote( } if token_state.native_position > 0 { - mango_client.jupiter_swap( + let amount = token_state.native_position; + let txsig = mango_client.jupiter_swap( token.mint_info.mint, quote_token.mint_info.mint, - token_state.native_position.to_num::(), + amount.to_num::(), config.slippage, client::JupiterSwapMode::ExactIn, )?; + log::info!( + "sold {} {} for {} in tx {}", + token.native_to_ui(amount), + token.name, + quote_token.name, + txsig + ); + txsigs.push(txsig); } else if token_state.native_position < 0 { - mango_client.jupiter_swap( + let amount = -token_state.native_position; + let txsig = mango_client.jupiter_swap( quote_token.mint_info.mint, token.mint_info.mint, - (-token_state.native_position).to_num::(), + amount.to_num::(), config.slippage, client::JupiterSwapMode::ExactOut, )?; + log::info!( + "bought {} {} for {} in tx {}", + token.native_to_ui(amount), + token.name, + quote_token.name, + txsig + ); + txsigs.push(txsig); } } + let max_slot = account_fetcher.transaction_max_slot(&txsigs)?; + if let Err(e) = account_fetcher.refresh_accounts_via_rpc_until_slot( + &[*mango_account_address], + max_slot, + config.refresh_timeout, + ) { + // If we don't get fresh data, maybe the tx landed on a fork? + // Rebalance is technically still ok. + log::info!("could not refresh account data: {}", e); + } + Ok(()) }