795 lines
27 KiB
Rust
795 lines
27 KiB
Rust
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::sync::{Arc, RwLock};
|
|
use std::time::{Duration, Instant};
|
|
|
|
use anchor_client::Cluster;
|
|
use clap::Parser;
|
|
use mango_v4::state::{PerpMarketIndex, TokenIndex};
|
|
use mango_v4_client::{
|
|
account_update_stream, chain_data, error_tracking::ErrorTracking, jupiter, keypair_from_cli,
|
|
snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext,
|
|
TransactionBuilderConfig,
|
|
};
|
|
|
|
use itertools::Itertools;
|
|
use solana_sdk::commitment_config::CommitmentConfig;
|
|
use solana_sdk::pubkey::Pubkey;
|
|
use tracing::*;
|
|
|
|
pub mod liquidate;
|
|
pub mod metrics;
|
|
pub mod rebalance;
|
|
pub mod telemetry;
|
|
pub mod token_swap_info;
|
|
pub mod trigger_tcs;
|
|
pub mod util;
|
|
|
|
use crate::util::{is_mango_account, is_mint_info, is_perp_market};
|
|
|
|
// jemalloc seems to be better at keeping the memory footprint reasonable over
|
|
// longer periods of time
|
|
#[global_allocator]
|
|
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
|
|
|
#[derive(Parser, Debug)]
|
|
#[clap()]
|
|
struct CliDotenv {
|
|
// When --dotenv <file> is passed, read the specified dotenv file before parsing args
|
|
#[clap(long)]
|
|
dotenv: std::path::PathBuf,
|
|
|
|
remaining_args: Vec<std::ffi::OsString>,
|
|
}
|
|
|
|
// Prefer "--rebalance false" over "--no-rebalance" because it works
|
|
// better with REBALANCE=false env values.
|
|
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
|
|
enum BoolArg {
|
|
True,
|
|
False,
|
|
}
|
|
|
|
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
|
|
enum JupiterVersionArg {
|
|
Mock,
|
|
V4,
|
|
V6,
|
|
}
|
|
|
|
impl From<JupiterVersionArg> for jupiter::Version {
|
|
fn from(a: JupiterVersionArg) -> Self {
|
|
match a {
|
|
JupiterVersionArg::Mock => jupiter::Version::Mock,
|
|
JupiterVersionArg::V4 => jupiter::Version::V4,
|
|
JupiterVersionArg::V6 => jupiter::Version::V6,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
|
|
enum TcsMode {
|
|
BorrowBuy,
|
|
SwapSellIntoBuy,
|
|
SwapCollateralIntoBuy,
|
|
}
|
|
|
|
impl From<TcsMode> for trigger_tcs::Mode {
|
|
fn from(a: TcsMode) -> Self {
|
|
match a {
|
|
TcsMode::BorrowBuy => trigger_tcs::Mode::BorrowBuyToken,
|
|
TcsMode::SwapSellIntoBuy => trigger_tcs::Mode::SwapSellIntoBuy,
|
|
TcsMode::SwapCollateralIntoBuy => trigger_tcs::Mode::SwapCollateralIntoBuy,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Parser)]
|
|
#[clap()]
|
|
struct Cli {
|
|
#[clap(short, long, env)]
|
|
rpc_url: String,
|
|
|
|
#[clap(long, env)]
|
|
liqor_mango_account: Pubkey,
|
|
|
|
#[clap(long, env)]
|
|
liqor_owner: String,
|
|
|
|
#[clap(long, env, default_value = "1000")]
|
|
check_interval_ms: u64,
|
|
|
|
#[clap(long, env, default_value = "300")]
|
|
snapshot_interval_secs: u64,
|
|
|
|
/// how many getMultipleAccounts requests to send in parallel
|
|
#[clap(long, env, default_value = "10")]
|
|
parallel_rpc_requests: usize,
|
|
|
|
/// typically 100 is the max number of accounts getMultipleAccounts will retrieve at once
|
|
#[clap(long, env, default_value = "100")]
|
|
get_multiple_accounts_count: usize,
|
|
|
|
/// liquidator health ratio should not fall below this value
|
|
#[clap(long, env, default_value = "50")]
|
|
min_health_ratio: f64,
|
|
|
|
/// if rebalancing is enabled
|
|
///
|
|
/// typically only disabled for tests where swaps are unavailable
|
|
#[clap(long, env, value_enum, default_value = "true")]
|
|
rebalance: BoolArg,
|
|
|
|
/// max slippage to request on swaps to rebalance spot tokens
|
|
#[clap(long, env, default_value = "100")]
|
|
rebalance_slippage_bps: u64,
|
|
|
|
/// tokens to not rebalance (in addition to USDC); use a comma separated list of names
|
|
#[clap(long, env, default_value = "")]
|
|
rebalance_skip_tokens: String,
|
|
|
|
/// if taking tcs orders is enabled
|
|
///
|
|
/// typically only disabled for tests where swaps are unavailable
|
|
#[clap(long, env, value_enum, default_value = "true")]
|
|
take_tcs: BoolArg,
|
|
|
|
/// profit margin at which to take tcs orders
|
|
#[clap(long, env, default_value = "0.0005")]
|
|
tcs_profit_fraction: f64,
|
|
|
|
/// control how tcs triggering provides buy tokens
|
|
#[clap(long, env, value_enum, default_value = "swap-sell-into-buy")]
|
|
tcs_mode: TcsMode,
|
|
|
|
/// prioritize each transaction with this many microlamports/cu
|
|
#[clap(long, env, default_value = "0")]
|
|
prioritization_micro_lamports: u64,
|
|
|
|
/// compute limit requested for liquidation instructions
|
|
#[clap(long, env, default_value = "250000")]
|
|
compute_limit_for_liquidation: u32,
|
|
|
|
/// compute limit requested for tcs trigger instructions
|
|
#[clap(long, env, default_value = "300000")]
|
|
compute_limit_for_tcs: u32,
|
|
|
|
/// control which version of jupiter to use
|
|
#[clap(long, env, value_enum, default_value = "v6")]
|
|
jupiter_version: JupiterVersionArg,
|
|
|
|
/// override the url to jupiter v4
|
|
#[clap(long, env, default_value = "https://quote-api.jup.ag/v4")]
|
|
jupiter_v4_url: String,
|
|
|
|
/// override the url to jupiter v6
|
|
#[clap(long, env, default_value = "https://quote-api.jup.ag/v6")]
|
|
jupiter_v6_url: String,
|
|
|
|
/// report liquidator's existence and pubkey
|
|
#[clap(long, env, value_enum, default_value = "true")]
|
|
telemetry: BoolArg,
|
|
}
|
|
|
|
pub fn encode_address(addr: &Pubkey) -> String {
|
|
bs58::encode(&addr.to_bytes()).into_string()
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
mango_v4_client::tracing_subscriber_init();
|
|
|
|
let args = if let Ok(cli_dotenv) = CliDotenv::try_parse() {
|
|
dotenv::from_path(cli_dotenv.dotenv)?;
|
|
cli_dotenv.remaining_args
|
|
} else {
|
|
dotenv::dotenv().ok();
|
|
std::env::args_os().collect()
|
|
};
|
|
let cli = Cli::parse_from(args);
|
|
|
|
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
|
|
|
|
let rpc_url = cli.rpc_url;
|
|
let ws_url = rpc_url.replace("https", "wss");
|
|
|
|
let rpc_timeout = Duration::from_secs(10);
|
|
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
|
|
let commitment = CommitmentConfig::processed();
|
|
let client = Client::builder()
|
|
.cluster(cluster.clone())
|
|
.commitment(commitment)
|
|
.fee_payer(Some(liqor_owner.clone()))
|
|
.timeout(Some(rpc_timeout))
|
|
.jupiter_v4_url(cli.jupiter_v4_url)
|
|
.jupiter_v6_url(cli.jupiter_v6_url)
|
|
.transaction_builder_config(TransactionBuilderConfig {
|
|
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
|
|
.then_some(cli.prioritization_micro_lamports),
|
|
// Liquidation and tcs triggers set their own budgets, this is a default for other tx
|
|
compute_budget_per_instruction: Some(250_000),
|
|
})
|
|
.build()
|
|
.unwrap();
|
|
|
|
// The representation of current on-chain account data
|
|
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
|
|
// Reading accounts from chain_data
|
|
let account_fetcher = Arc::new(chain_data::AccountFetcher {
|
|
chain_data: chain_data.clone(),
|
|
rpc: client.rpc_async(),
|
|
});
|
|
|
|
let mango_account = account_fetcher
|
|
.fetch_fresh_mango_account(&cli.liqor_mango_account)
|
|
.await?;
|
|
let mango_group = mango_account.fixed.group;
|
|
|
|
let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?;
|
|
|
|
let mango_oracles = group_context
|
|
.tokens
|
|
.values()
|
|
.map(|value| value.oracle)
|
|
.chain(group_context.perp_markets.values().map(|p| p.oracle))
|
|
.unique()
|
|
.collect::<Vec<Pubkey>>();
|
|
|
|
let serum_programs = group_context
|
|
.serum3_markets
|
|
.values()
|
|
.map(|s3| s3.serum_program)
|
|
.unique()
|
|
.collect_vec();
|
|
|
|
//
|
|
// feed setup
|
|
//
|
|
// FUTURE: decouple feed setup and liquidator business logic
|
|
// feed should send updates to a channel which liquidator can consume
|
|
|
|
info!("startup");
|
|
|
|
let metrics = metrics::start();
|
|
|
|
let (account_update_sender, account_update_receiver) =
|
|
async_channel::unbounded::<account_update_stream::Message>();
|
|
|
|
// Sourcing account and slot data from solana via websockets
|
|
// FUTURE: websocket feed should take which accounts to listen to as an input
|
|
websocket_source::start(
|
|
websocket_source::Config {
|
|
rpc_ws_url: ws_url.clone(),
|
|
serum_programs,
|
|
open_orders_authority: mango_group,
|
|
},
|
|
mango_oracles.clone(),
|
|
account_update_sender.clone(),
|
|
);
|
|
|
|
let first_websocket_slot = websocket_source::get_next_create_bank_slot(
|
|
account_update_receiver.clone(),
|
|
Duration::from_secs(10),
|
|
)
|
|
.await?;
|
|
|
|
// Getting solana account snapshots via jsonrpc
|
|
// FUTURE: of what to fetch a snapshot - should probably take as an input
|
|
snapshot_source::start(
|
|
snapshot_source::Config {
|
|
rpc_http_url: rpc_url.clone(),
|
|
mango_group,
|
|
get_multiple_accounts_count: cli.get_multiple_accounts_count,
|
|
parallel_rpc_requests: cli.parallel_rpc_requests,
|
|
snapshot_interval: Duration::from_secs(cli.snapshot_interval_secs),
|
|
min_slot: first_websocket_slot + 10,
|
|
},
|
|
mango_oracles,
|
|
account_update_sender,
|
|
);
|
|
|
|
start_chain_data_metrics(chain_data.clone(), &metrics);
|
|
|
|
let shared_state = Arc::new(RwLock::new(SharedState::default()));
|
|
|
|
//
|
|
// mango client setup
|
|
//
|
|
let mango_client = {
|
|
Arc::new(MangoClient::new_detail(
|
|
client,
|
|
cli.liqor_mango_account,
|
|
liqor_owner,
|
|
group_context,
|
|
account_fetcher.clone(),
|
|
)?)
|
|
};
|
|
|
|
let token_swap_info_config = token_swap_info::Config {
|
|
quote_index: 0, // USDC
|
|
quote_amount: 1_000_000_000, // TODO: config, $1000, should be >= tcs_config.max_trigger_quote_amount
|
|
jupiter_version: cli.jupiter_version.into(),
|
|
};
|
|
|
|
let token_swap_info_updater = Arc::new(token_swap_info::TokenSwapInfoUpdater::new(
|
|
mango_client.clone(),
|
|
token_swap_info_config,
|
|
));
|
|
|
|
let liq_config = liquidate::Config {
|
|
min_health_ratio: cli.min_health_ratio,
|
|
compute_limit_for_liq_ix: cli.compute_limit_for_liquidation,
|
|
// TODO: config
|
|
refresh_timeout: Duration::from_secs(30),
|
|
};
|
|
|
|
let tcs_config = trigger_tcs::Config {
|
|
min_health_ratio: cli.min_health_ratio,
|
|
max_trigger_quote_amount: 1_000_000_000, // TODO: config, $1000
|
|
compute_limit_for_trigger: cli.compute_limit_for_tcs,
|
|
profit_fraction: cli.tcs_profit_fraction,
|
|
collateral_token_index: 0, // USDC
|
|
// TODO: config
|
|
refresh_timeout: Duration::from_secs(30),
|
|
|
|
jupiter_version: cli.jupiter_version.into(),
|
|
jupiter_slippage_bps: cli.rebalance_slippage_bps,
|
|
|
|
mode: cli.tcs_mode.into(),
|
|
min_buy_fraction: 0.7,
|
|
};
|
|
|
|
let rebalance_config = rebalance::Config {
|
|
enabled: cli.rebalance == BoolArg::True,
|
|
slippage_bps: cli.rebalance_slippage_bps,
|
|
// TODO: config
|
|
borrow_settle_excess: 1.05,
|
|
refresh_timeout: Duration::from_secs(30),
|
|
jupiter_version: cli.jupiter_version.into(),
|
|
skip_tokens: cli
|
|
.rebalance_skip_tokens
|
|
.split(',')
|
|
.filter(|v| !v.is_empty())
|
|
.map(|name| mango_client.context.token_by_name(name).token_index)
|
|
.collect(),
|
|
};
|
|
|
|
let rebalancer = Arc::new(rebalance::Rebalancer {
|
|
mango_client: mango_client.clone(),
|
|
account_fetcher: account_fetcher.clone(),
|
|
mango_account_address: cli.liqor_mango_account,
|
|
config: rebalance_config,
|
|
});
|
|
|
|
let mut liquidation = Box::new(LiquidationState {
|
|
mango_client: mango_client.clone(),
|
|
account_fetcher,
|
|
liquidation_config: liq_config,
|
|
trigger_tcs_config: tcs_config,
|
|
rebalancer: rebalancer.clone(),
|
|
token_swap_info: token_swap_info_updater.clone(),
|
|
errors: ErrorTracking::builder()
|
|
.skip_threshold(2)
|
|
.skip_threshold_for_type(LiqErrorType::Liq, 5)
|
|
.skip_duration(Duration::from_secs(120))
|
|
.build()?,
|
|
});
|
|
|
|
info!("main loop");
|
|
|
|
// Job to update chain_data and notify the liquidation job when a new check is needed.
|
|
let data_job = tokio::spawn({
|
|
use account_update_stream::Message;
|
|
|
|
let shared_state = shared_state.clone();
|
|
|
|
let mut metric_account_update_queue_len =
|
|
metrics.register_u64("account_update_queue_length".into());
|
|
let mut metric_mango_accounts = metrics.register_u64("mango_accounts".into());
|
|
|
|
let mut mint_infos = HashMap::<TokenIndex, Pubkey>::new();
|
|
let mut oracles = HashSet::<Pubkey>::new();
|
|
let mut perp_markets = HashMap::<PerpMarketIndex, Pubkey>::new();
|
|
|
|
async move {
|
|
loop {
|
|
let message = account_update_receiver
|
|
.recv()
|
|
.await
|
|
.expect("channel not closed");
|
|
metric_account_update_queue_len.set(account_update_receiver.len() as u64);
|
|
|
|
message.update_chain_data(&mut chain_data.write().unwrap());
|
|
|
|
match message {
|
|
Message::Account(account_write) => {
|
|
let mut state = shared_state.write().unwrap();
|
|
if is_mango_account(&account_write.account, &mango_group).is_some() {
|
|
// e.g. to render debug logs RUST_LOG="liquidator=debug"
|
|
debug!(
|
|
"change to mango account {}...",
|
|
&account_write.pubkey.to_string()[0..3]
|
|
);
|
|
|
|
// Track all MangoAccounts: we need to iterate over them later
|
|
state.mango_accounts.insert(account_write.pubkey);
|
|
metric_mango_accounts.set(state.mango_accounts.len() as u64);
|
|
}
|
|
}
|
|
Message::Snapshot(snapshot) => {
|
|
let mut state = shared_state.write().unwrap();
|
|
// Track all mango account pubkeys
|
|
for update in snapshot.iter() {
|
|
if is_mango_account(&update.account, &mango_group).is_some() {
|
|
state.mango_accounts.insert(update.pubkey);
|
|
}
|
|
if let Some(mint_info) = is_mint_info(&update.account, &mango_group) {
|
|
mint_infos.insert(mint_info.token_index, update.pubkey);
|
|
oracles.insert(mint_info.oracle);
|
|
}
|
|
if let Some(perp_market) = is_perp_market(&update.account, &mango_group)
|
|
{
|
|
perp_markets.insert(perp_market.perp_market_index, update.pubkey);
|
|
oracles.insert(perp_market.oracle);
|
|
}
|
|
}
|
|
metric_mango_accounts.set(state.mango_accounts.len() as u64);
|
|
|
|
state.one_snapshot_done = true;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
// Could be refactored to only start the below jobs when the first snapshot is done.
|
|
// But need to take care to abort if the above job aborts beforehand.
|
|
|
|
let liquidation_job = tokio::spawn({
|
|
let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
|
|
let shared_state = shared_state.clone();
|
|
async move {
|
|
let mut must_rebalance = true;
|
|
let rebalance_delay = Duration::from_secs(5);
|
|
let mut last_rebalance = Instant::now();
|
|
loop {
|
|
interval.tick().await;
|
|
|
|
let account_addresses = {
|
|
let state = shared_state.write().unwrap();
|
|
if !state.one_snapshot_done {
|
|
continue;
|
|
}
|
|
state.mango_accounts.iter().cloned().collect_vec()
|
|
};
|
|
|
|
liquidation.errors.update();
|
|
|
|
if must_rebalance || last_rebalance.elapsed() > rebalance_delay {
|
|
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
|
|
error!("failed to rebalance liqor: {:?}", err);
|
|
}
|
|
must_rebalance = false;
|
|
last_rebalance = Instant::now();
|
|
}
|
|
|
|
let liquidated = liquidation
|
|
.maybe_liquidate_one(account_addresses.iter())
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut took_tcs = false;
|
|
if !liquidated && cli.take_tcs == BoolArg::True {
|
|
took_tcs = liquidation
|
|
.maybe_take_token_conditional_swap(account_addresses.iter())
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
must_rebalance = must_rebalance || liquidated || took_tcs;
|
|
}
|
|
}
|
|
});
|
|
|
|
let token_swap_info_job = tokio::spawn({
|
|
// TODO: configurable interval
|
|
let mut interval = tokio::time::interval(Duration::from_secs(60));
|
|
let mut startup_wait = tokio::time::interval(Duration::from_secs(1));
|
|
let shared_state = shared_state.clone();
|
|
async move {
|
|
loop {
|
|
startup_wait.tick().await;
|
|
if !shared_state.read().unwrap().one_snapshot_done {
|
|
continue;
|
|
}
|
|
|
|
interval.tick().await;
|
|
let token_indexes = token_swap_info_updater
|
|
.mango_client()
|
|
.context
|
|
.tokens
|
|
.keys()
|
|
.copied()
|
|
.collect_vec();
|
|
let mut min_delay = tokio::time::interval(Duration::from_secs(1));
|
|
for token_index in token_indexes {
|
|
min_delay.tick().await;
|
|
match token_swap_info_updater.update_one(token_index).await {
|
|
Ok(()) => {}
|
|
Err(err) => {
|
|
warn!(
|
|
"failed to update token swap info for token {token_index}: {err:?}",
|
|
);
|
|
}
|
|
}
|
|
}
|
|
token_swap_info_updater.log_all();
|
|
}
|
|
}
|
|
});
|
|
|
|
let check_changes_for_abort_job =
|
|
tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort(
|
|
mango_client.clone(),
|
|
Duration::from_secs(300),
|
|
));
|
|
|
|
if cli.telemetry == BoolArg::True {
|
|
tokio::spawn(telemetry::report_regularly(
|
|
mango_client,
|
|
cli.min_health_ratio,
|
|
));
|
|
}
|
|
|
|
use futures::StreamExt;
|
|
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
|
|
data_job,
|
|
liquidation_job,
|
|
token_swap_info_job,
|
|
check_changes_for_abort_job,
|
|
]
|
|
.into_iter()
|
|
.collect();
|
|
jobs.next().await;
|
|
|
|
error!("a critical job aborted, exiting");
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct SharedState {
|
|
/// Addresses of the MangoAccounts belonging to the mango program.
|
|
/// Needed to check health of them all when the cache updates.
|
|
mango_accounts: HashSet<Pubkey>,
|
|
|
|
/// Is the first snapshot done? Only start checking account health when it is.
|
|
one_snapshot_done: bool,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
|
pub enum LiqErrorType {
|
|
Liq,
|
|
/// Errors that suggest we maybe should skip trying to collect tcs for that pubkey
|
|
TcsCollectionHard,
|
|
/// Recording errors when some tcs have errors during collection but others don't
|
|
TcsCollectionPartial,
|
|
TcsExecution,
|
|
}
|
|
|
|
impl std::fmt::Display for LiqErrorType {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::Liq => write!(f, "liq"),
|
|
Self::TcsCollectionHard => write!(f, "tcs-collection-hard"),
|
|
Self::TcsCollectionPartial => write!(f, "tcs-collection-partial"),
|
|
Self::TcsExecution => write!(f, "tcs-execution"),
|
|
}
|
|
}
|
|
}
|
|
|
|
struct LiquidationState {
|
|
mango_client: Arc<MangoClient>,
|
|
account_fetcher: Arc<chain_data::AccountFetcher>,
|
|
rebalancer: Arc<rebalance::Rebalancer>,
|
|
token_swap_info: Arc<token_swap_info::TokenSwapInfoUpdater>,
|
|
liquidation_config: liquidate::Config,
|
|
trigger_tcs_config: trigger_tcs::Config,
|
|
|
|
errors: ErrorTracking<Pubkey, LiqErrorType>,
|
|
}
|
|
|
|
impl LiquidationState {
|
|
async fn maybe_liquidate_one<'b>(
|
|
&mut self,
|
|
accounts_iter: impl Iterator<Item = &'b Pubkey>,
|
|
) -> anyhow::Result<bool> {
|
|
use rand::seq::SliceRandom;
|
|
|
|
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
|
{
|
|
let mut rng = rand::thread_rng();
|
|
accounts.shuffle(&mut rng);
|
|
}
|
|
|
|
for pubkey in accounts {
|
|
if self
|
|
.maybe_liquidate_and_log_error(pubkey)
|
|
.await
|
|
.unwrap_or(false)
|
|
{
|
|
return Ok(true);
|
|
}
|
|
}
|
|
|
|
Ok(false)
|
|
}
|
|
|
|
async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
|
|
let now = Instant::now();
|
|
let error_tracking = &mut self.errors;
|
|
|
|
// Skip a pubkey if there've been too many errors recently
|
|
if let Some(error_entry) =
|
|
error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now)
|
|
{
|
|
trace!(
|
|
%pubkey,
|
|
error_entry.count,
|
|
"skip checking account for liquidation, had errors recently",
|
|
);
|
|
return Ok(false);
|
|
}
|
|
|
|
let result = liquidate::maybe_liquidate_account(
|
|
&self.mango_client,
|
|
&self.account_fetcher,
|
|
pubkey,
|
|
&self.liquidation_config,
|
|
)
|
|
.await;
|
|
|
|
if let Err(err) = result.as_ref() {
|
|
// Keep track of pubkeys that had errors
|
|
error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string());
|
|
|
|
// Not all errors need to be raised to the user's attention.
|
|
let mut is_error = true;
|
|
|
|
// Simulation errors due to liqee precondition failures on the liquidation instructions
|
|
// will commonly happen if our liquidator is late or if there are chain forks.
|
|
match err.downcast_ref::<MangoClientError>() {
|
|
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
|
|
if logs.iter().any(|line| {
|
|
line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
|
|
}) {
|
|
is_error = false;
|
|
}
|
|
}
|
|
_ => {}
|
|
};
|
|
if is_error {
|
|
error!("liquidating account {}: {:?}", pubkey, err);
|
|
} else {
|
|
trace!("liquidating account {}: {:?}", pubkey, err);
|
|
}
|
|
} else {
|
|
error_tracking.clear(LiqErrorType::Liq, pubkey);
|
|
}
|
|
|
|
result
|
|
}
|
|
|
|
async fn maybe_take_token_conditional_swap<'b>(
|
|
&mut self,
|
|
accounts_iter: impl Iterator<Item = &'b Pubkey>,
|
|
) -> anyhow::Result<bool> {
|
|
let accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
|
|
|
let now = Instant::now();
|
|
let now_ts: u64 = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)?
|
|
.as_secs();
|
|
|
|
let tcs_context = trigger_tcs::Context {
|
|
mango_client: self.mango_client.clone(),
|
|
account_fetcher: self.account_fetcher.clone(),
|
|
token_swap_info: self.token_swap_info.clone(),
|
|
config: self.trigger_tcs_config.clone(),
|
|
jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()),
|
|
now_ts,
|
|
};
|
|
|
|
// Find interesting (pubkey, tcsid, volume)
|
|
let mut interesting_tcs = Vec::with_capacity(accounts.len());
|
|
for pubkey in accounts.iter() {
|
|
if let Some(error_entry) =
|
|
self.errors
|
|
.had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now)
|
|
{
|
|
trace!(
|
|
%pubkey,
|
|
error_entry.count,
|
|
"skip checking account for tcs, had errors recently",
|
|
);
|
|
continue;
|
|
}
|
|
|
|
match tcs_context.find_interesting_tcs_for_account(pubkey) {
|
|
Ok(v) => {
|
|
self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey);
|
|
if v.is_empty() {
|
|
self.errors
|
|
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
|
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
|
} else if v.iter().all(|it| it.is_ok()) {
|
|
self.errors
|
|
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
|
} else {
|
|
for it in v.iter() {
|
|
if let Err(e) = it {
|
|
self.errors.record(
|
|
LiqErrorType::TcsCollectionPartial,
|
|
pubkey,
|
|
e.to_string(),
|
|
);
|
|
}
|
|
}
|
|
}
|
|
interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok()));
|
|
}
|
|
Err(e) => {
|
|
self.errors
|
|
.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string());
|
|
}
|
|
}
|
|
}
|
|
if interesting_tcs.is_empty() {
|
|
return Ok(false);
|
|
}
|
|
|
|
let (txsigs, mut changed_pubkeys) = tcs_context
|
|
.execute_tcs(&mut interesting_tcs, &mut self.errors)
|
|
.await?;
|
|
for pubkey in changed_pubkeys.iter() {
|
|
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
|
}
|
|
changed_pubkeys.push(self.mango_client.mango_account_address);
|
|
|
|
// Force a refresh of affected accounts
|
|
let slot = self.account_fetcher.transaction_max_slot(&txsigs).await?;
|
|
if let Err(e) = self
|
|
.account_fetcher
|
|
.refresh_accounts_via_rpc_until_slot(
|
|
&changed_pubkeys,
|
|
slot,
|
|
self.liquidation_config.refresh_timeout,
|
|
)
|
|
.await
|
|
{
|
|
info!(slot, "could not refresh after tcs execution: {}", e);
|
|
}
|
|
|
|
Ok(true)
|
|
}
|
|
}
|
|
|
|
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
|
|
let mut interval = tokio::time::interval(Duration::from_secs(600));
|
|
|
|
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
|
|
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
|
|
let mut metric_account_write_count =
|
|
metrics.register_u64("chain_data_account_write_count".into());
|
|
|
|
tokio::spawn(async move {
|
|
loop {
|
|
interval.tick().await;
|
|
let chain_lock = chain.read().unwrap();
|
|
metric_slots_count.set(chain_lock.slots_count() as u64);
|
|
metric_accounts_count.set(chain_lock.accounts_count() as u64);
|
|
metric_account_write_count.set(chain_lock.account_writes_count() as u64);
|
|
}
|
|
});
|
|
}
|