Liquidator: blocking-wait for data after actions

To avoid sending a second jupiter-swap while the first one is still
in-flight.
This commit is contained in:
Christian Kamm 2022-08-07 20:04:19 +02:00
parent cce1881223
commit 2552bffc66
5 changed files with 187 additions and 18 deletions

View File

@ -1,4 +1,6 @@
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::chain_data::*;
@ -11,7 +13,9 @@ use anyhow::Context;
use solana_client::rpc_client::RpcClient;
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
pub struct AccountFetcher {
pub chain_data: Arc<RwLock<ChainData>>,
@ -66,11 +70,12 @@ impl AccountFetcher {
.clone())
}
pub fn refresh_account_via_rpc(&self, address: &Pubkey) -> anyhow::Result<()> {
pub fn refresh_account_via_rpc(&self, address: &Pubkey) -> anyhow::Result<Slot> {
let response = self
.rpc
.get_account_with_commitment(&address, self.rpc.commitment())
.with_context(|| format!("refresh account {} via rpc", address))?;
let slot = response.context.slot;
let account = response
.value
.ok_or(anchor_client::ClientError::AccountNotFound)
@ -85,6 +90,43 @@ impl AccountFetcher {
},
);
Ok(slot)
}
/// Return the maximum slot reported for the processing of the signatures
pub fn transaction_max_slot(&self, signatures: &[Signature]) -> anyhow::Result<Slot> {
let statuses = self.rpc.get_signature_statuses(signatures)?.value;
Ok(statuses
.iter()
.map(|status_opt| status_opt.as_ref().map(|status| status.slot).unwrap_or(0))
.max()
.unwrap_or(0))
}
/// Return success once all addresses have data >= min_slot
pub fn refresh_accounts_via_rpc_until_slot(
&self,
addresses: &[Pubkey],
min_slot: Slot,
timeout: Duration,
) -> anyhow::Result<()> {
let start = Instant::now();
for address in addresses {
loop {
if start.elapsed() > timeout {
anyhow::bail!(
"timeout while waiting for data for {} that's newer than slot {}",
address,
min_slot
);
}
let data_slot = self.refresh_account_via_rpc(address)?;
if data_slot >= min_slot {
break;
}
thread::sleep(Duration::from_millis(500));
}
}
Ok(())
}
}

View File

@ -1,4 +1,15 @@
use solana_sdk::signature::Keypair;
use solana_client::{
client_error::Result as ClientResult, rpc_client::RpcClient, rpc_request::RpcError,
};
use solana_sdk::transaction::Transaction;
use solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
signature::{Keypair, Signature},
transaction::uses_durable_nonce,
};
use std::{thread, time};
// #[allow(dead_code)]
// pub fn retry<T>(request: impl Fn() -> Result<T, anchor_client::ClientError>) -> anyhow::Result<T> {
@ -24,3 +35,64 @@ impl MyClone for Keypair {
Self::from_bytes(&self.to_bytes()).unwrap()
}
}
/// A copy of RpcClient::send_and_confirm_transaction that returns the slot the
/// transaction confirmed in.
pub fn send_and_confirm_transaction(
rpc_client: &RpcClient,
transaction: &Transaction,
) -> ClientResult<(Signature, Slot)> {
const SEND_RETRIES: usize = 1;
const GET_STATUS_RETRIES: usize = usize::MAX;
'sending: for _ in 0..SEND_RETRIES {
let signature = rpc_client.send_transaction(transaction)?;
let recent_blockhash = if uses_durable_nonce(transaction).is_some() {
let (recent_blockhash, ..) =
rpc_client.get_latest_blockhash_with_commitment(CommitmentConfig::processed())?;
recent_blockhash
} else {
transaction.message.recent_blockhash
};
for status_retry in 0..GET_STATUS_RETRIES {
let response = rpc_client.get_signature_statuses(&[signature])?.value;
match response[0]
.clone()
.filter(|result| result.satisfies_commitment(rpc_client.commitment()))
{
Some(tx_status) => {
return if let Some(e) = tx_status.err {
Err(e.into())
} else {
Ok((signature, tx_status.slot))
};
}
None => {
if !rpc_client
.is_blockhash_valid(&recent_blockhash, CommitmentConfig::processed())?
{
// Block hash is not found by some reason
break 'sending;
} else if cfg!(not(test))
// Ignore sleep at last step.
&& status_retry < GET_STATUS_RETRIES
{
// Retry twice a second
thread::sleep(time::Duration::from_millis(500));
continue;
}
}
}
}
}
Err(RpcError::ForUser(
"unable to confirm transaction. \
This can happen in situations such as transaction expiration \
and insufficient fee-payer funds"
.to_string(),
)
.into())
}

View File

@ -1,3 +1,5 @@
use std::time::Duration;
use crate::account_shared_data::KeyedAccountSharedData;
use client::{chain_data, AccountFetcher, MangoClient, MangoClientError, MangoGroupContext};
@ -10,6 +12,7 @@ use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
pub struct Config {
pub min_health_ratio: f64,
pub refresh_timeout: Duration,
}
pub fn new_health_cache_(
@ -169,7 +172,7 @@ pub fn maybe_liquidate_account(
};
// try liquidating
if account.is_bankrupt() {
let txsig = if account.is_bankrupt() {
if tokens.is_empty() {
anyhow::bail!("mango account {}, is bankrupt has no active tokens", pubkey);
}
@ -199,7 +202,7 @@ pub fn maybe_liquidate_account(
maint_health,
sig
);
return Ok(true);
sig
} else if maint_health.is_negative() {
let asset_token_index = tokens
.iter()
@ -253,9 +256,21 @@ pub fn maybe_liquidate_account(
maint_health,
sig
);
return Ok(true);
sig
} else {
return Ok(false);
};
let slot = account_fetcher.transaction_max_slot(&[txsig])?;
if let Err(e) = account_fetcher.refresh_accounts_via_rpc_until_slot(
&[*pubkey, mango_client.mango_account_address],
slot,
config.refresh_timeout,
) {
log::info!("could not refresh after liquidation: {}", e);
}
Ok(false)
Ok(true)
}
#[allow(clippy::too_many_arguments)]

View File

@ -8,7 +8,6 @@ use client::{chain_data, keypair_from_cli, Client, MangoClient, MangoGroupContex
use log::*;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use anyhow::Context;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
@ -213,11 +212,15 @@ async fn main() -> anyhow::Result<()> {
let liq_config = liquidate::Config {
min_health_ratio: cli.min_health_ratio,
// TODO: config
refresh_timeout: Duration::from_secs(30),
};
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5));
let rebalance_config = rebalance::Config {
slippage: cli.rebalance_slippage,
// TODO: config
refresh_timeout: Duration::from_secs(30),
};
info!("main loop");
@ -312,8 +315,11 @@ async fn main() -> anyhow::Result<()> {
},
_ = rebalance_interval.tick() => {
rebalance::zero_all_non_quote(&mango_client, &account_fetcher, &cli.liqor_mango_account, &rebalance_config)
.context("rebalancing liqor account")?;
if one_snapshot_done {
if let Err(err) = rebalance::zero_all_non_quote(&mango_client, &account_fetcher, &cli.liqor_mango_account, &rebalance_config) {
log::error!("failed to rebalance liqor: {:?}", err);
}
}
}
}
}
@ -331,10 +337,11 @@ fn liquidate<'a>(
}
let liqor = &mango_client.mango_account_address;
account_fetcher.refresh_account_via_rpc(liqor)?;
rebalance::zero_all_non_quote(mango_client, account_fetcher, liqor, &rebalance_config)
.context("rebalancing liqor account after liquidation")?;
if let Err(err) =
rebalance::zero_all_non_quote(mango_client, account_fetcher, liqor, &rebalance_config)
{
log::error!("failed to rebalance liqor: {:?}", err);
}
Ok(())
}

View File

@ -5,12 +5,14 @@ use mango_v4::state::{oracle_price, Bank, TokenIndex, TokenPosition, QUOTE_TOKEN
use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};
pub struct Config {
pub slippage: f64,
pub refresh_timeout: Duration,
}
#[derive(Debug)]
struct TokenState {
_price: I80F48,
native_position: I80F48,
@ -68,7 +70,9 @@ pub fn zero_all_non_quote(
))
})
.collect::<anyhow::Result<HashMap<TokenIndex, TokenState>>>()?;
log::trace!("account tokens: {:?}", tokens);
let mut txsigs = vec![];
for (token_index, token_state) in tokens {
let token = mango_client.context.token(token_index);
if token_index == quote_token.token_index {
@ -76,23 +80,52 @@ pub fn zero_all_non_quote(
}
if token_state.native_position > 0 {
mango_client.jupiter_swap(
let amount = token_state.native_position;
let txsig = mango_client.jupiter_swap(
token.mint_info.mint,
quote_token.mint_info.mint,
token_state.native_position.to_num::<u64>(),
amount.to_num::<u64>(),
config.slippage,
client::JupiterSwapMode::ExactIn,
)?;
log::info!(
"sold {} {} for {} in tx {}",
token.native_to_ui(amount),
token.name,
quote_token.name,
txsig
);
txsigs.push(txsig);
} else if token_state.native_position < 0 {
mango_client.jupiter_swap(
let amount = -token_state.native_position;
let txsig = mango_client.jupiter_swap(
quote_token.mint_info.mint,
token.mint_info.mint,
(-token_state.native_position).to_num::<u64>(),
amount.to_num::<u64>(),
config.slippage,
client::JupiterSwapMode::ExactOut,
)?;
log::info!(
"bought {} {} for {} in tx {}",
token.native_to_ui(amount),
token.name,
quote_token.name,
txsig
);
txsigs.push(txsig);
}
}
let max_slot = account_fetcher.transaction_max_slot(&txsigs)?;
if let Err(e) = account_fetcher.refresh_accounts_via_rpc_until_slot(
&[*mango_account_address],
max_slot,
config.refresh_timeout,
) {
// If we don't get fresh data, maybe the tx landed on a fork?
// Rebalance is technically still ok.
log::info!("could not refresh account data: {}", e);
}
Ok(())
}