mango-v4/bin/liquidator/src/main.rs

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use anchor_client::Cluster;
use clap::Parser;
use itertools::Itertools;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use mango_v4_client::{
account_update_stream, chain_data, error_tracking::ErrorTracking, jupiter, keypair_from_cli,
snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext,
TransactionBuilderConfig,
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use tracing::*;
pub mod liquidate;
pub mod metrics;
pub mod rebalance;
pub mod telemetry;
pub mod token_swap_info;
pub mod trigger_tcs;
pub mod util;
use crate::util::{is_mango_account, is_mint_info, is_perp_market};
// jemalloc seems to be better at keeping the memory footprint reasonable over
// longer periods of time
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Parser, Debug)]
#[clap()]
struct CliDotenv {
// When --dotenv <file> is passed, read the specified dotenv file before parsing args
#[clap(long)]
dotenv: std::path::PathBuf,
remaining_args: Vec<std::ffi::OsString>,
}
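// A hedged sketch of the intended usage (the file name is illustrative):
//
//   liquidator --dotenv .env ...
//
// main() loads the given dotenv file and then re-parses remaining_args as the
// regular Cli arguments below.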
// Prefer "--rebalance false" over "--no-rebalance" because it works
// better with REBALANCE=false env values.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum BoolArg {
True,
False,
}
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum JupiterVersionArg {
Mock,
V4,
V6,
}
impl From<JupiterVersionArg> for jupiter::Version {
fn from(a: JupiterVersionArg) -> Self {
match a {
JupiterVersionArg::Mock => jupiter::Version::Mock,
JupiterVersionArg::V4 => jupiter::Version::V4,
JupiterVersionArg::V6 => jupiter::Version::V6,
}
}
}
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum TcsMode {
BorrowBuy,
SwapSellIntoBuy,
SwapCollateralIntoBuy,
}
impl From<TcsMode> for trigger_tcs::Mode {
fn from(a: TcsMode) -> Self {
match a {
TcsMode::BorrowBuy => trigger_tcs::Mode::BorrowBuyToken,
TcsMode::SwapSellIntoBuy => trigger_tcs::Mode::SwapSellIntoBuy,
TcsMode::SwapCollateralIntoBuy => trigger_tcs::Mode::SwapCollateralIntoBuy,
}
}
}
#[derive(Parser)]
#[clap()]
struct Cli {
#[clap(short, long, env)]
rpc_url: String,
#[clap(long, env)]
liqor_mango_account: Pubkey,
#[clap(long, env)]
liqor_owner: String,
#[clap(long, env, default_value = "1000")]
check_interval_ms: u64,
#[clap(long, env, default_value = "300")]
snapshot_interval_secs: u64,
/// how many getMultipleAccounts requests to send in parallel
#[clap(long, env, default_value = "10")]
parallel_rpc_requests: usize,
/// typically 100 is the max number of accounts getMultipleAccounts will retrieve at once
#[clap(long, env, default_value = "100")]
get_multiple_accounts_count: usize,
/// liquidator health ratio should not fall below this value
#[clap(long, env, default_value = "50")]
min_health_ratio: f64,
/// if rebalancing is enabled
///
/// typically only disabled for tests where swaps are unavailable
#[clap(long, env, value_enum, default_value = "true")]
rebalance: BoolArg,
/// max slippage to request on swaps to rebalance spot tokens
#[clap(long, env, default_value = "100")]
rebalance_slippage_bps: u64,
/// tokens to not rebalance (in addition to USDC); use a comma separated list of names
#[clap(long, env, default_value = "")]
rebalance_skip_tokens: String,
/// if taking tcs orders is enabled
///
/// typically only disabled for tests where swaps are unavailable
#[clap(long, env, value_enum, default_value = "true")]
take_tcs: BoolArg,
/// profit margin at which to take tcs orders
#[clap(long, env, default_value = "0.0005")]
tcs_profit_fraction: f64,
/// control how tcs triggering provides buy tokens
#[clap(long, env, value_enum, default_value = "swap-sell-into-buy")]
tcs_mode: TcsMode,
/// prioritize each transaction with this many microlamports/cu
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
/// compute limit requested for liquidation instructions
#[clap(long, env, default_value = "250000")]
compute_limit_for_liquidation: u32,
/// compute limit requested for tcs trigger instructions
#[clap(long, env, default_value = "300000")]
compute_limit_for_tcs: u32,
/// control which version of jupiter to use
#[clap(long, env, value_enum, default_value = "v6")]
jupiter_version: JupiterVersionArg,
/// report liquidator's existence and pubkey
#[clap(long, env, value_enum, default_value = "true")]
telemetry: BoolArg,
}
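// All of the above can also come from the environment via clap's `env`
// attribute (by default the SCREAMING_SNAKE_CASE field name, e.g. RPC_URL,
// LIQOR_MANGO_ACCOUNT, LIQOR_OWNER). A minimal hypothetical invocation:
//
//   RPC_URL=https://... LIQOR_MANGO_ACCOUNT=<pubkey> LIQOR_OWNER=<keypair> liquidator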
/// Base58-encode a pubkey; this matches Pubkey's Display impl and is kept as
/// an explicit helper.
pub fn encode_address(addr: &Pubkey) -> String {
bs58::encode(&addr.to_bytes()).into_string()
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
mango_v4_client::tracing_subscriber_init();
let args = if let Ok(cli_dotenv) = CliDotenv::try_parse() {
dotenv::from_path(cli_dotenv.dotenv)?;
cli_dotenv.remaining_args
} else {
dotenv::dotenv().ok();
std::env::args_os().collect()
};
let cli = Cli::parse_from(args);
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
let rpc_url = cli.rpc_url;
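// Derive the websocket endpoint from the RPC url by plain string replacement;
// this assumes an https:// url was passed.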
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let commitment = CommitmentConfig::processed();
let client = Client::new(
cluster.clone(),
commitment,
liqor_owner.clone(),
Some(rpc_timeout),
TransactionBuilderConfig {
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
.then_some(cli.prioritization_micro_lamports),
// Liquidation and tcs triggers set their own budgets, this is a default for other tx
compute_budget_per_instruction: Some(250_000),
},
);
// The representation of current on-chain account data
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
// Reading accounts from chain_data
let account_fetcher = Arc::new(chain_data::AccountFetcher {
chain_data: chain_data.clone(),
rpc: client.rpc_async(),
});
let mango_account = account_fetcher
.fetch_fresh_mango_account(&cli.liqor_mango_account)
.await?;
let mango_group = mango_account.fixed.group;
let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?;
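// All oracle accounts for tokens and perp markets, deduplicated; both the
// websocket feed and the snapshot job below are pointed at these.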
let mango_oracles = group_context
.tokens
.values()
.map(|value| value.mint_info.oracle)
.chain(group_context.perp_markets.values().map(|p| p.market.oracle))
.unique()
.collect::<Vec<Pubkey>>();
let serum_programs = group_context
.serum3_markets
.values()
.map(|s3| s3.market.serum_program)
.unique()
.collect_vec();
//
// feed setup
//
// FUTURE: decouple feed setup and liquidator business logic
// feed should send updates to a channel which liquidator can consume
info!("startup");
let metrics = metrics::start();
let (account_update_sender, account_update_receiver) =
async_channel::unbounded::<account_update_stream::Message>();
// Sourcing account and slot data from solana via websockets
// FUTURE: websocket feed should take which accounts to listen to as an input
websocket_source::start(
websocket_source::Config {
rpc_ws_url: ws_url.clone(),
serum_programs,
open_orders_authority: mango_group,
},
mango_oracles.clone(),
account_update_sender.clone(),
);
let first_websocket_slot = websocket_source::get_next_create_bank_slot(
account_update_receiver.clone(),
Duration::from_secs(10),
)
.await?;
// Getting solana account snapshots via jsonrpc
// FUTURE: what to fetch a snapshot of should probably be taken as an input
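// min_slot below keeps the snapshot past the first websocket slot, so the
// snapshot and the stream overlap and no update can fall into a gap between
// them; the +10 is presumably a safety margin.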
snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
get_multiple_accounts_count: cli.get_multiple_accounts_count,
parallel_rpc_requests: cli.parallel_rpc_requests,
snapshot_interval: Duration::from_secs(cli.snapshot_interval_secs),
min_slot: first_websocket_slot + 10,
},
mango_oracles,
account_update_sender,
);
start_chain_data_metrics(chain_data.clone(), &metrics);
let shared_state = Arc::new(RwLock::new(SharedState::default()));
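// Written by the data job below; the other jobs read it to learn the set of
// mango accounts and whether the first snapshot has arrived.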
//
// mango client setup
//
let mango_client = {
Arc::new(MangoClient::new_detail(
client,
cli.liqor_mango_account,
liqor_owner,
group_context,
account_fetcher.clone(),
)?)
};
let token_swap_info_config = token_swap_info::Config {
quote_index: 0, // USDC
quote_amount: 1_000_000_000, // TODO: config, $1000, should be >= tcs_config.max_trigger_quote_amount
jupiter_version: cli.jupiter_version.into(),
};
let token_swap_info_updater = Arc::new(token_swap_info::TokenSwapInfoUpdater::new(
mango_client.clone(),
token_swap_info_config,
));
let liq_config = liquidate::Config {
min_health_ratio: cli.min_health_ratio,
compute_limit_for_liq_ix: cli.compute_limit_for_liquidation,
// TODO: config
refresh_timeout: Duration::from_secs(30),
};
let tcs_config = trigger_tcs::Config {
min_health_ratio: cli.min_health_ratio,
max_trigger_quote_amount: 1_000_000_000, // TODO: config, $1000
compute_limit_for_trigger: cli.compute_limit_for_tcs,
profit_fraction: cli.tcs_profit_fraction,
collateral_token_index: 0, // USDC
// TODO: config
refresh_timeout: Duration::from_secs(30),
jupiter_version: cli.jupiter_version.into(),
jupiter_slippage_bps: cli.rebalance_slippage_bps,
mode: cli.tcs_mode.into(),
min_buy_fraction: 0.7,
};
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5));
let rebalance_config = rebalance::Config {
enabled: cli.rebalance == BoolArg::True,
slippage_bps: cli.rebalance_slippage_bps,
// TODO: config
borrow_settle_excess: 1.05,
refresh_timeout: Duration::from_secs(30),
jupiter_version: cli.jupiter_version.into(),
skip_tokens: cli
.rebalance_skip_tokens
.split(',')
.filter(|v| !v.is_empty())
.map(|name| mango_client.context.token_by_name(name).token_index)
.collect(),
};
let rebalancer = Arc::new(rebalance::Rebalancer {
mango_client: mango_client.clone(),
account_fetcher: account_fetcher.clone(),
mango_account_address: cli.liqor_mango_account,
config: rebalance_config,
});
let mut liquidation = Box::new(LiquidationState {
mango_client: mango_client.clone(),
account_fetcher,
liquidation_config: liq_config,
trigger_tcs_config: tcs_config,
rebalancer: rebalancer.clone(),
token_swap_info: token_swap_info_updater.clone(),
liq_errors: ErrorTracking {
skip_threshold: 5,
skip_duration: Duration::from_secs(120),
..ErrorTracking::default()
},
tcs_collection_hard_errors: ErrorTracking {
skip_threshold: 2,
skip_duration: Duration::from_secs(120),
..ErrorTracking::default()
},
tcs_collection_partial_errors: ErrorTracking {
skip_threshold: 2,
skip_duration: Duration::from_secs(120),
..ErrorTracking::default()
},
tcs_execution_errors: ErrorTracking {
skip_threshold: 2,
skip_duration: Duration::from_secs(120),
..ErrorTracking::default()
},
persistent_error_report_interval: Duration::from_secs(300),
persistent_error_min_duration: Duration::from_secs(300),
last_persistent_error_report: Instant::now(),
});
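// The work is split across four long-lived tasks below: data feed ingestion,
// rebalancing, liquidation/tcs, and token swap info updates. If any of them
// returns or panics, main exits (see the FuturesUnordered at the bottom).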
info!("main loop");
// Job to update chain_data and notify the liquidation job when a new check is needed.
let data_job = tokio::spawn({
use account_update_stream::Message;
let shared_state = shared_state.clone();
let mut metric_account_update_queue_len =
metrics.register_u64("account_update_queue_length".into());
let mut metric_mango_accounts = metrics.register_u64("mango_accounts".into());
let mut mint_infos = HashMap::<TokenIndex, Pubkey>::new();
let mut oracles = HashSet::<Pubkey>::new();
let mut perp_markets = HashMap::<PerpMarketIndex, Pubkey>::new();
async move {
loop {
let message = account_update_receiver
.recv()
.await
.expect("channel not closed");
metric_account_update_queue_len.set(account_update_receiver.len() as u64);
message.update_chain_data(&mut chain_data.write().unwrap());
match message {
Message::Account(account_write) => {
let mut state = shared_state.write().unwrap();
if is_mango_account(&account_write.account, &mango_group).is_some() {
// e.g. set RUST_LOG="liquidator=debug" to render these debug logs
debug!(
"change to mango account {}...",
&account_write.pubkey.to_string()[0..3]
);
// Track all MangoAccounts: we need to iterate over them later
state.mango_accounts.insert(account_write.pubkey);
metric_mango_accounts.set(state.mango_accounts.len() as u64);
}
}
Message::Snapshot(snapshot) => {
let mut state = shared_state.write().unwrap();
// Track all mango account pubkeys
for update in snapshot.iter() {
if is_mango_account(&update.account, &mango_group).is_some() {
state.mango_accounts.insert(update.pubkey);
}
if let Some(mint_info) = is_mint_info(&update.account, &mango_group) {
mint_infos.insert(mint_info.token_index, update.pubkey);
oracles.insert(mint_info.oracle);
}
if let Some(perp_market) = is_perp_market(&update.account, &mango_group)
{
perp_markets.insert(perp_market.perp_market_index, update.pubkey);
oracles.insert(perp_market.oracle);
}
}
metric_mango_accounts.set(state.mango_accounts.len() as u64);
state.one_snapshot_done = true;
}
_ => {}
}
}
}
});
// Could be refactored to only start the below jobs when the first snapshot is done.
// But need to take care to abort if the above job aborts beforehand.
let rebalance_job = tokio::spawn({
let shared_state = shared_state.clone();
async move {
loop {
rebalance_interval.tick().await;
if !shared_state.read().unwrap().one_snapshot_done {
continue;
}
if let Err(err) = rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
// off. For now, hard sleep on error to avoid the most frequent error cases.
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
});
let liquidation_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
let shared_state = shared_state.clone();
async move {
loop {
interval.tick().await;
let account_addresses = {
let state = shared_state.write().unwrap();
if !state.one_snapshot_done {
continue;
}
state.mango_accounts.iter().cloned().collect_vec()
};
liquidation.log_persistent_errors();
let liquidated = liquidation
.maybe_liquidate_one(account_addresses.iter())
.await
.unwrap();
let mut took_tcs = false;
if !liquidated && cli.take_tcs == BoolArg::True {
took_tcs = liquidation
.maybe_take_token_conditional_swap(account_addresses.iter())
.await
.unwrap();
}
if liquidated || took_tcs {
// It's awkward that this rebalance can run in parallel with the one
// from the rebalance_job. Ideally we'd get only one at a time/in quick succession.
// However, we do want to rebalance after a liquidation before liquidating further.
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
}
}
}
});
let token_swap_info_job = tokio::spawn({
// TODO: configurable interval
let mut interval = tokio::time::interval(Duration::from_secs(60));
let mut startup_wait = tokio::time::interval(Duration::from_secs(1));
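// Two intervals: startup_wait polls every second until the first snapshot
// has arrived, then `interval` paces the actual update passes once a minute.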
let shared_state = shared_state.clone();
async move {
loop {
startup_wait.tick().await;
if !shared_state.read().unwrap().one_snapshot_done {
continue;
}
interval.tick().await;
let token_indexes = token_swap_info_updater
.mango_client()
.context
.tokens
.keys()
.copied()
.collect_vec();
let mut min_delay = tokio::time::interval(Duration::from_secs(1));
for token_index in token_indexes {
min_delay.tick().await;
match token_swap_info_updater.update_one(token_index).await {
Ok(()) => {}
Err(err) => {
warn!(
"failed to update token swap info for token {token_index}: {err:?}",
);
}
}
}
token_swap_info_updater.log_all();
}
}
});
if cli.telemetry == BoolArg::True {
tokio::spawn(telemetry::report_regularly(
mango_client,
cli.min_health_ratio,
));
}
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
data_job,
rebalance_job,
liquidation_job,
token_swap_info_job,
]
.into_iter()
.collect();
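// All jobs loop forever, so the first one to complete (or panic) signals a
// critical failure.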
jobs.next().await;
error!("a critical job aborted, exiting");
Ok(())
}
#[derive(Default)]
struct SharedState {
/// Addresses of the MangoAccounts belonging to the mango program.
/// Needed to check health of them all when the cache updates.
mango_accounts: HashSet<Pubkey>,
/// Is the first snapshot done? Only start checking account health when it is.
one_snapshot_done: bool,
}
struct LiquidationState {
mango_client: Arc<MangoClient>,
account_fetcher: Arc<chain_data::AccountFetcher>,
rebalancer: Arc<rebalance::Rebalancer>,
token_swap_info: Arc<token_swap_info::TokenSwapInfoUpdater>,
liquidation_config: liquidate::Config,
trigger_tcs_config: trigger_tcs::Config,
/// Errors from liquidation attempts; used to skip accounts that recently errored
liq_errors: ErrorTracking,
/// Errors that suggest we should temporarily skip trying to collect tcs for that pubkey
tcs_collection_hard_errors: ErrorTracking,
/// Records errors for accounts where some tcs failed during collection while others succeeded
tcs_collection_partial_errors: ErrorTracking,
tcs_execution_errors: ErrorTracking,
persistent_error_report_interval: Duration,
last_persistent_error_report: Instant,
persistent_error_min_duration: Duration,
}
impl LiquidationState {
async fn maybe_liquidate_one<'b>(
&mut self,
accounts_iter: impl Iterator<Item = &'b Pubkey>,
) -> anyhow::Result<bool> {
use rand::seq::SliceRandom;
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
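// Scan in random order so repeated passes don't always start with the same
// accounts (and concurrent liquidators are less likely to collide on a liqee).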
{
let mut rng = rand::thread_rng();
accounts.shuffle(&mut rng);
}
for pubkey in accounts {
if self
.maybe_liquidate_and_log_error(pubkey)
.await
.unwrap_or(false)
{
return Ok(true);
}
}
Ok(false)
}
async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
let now = Instant::now();
let error_tracking = &mut self.liq_errors;
// Skip a pubkey if there've been too many errors recently
if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) {
trace!(
%pubkey,
error_entry.count,
"skip checking account for liquidation, had errors recently",
);
return Ok(false);
}
let result = liquidate::maybe_liquidate_account(
&self.mango_client,
&self.account_fetcher,
pubkey,
&self.liquidation_config,
)
.await;
if let Err(err) = result.as_ref() {
// Keep track of pubkeys that had errors
error_tracking.record_error(pubkey, now, err.to_string());
// Not all errors need to be raised to the user's attention.
let mut is_error = true;
// Simulation errors due to liqee precondition failures on the liquidation instructions
// will commonly happen if our liquidator is late or if there are chain forks.
match err.downcast_ref::<MangoClientError>() {
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
if logs.iter().any(|line| {
line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
}) {
is_error = false;
}
}
_ => {}
};
if is_error {
error!("liquidating account {}: {:?}", pubkey, err);
} else {
trace!("liquidating account {}: {:?}", pubkey, err);
}
} else {
error_tracking.clear_errors(pubkey);
}
result
}
async fn maybe_take_token_conditional_swap<'b>(
&mut self,
accounts_iter: impl Iterator<Item = &'b Pubkey>,
) -> anyhow::Result<bool> {
let accounts = accounts_iter.collect::<Vec<&Pubkey>>();
let now = Instant::now();
let now_ts: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let tcs_context = trigger_tcs::Context {
mango_client: self.mango_client.clone(),
account_fetcher: self.account_fetcher.clone(),
token_swap_info: self.token_swap_info.clone(),
config: self.trigger_tcs_config.clone(),
jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()),
now_ts,
};
// Find interesting (pubkey, tcsid, volume)
let mut interesting_tcs = Vec::with_capacity(accounts.len());
for pubkey in accounts.iter() {
if let Some(error_entry) = self
.tcs_collection_hard_errors
.had_too_many_errors(pubkey, now)
{
trace!(
%pubkey,
error_entry.count,
"skip checking account for tcs, had errors recently",
);
continue;
}
match tcs_context.find_interesting_tcs_for_account(pubkey) {
Ok(v) => {
self.tcs_collection_hard_errors.clear_errors(pubkey);
if v.is_empty() {
self.tcs_collection_partial_errors.clear_errors(pubkey);
self.tcs_execution_errors.clear_errors(pubkey);
} else if v.iter().all(|it| it.is_ok()) {
self.tcs_collection_partial_errors.clear_errors(pubkey);
} else {
for it in v.iter() {
if let Err(e) = it {
info!("error on tcs find_interesting: {:?}", e);
self.tcs_collection_partial_errors.record_error(
pubkey,
now,
e.to_string(),
);
}
}
}
interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok()));
}
Err(e) => {
self.tcs_collection_hard_errors
.record_error(pubkey, now, e.to_string());
}
}
}
if interesting_tcs.is_empty() {
return Ok(false);
}
let (txsigs, mut changed_pubkeys) = tcs_context
.execute_tcs(&mut interesting_tcs, &mut self.tcs_execution_errors)
.await?;
changed_pubkeys.push(self.mango_client.mango_account_address);
// Force a refresh of affected accounts
let slot = self.account_fetcher.transaction_max_slot(&txsigs).await?;
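// transaction_max_slot is the latest slot any of the tcs txs landed in; poll
// rpc until the local cache reaches it so the next pass sees post-execution
// state.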
if let Err(e) = self
.account_fetcher
.refresh_accounts_via_rpc_until_slot(
&changed_pubkeys,
slot,
self.liquidation_config.refresh_timeout,
)
.await
{
info!(slot, "could not refresh after tcs execution: {}", e);
}
Ok(true)
}
fn log_persistent_errors(&mut self) {
let now = Instant::now();
if now.duration_since(self.last_persistent_error_report)
< self.persistent_error_report_interval
{
return;
}
self.last_persistent_error_report = now;
let min_duration = self.persistent_error_min_duration;
self.liq_errors
.log_persistent_errors("liquidation", min_duration);
self.tcs_execution_errors
.log_persistent_errors("tcs execution", min_duration);
self.tcs_collection_hard_errors
.log_persistent_errors("tcs collection hard", min_duration);
self.tcs_collection_partial_errors
.log_persistent_errors("tcs collection partial", min_duration);
}
}
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
let mut interval = tokio::time::interval(Duration::from_secs(600));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
let mut metric_account_write_count =
metrics.register_u64("chain_data_account_write_count".into());
tokio::spawn(async move {
loop {
interval.tick().await;
let chain_lock = chain.read().unwrap();
metric_slots_count.set(chain_lock.slots_count() as u64);
metric_accounts_count.set(chain_lock.accounts_count() as u64);
metric_account_write_count.set(chain_lock.account_writes_count() as u64);
}
});
}