// mango-v4/bin/liquidator/src/main.rs


use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use anchor_client::Cluster;
use clap::Parser;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use mango_v4_client::{
account_update_stream, chain_data, keypair_from_cli, snapshot_source, websocket_source,
AsyncChannelSendUnlessFull, Client, MangoClient, MangoClientError, MangoGroupContext,
TransactionBuilderConfig,
};
use itertools::Itertools;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use tracing::*;
pub mod liquidate;
pub mod metrics;
pub mod rebalance;
pub mod telemetry;
pub mod token_swap_info;
pub mod trigger_tcs;
pub mod util;
use crate::util::{is_mango_account, is_mango_bank, is_mint_info, is_perp_market};
// jemalloc seems to be better at keeping the memory footprint reasonable over
// longer periods of time
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Parser, Debug)]
#[clap()]
struct CliDotenv {
// When --dotenv <file> is passed, read the specified dotenv file before parsing args
#[clap(long)]
dotenv: std::path::PathBuf,
remaining_args: Vec<std::ffi::OsString>,
}
// Prefer "--rebalance false" over "--no-rebalance" because it works
// better with REBALANCE=false env values.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum BoolArg {
True,
False,
}
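// Example invocation (a sketch; all values are placeholders):
//
//   liquidator --rpc-url https://... --liqor-mango-account <PUBKEY> --liqor-owner <KEYPAIR>
//
// Every flag can also be set through the environment (RPC_URL,
// LIQOR_MANGO_ACCOUNT, LIQOR_OWNER, ...), optionally loaded from a
// dotenv file via --dotenv <file>.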
#[derive(Parser)]
#[clap()]
struct Cli {
#[clap(short, long, env)]
rpc_url: String,
#[clap(long, env)]
liqor_mango_account: Pubkey,
#[clap(long, env)]
liqor_owner: String,
#[clap(long, env, default_value = "300")]
snapshot_interval_secs: u64,
/// how many getMultipleAccounts requests to send in parallel
#[clap(long, env, default_value = "10")]
parallel_rpc_requests: usize,
/// typically 100 is the max number of accounts getMultipleAccounts will retrieve at once
#[clap(long, env, default_value = "100")]
get_multiple_accounts_count: usize,
/// liquidator health ratio should not fall below this value
#[clap(long, env, default_value = "50")]
min_health_ratio: f64,
/// if rebalancing is enabled
///
/// typically only disabled for tests where swaps are unavailable
#[clap(long, env, value_enum, default_value = "true")]
rebalance: BoolArg,
/// max slippage to request on swaps to rebalance spot tokens
#[clap(long, env, default_value = "100")]
rebalance_slippage_bps: u64,
/// prioritize each transaction with this many microlamports/cu
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
/// use a jupiter mock instead of actual queries
///
/// This is required for devnet testing.
#[clap(long, env, value_enum, default_value = "false")]
mock_jupiter: BoolArg,
/// report liquidator's existence and pubkey
#[clap(long, env, value_enum, default_value = "true")]
telemetry: BoolArg,
}
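/// Render a pubkey as its base58 string representation.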
pub fn encode_address(addr: &Pubkey) -> String {
bs58::encode(&addr.to_bytes()).into_string()
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
mango_v4_client::tracing_subscriber_init();
let args = if let Ok(cli_dotenv) = CliDotenv::try_parse() {
dotenv::from_path(cli_dotenv.dotenv)?;
cli_dotenv.remaining_args
} else {
dotenv::dotenv().ok();
std::env::args_os().collect()
};
let cli = Cli::parse_from(args);
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
let rpc_url = cli.rpc_url;
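// Derive the websocket endpoint from the http RPC url.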
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let commitment = CommitmentConfig::processed();
let client = Client::new(
cluster.clone(),
commitment,
liqor_owner.clone(),
Some(rpc_timeout),
TransactionBuilderConfig {
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
.then_some(cli.prioritization_micro_lamports),
},
);
// The representation of current on-chain account data
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
// Reading accounts from chain_data
let account_fetcher = Arc::new(chain_data::AccountFetcher {
chain_data: chain_data.clone(),
rpc: client.rpc_async(),
});
let mango_account = account_fetcher
.fetch_fresh_mango_account(&cli.liqor_mango_account)
.await?;
let mango_group = mango_account.fixed.group;
let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?;
let mango_oracles = group_context
.tokens
.values()
.map(|value| value.mint_info.oracle)
.chain(group_context.perp_markets.values().map(|p| p.market.oracle))
.unique()
.collect::<Vec<Pubkey>>();
let serum_programs = group_context
.serum3_markets
.values()
.map(|s3| s3.market.serum_program)
.unique()
.collect_vec();
// TODO: Currently the websocket source only supports a single serum program address!
assert_eq!(serum_programs.len(), 1);
//
// feed setup
//
// FUTURE: decouple feed setup and liquidator business logic
// feed should send updates to a channel which liquidator can consume
info!("startup");
let metrics = metrics::start();
let (account_update_sender, account_update_receiver) =
async_channel::unbounded::<account_update_stream::Message>();
// Sourcing account and slot data from solana via websockets
// FUTURE: websocket feed should take which accounts to listen to as an input
websocket_source::start(
websocket_source::Config {
rpc_ws_url: ws_url.clone(),
serum_program: *serum_programs.first().unwrap(),
open_orders_authority: mango_group,
},
mango_oracles.clone(),
account_update_sender.clone(),
);
let first_websocket_slot = websocket_source::get_next_create_bank_slot(
account_update_receiver.clone(),
Duration::from_secs(10),
)
.await?;
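// The snapshot is only useful if it is not older than the websocket stream,
// so require it to be taken a few slots past the first observed websocket slot.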
// Getting solana account snapshots via jsonrpc
// FUTURE: which accounts to snapshot should probably be an input
snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
get_multiple_accounts_count: cli.get_multiple_accounts_count,
parallel_rpc_requests: cli.parallel_rpc_requests,
snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
min_slot: first_websocket_slot + 10,
},
mango_oracles,
account_update_sender,
);
start_chain_data_metrics(chain_data.clone(), &metrics);
let shared_state = Arc::new(RwLock::new(SharedState::default()));
//
// mango client setup
//
let mango_client = {
Arc::new(MangoClient::new_detail(
client,
cli.liqor_mango_account,
liqor_owner,
group_context,
account_fetcher.clone(),
)?)
};
let token_swap_info_config = token_swap_info::Config {
quote_index: 0, // USDC
quote_amount: 1_000_000_000, // TODO: config, $1000, should be >= tcs_config.max_trigger_quote_amount
mock_jupiter: cli.mock_jupiter == BoolArg::True,
};
let token_swap_info_updater = Arc::new(token_swap_info::TokenSwapInfoUpdater::new(
mango_client.clone(),
token_swap_info_config,
));
let liq_config = liquidate::Config {
min_health_ratio: cli.min_health_ratio,
mock_jupiter: cli.mock_jupiter == BoolArg::True,
// TODO: config
refresh_timeout: Duration::from_secs(30),
};
let tcs_config = trigger_tcs::Config {
min_health_ratio: cli.min_health_ratio,
max_trigger_quote_amount: 1_000_000_000, // TODO: config, $1000
mock_jupiter: cli.mock_jupiter == BoolArg::True,
// TODO: config
refresh_timeout: Duration::from_secs(30),
};
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(5));
let rebalance_config = rebalance::Config {
enabled: cli.rebalance == BoolArg::True,
slippage_bps: cli.rebalance_slippage_bps,
// TODO: config
borrow_settle_excess: 1.05,
refresh_timeout: Duration::from_secs(30),
};
let rebalancer = Arc::new(rebalance::Rebalancer {
mango_client: mango_client.clone(),
account_fetcher: account_fetcher.clone(),
mango_account_address: cli.liqor_mango_account,
config: rebalance_config,
});
let mut liquidation = Box::new(LiquidationState {
mango_client: mango_client.clone(),
account_fetcher,
liquidation_config: liq_config,
trigger_tcs_config: tcs_config,
rebalancer: rebalancer.clone(),
token_swap_info: token_swap_info_updater.clone(),
liq_errors: ErrorTracking {
skip_threshold: 5,
skip_duration: std::time::Duration::from_secs(120),
reset_duration: std::time::Duration::from_secs(360),
..ErrorTracking::default()
},
tcs_errors: ErrorTracking {
skip_threshold: 2,
skip_duration: std::time::Duration::from_secs(120),
reset_duration: std::time::Duration::from_secs(360),
..ErrorTracking::default()
},
});
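// A bounded(1) channel lets bursts of account updates coalesce:
// send_unless_full() drops the message if a trigger is already pending,
// so the liquidation job never queues up redundant passes.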
let (liquidation_trigger_sender, liquidation_trigger_receiver) =
async_channel::bounded::<()>(1);
info!("main loop");
// Job to update chain_data and notify the liquidation job when a new check is needed.
let data_job = tokio::spawn({
use account_update_stream::Message;
let shared_state = shared_state.clone();
let mut metric_account_update_queue_len =
metrics.register_u64("account_update_queue_length".into());
let mut metric_mango_accounts = metrics.register_u64("mango_accounts".into());
let mut mint_infos = HashMap::<TokenIndex, Pubkey>::new();
let mut oracles = HashSet::<Pubkey>::new();
let mut perp_markets = HashMap::<PerpMarketIndex, Pubkey>::new();
async move {
loop {
let message = account_update_receiver
.recv()
.await
.expect("channel not closed");
metric_account_update_queue_len.set(account_update_receiver.len() as u64);
message.update_chain_data(&mut chain_data.write().unwrap());
match message {
Message::Account(account_write) => {
let mut state = shared_state.write().unwrap();
if is_mango_account(&account_write.account, &mango_group).is_some() {
// e.g. set RUST_LOG="liquidator=debug" to render these debug logs
debug!(
"change to mango account {}...",
&account_write.pubkey.to_string()[0..3]
);
// Track all MangoAccounts: we need to iterate over them later
state.mango_accounts.insert(account_write.pubkey);
metric_mango_accounts.set(state.mango_accounts.len() as u64);
if !state.health_check_all {
state.health_check_accounts.push(account_write.pubkey);
}
liquidation_trigger_sender.send_unless_full(()).unwrap();
} else {
let mut must_check_all = false;
if is_mango_bank(&account_write.account, &mango_group).is_some() {
debug!("change to bank {}", &account_write.pubkey);
must_check_all = true;
}
if is_perp_market(&account_write.account, &mango_group).is_some() {
debug!("change to perp market {}", &account_write.pubkey);
must_check_all = true;
}
if oracles.contains(&account_write.pubkey) {
debug!("change to oracle {}", &account_write.pubkey);
must_check_all = true;
}
if must_check_all {
state.health_check_all = true;
liquidation_trigger_sender.send_unless_full(()).unwrap();
}
}
}
Message::Snapshot(snapshot) => {
let mut state = shared_state.write().unwrap();
// Track all mango account pubkeys
for update in snapshot.iter() {
if is_mango_account(&update.account, &mango_group).is_some() {
state.mango_accounts.insert(update.pubkey);
}
if let Some(mint_info) = is_mint_info(&update.account, &mango_group) {
mint_infos.insert(mint_info.token_index, update.pubkey);
oracles.insert(mint_info.oracle);
}
if let Some(perp_market) = is_perp_market(&update.account, &mango_group)
{
perp_markets.insert(perp_market.perp_market_index, update.pubkey);
oracles.insert(perp_market.oracle);
}
}
metric_mango_accounts.set(state.mango_accounts.len() as u64);
state.one_snapshot_done = true;
state.health_check_all = true;
liquidation_trigger_sender.send_unless_full(()).unwrap();
}
_ => {}
}
}
}
});
// Could be refactored to only start the below jobs when the first snapshot is done.
// But need to take care to abort if the above job aborts beforehand.
let rebalance_job = tokio::spawn({
let shared_state = shared_state.clone();
async move {
loop {
rebalance_interval.tick().await;
if !shared_state.read().unwrap().one_snapshot_done {
continue;
}
if let Err(err) = rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
// off. For now, sleep on error to avoid the most frequent error cases.
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
});
let liquidation_job = tokio::spawn({
let shared_state = shared_state.clone();
async move {
loop {
liquidation_trigger_receiver.recv().await.unwrap();
let account_addresses;
{
let mut state = shared_state.write().unwrap();
if !state.one_snapshot_done {
continue;
}
account_addresses = if state.health_check_all {
state.mango_accounts.iter().cloned().collect()
} else {
state.health_check_accounts.clone()
};
state.health_check_all = false;
state.health_check_accounts = vec![];
}
let liquidated = liquidation
.maybe_liquidate_one_and_rebalance(account_addresses.iter())
.await
.unwrap();
if !liquidated {
liquidation
.maybe_take_token_conditional_swap(account_addresses.iter())
.await
.unwrap();
}
}
}
});
let token_swap_info_job = tokio::spawn({
// TODO: configurable interval
let mut interval = tokio::time::interval(Duration::from_secs(60));
let mut min_delay = tokio::time::interval(Duration::from_secs(1));
let shared_state = shared_state.clone();
async move {
loop {
if !shared_state.read().unwrap().one_snapshot_done {
// Sleep instead of busy-looping while waiting for the first snapshot.
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let token_indexes = token_swap_info_updater
.mango_client()
.context
.token_indexes_by_name
.values()
.copied()
.collect_vec();
for token_index in token_indexes {
match token_swap_info_updater.update_one(token_index).await {
Ok(()) => {}
Err(err) => {
warn!(
"failed to update token swap info for token {token_index}: {:?}",
err
);
}
}
min_delay.tick().await;
}
token_swap_info_updater.log_all();
interval.tick().await;
}
}
});
if cli.telemetry == BoolArg::True {
tokio::spawn(telemetry::report_regularly(
mango_client,
cli.min_health_ratio,
));
}
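// Collect all critical jobs; FuturesUnordered resolves as soon as the first
// one finishes, which here is always an error condition.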
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
data_job,
rebalance_job,
liquidation_job,
token_swap_info_job,
]
.into_iter()
.collect();
jobs.next().await;
error!("a critical job aborted, exiting");
Ok(())
}
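/// State written by the data job and consumed by the liquidation and
/// rebalance jobs.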
#[derive(Default)]
struct SharedState {
/// Addresses of the MangoAccounts belonging to the mango program.
/// Needed to check health of them all when the cache updates.
mango_accounts: HashSet<Pubkey>,
/// Is the first snapshot done? Only start checking account health when it is.
one_snapshot_done: bool,
/// Accounts whose health might have changed
health_check_accounts: Vec<Pubkey>,
/// Check all accounts?
health_check_all: bool,
}
#[derive(Clone)]
struct AccountErrorState {
count: u64,
last_at: std::time::Instant,
}
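/// Tracks recent errors per account so that persistently failing accounts
/// can be skipped for a while instead of being retried on every pass.
///
/// An account is skipped while it has accumulated at least `skip_threshold`
/// errors and the most recent one is younger than `skip_duration`; if more
/// than `reset_duration` passes between errors, the count restarts from zero.
///
/// Sketch of the intended flow (thresholds and `pubkey` are placeholders):
/// ```ignore
/// let mut tracking = ErrorTracking {
///     skip_threshold: 2,
///     skip_duration: Duration::from_secs(120),
///     reset_duration: Duration::from_secs(360),
///     ..ErrorTracking::default()
/// };
/// let now = Instant::now();
/// tracking.record_error(&pubkey, now);
/// tracking.record_error(&pubkey, now);
/// assert!(tracking.had_too_many_errors(&pubkey, now).is_some());
/// tracking.clear_errors(&pubkey);
/// assert!(tracking.had_too_many_errors(&pubkey, now).is_none());
/// ```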
#[derive(Default)]
struct ErrorTracking {
accounts: HashMap<Pubkey, AccountErrorState>,
skip_threshold: u64,
skip_duration: std::time::Duration,
reset_duration: std::time::Duration,
}
impl ErrorTracking {
pub fn had_too_many_errors(&self, pubkey: &Pubkey, now: Instant) -> Option<AccountErrorState> {
if let Some(error_entry) = self.accounts.get(pubkey) {
if error_entry.count >= self.skip_threshold
&& now.duration_since(error_entry.last_at) < self.skip_duration
{
Some(error_entry.clone())
} else {
None
}
} else {
None
}
}
pub fn record_error(&mut self, pubkey: &Pubkey, now: Instant) {
let error_entry = self.accounts.entry(*pubkey).or_insert(AccountErrorState {
count: 0,
last_at: now,
});
if now.duration_since(error_entry.last_at) > self.reset_duration {
error_entry.count = 0;
}
error_entry.count += 1;
error_entry.last_at = now;
}
pub fn clear_errors(&mut self, pubkey: &Pubkey) {
self.accounts.remove(pubkey);
}
}
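/// Everything the liquidation job needs: clients, configs and per-account
/// error tracking for both liquidations and token conditional swaps.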
struct LiquidationState {
mango_client: Arc<MangoClient>,
account_fetcher: Arc<chain_data::AccountFetcher>,
rebalancer: Arc<rebalance::Rebalancer>,
token_swap_info: Arc<token_swap_info::TokenSwapInfoUpdater>,
liquidation_config: liquidate::Config,
trigger_tcs_config: trigger_tcs::Config,
liq_errors: ErrorTracking,
tcs_errors: ErrorTracking,
}
impl LiquidationState {
async fn maybe_liquidate_one_and_rebalance<'b>(
&mut self,
accounts_iter: impl Iterator<Item = &'b Pubkey>,
) -> anyhow::Result<bool> {
use rand::seq::SliceRandom;
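// Check accounts in random order so no single account is persistently
// favored over the others.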
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
{
let mut rng = rand::thread_rng();
accounts.shuffle(&mut rng);
}
let mut liquidated_one = false;
for pubkey in accounts {
if self
.maybe_liquidate_and_log_error(pubkey)
.await
.unwrap_or(false)
{
liquidated_one = true;
break;
}
}
if !liquidated_one {
return Ok(false);
}
if let Err(err) = self.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
Ok(true)
}
async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
let now = std::time::Instant::now();
let error_tracking = &mut self.liq_errors;
// Skip a pubkey if there've been too many errors recently
if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) {
trace!(
"skip checking account {pubkey}, had {} errors recently",
error_entry.count
);
return Ok(false);
}
let result = liquidate::maybe_liquidate_account(
&self.mango_client,
&self.account_fetcher,
pubkey,
&self.liquidation_config,
)
.await;
if let Err(err) = result.as_ref() {
// Keep track of pubkeys that had errors
error_tracking.record_error(pubkey, now);
// Not all errors need to be raised to the user's attention.
let mut is_error = true;
// Simulation errors due to liqee precondition failures on the liquidation instructions
// will commonly happen if our liquidator is late or if there are chain forks.
match err.downcast_ref::<MangoClientError>() {
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
if logs.iter().any(|line| {
line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
}) {
is_error = false;
}
}
_ => {}
};
if is_error {
error!("liquidating account {}: {:?}", pubkey, err);
} else {
trace!("liquidating account {}: {:?}", pubkey, err);
}
} else {
error_tracking.clear_errors(pubkey);
}
result
}
async fn maybe_take_token_conditional_swap<'b>(
&mut self,
accounts_iter: impl Iterator<Item = &'b Pubkey>,
) -> anyhow::Result<()> {
use rand::seq::SliceRandom;
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
{
let mut rng = rand::thread_rng();
accounts.shuffle(&mut rng);
}
let mut took_one = false;
for pubkey in accounts {
if self
.maybe_take_conditional_swap_and_log_error(pubkey)
.await
.unwrap_or(false)
{
took_one = true;
break;
}
}
if !took_one {
return Ok(());
}
if let Err(err) = self.rebalancer.zero_all_non_quote().await {
error!("failed to rebalance liqor: {:?}", err);
}
Ok(())
}
async fn maybe_take_conditional_swap_and_log_error(
&mut self,
pubkey: &Pubkey,
) -> anyhow::Result<bool> {
let now = std::time::Instant::now();
let error_tracking = &mut self.tcs_errors;
// Skip a pubkey if there've been too many errors recently
if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) {
trace!(
"skip checking for tcs on account {pubkey}, had {} errors recently",
error_entry.count
);
return Ok(false);
}
let result = trigger_tcs::maybe_execute_token_conditional_swap(
&self.mango_client,
&self.account_fetcher,
&self.token_swap_info,
pubkey,
&self.trigger_tcs_config,
)
.await;
if let Err(err) = result.as_ref() {
// Keep track of pubkeys that had errors
error_tracking.record_error(pubkey, now);
// Not all errors need to be raised to the user's attention.
let mut is_error = true;
// Simulation errors due to liqee precondition failures
// will commonly happen if our liquidator is late or if there are chain forks.
match err.downcast_ref::<MangoClientError>() {
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
if logs
.iter()
.any(|line| line.contains("TokenConditionalSwapPriceNotInRange"))
{
is_error = false;
}
}
_ => {}
};
if is_error {
error!("token conditional swap on account {}: {:?}", pubkey, err);
} else {
trace!("token conditional swap on account {}: {:?}", pubkey, err);
}
} else {
error_tracking.clear_errors(pubkey);
}
result
}
}
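/// Periodically publish the sizes of the chain_data cache as metrics.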
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
let mut metric_account_write_count =
metrics.register_u64("chain_data_account_write_count".into());
tokio::spawn(async move {
loop {
interval.tick().await;
let chain_lock = chain.read().unwrap();
metric_slots_count.set(chain_lock.slots_count() as u64);
metric_accounts_count.set(chain_lock.accounts_count() as u64);
metric_account_write_count.set(chain_lock.account_writes_count() as u64);
}
});
}