Serge/liquidator split tcs and liquidation (#914)
liquidator: split TCS triggering and liquidation job Concurrent execution of candidate lookup and tx building/sending - Also added an health assertion IX to protect liqor in multi liquidation scenario - And a timeout for jupiter v6 queries (avoid blocking liquidation because of slow TCS)
This commit is contained in:
parent
69d866008c
commit
e4002acf8f
|
@ -250,4 +250,8 @@ pub struct Cli {
|
|||
/// override the sanctum http request timeout
|
||||
#[clap(long, env, default_value = "30")]
|
||||
pub(crate) sanctum_timeout_secs: u64,
|
||||
|
||||
/// max number of liquidation/tcs to do concurrently
|
||||
#[clap(long, env, default_value = "5")]
|
||||
pub(crate) max_parallel_operations: u64,
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ use mango_v4_client::{chain_data, MangoClient, PreparedInstructions};
|
|||
use solana_sdk::signature::Signature;
|
||||
|
||||
use futures::{stream, StreamExt, TryStreamExt};
|
||||
use mango_v4::accounts_ix::HealthCheckKind::MaintRatio;
|
||||
use rand::seq::SliceRandom;
|
||||
use tracing::*;
|
||||
use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
||||
|
@ -260,7 +261,22 @@ impl<'a> LiquidateHelper<'a> {
|
|||
)
|
||||
.await
|
||||
.context("creating perp_liq_base_or_positive_pnl_instruction")?;
|
||||
|
||||
liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix);
|
||||
|
||||
let liqor = &self.client.mango_account().await?;
|
||||
liq_ixs.append(
|
||||
self.client
|
||||
.health_check_instruction(
|
||||
liqor,
|
||||
self.config.min_health_ratio,
|
||||
vec![],
|
||||
vec![*perp_market_index],
|
||||
MaintRatio,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let txsig = self
|
||||
.client
|
||||
.send_and_confirm_authority_tx(liq_ixs.to_instructions())
|
||||
|
@ -501,6 +517,20 @@ impl<'a> LiquidateHelper<'a> {
|
|||
.await
|
||||
.context("creating liq_token_with_token ix")?;
|
||||
liq_ixs.cu = liq_ixs.cu.max(self.config.compute_limit_for_liq_ix);
|
||||
|
||||
let liqor = self.client.mango_account().await?;
|
||||
liq_ixs.append(
|
||||
self.client
|
||||
.health_check_instruction(
|
||||
&liqor,
|
||||
self.config.min_health_ratio,
|
||||
vec![asset_token_index, liab_token_index],
|
||||
vec![],
|
||||
MaintRatio,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let txsig = self
|
||||
.client
|
||||
.send_and_confirm_authority_tx(liq_ixs.to_instructions())
|
||||
|
@ -651,14 +681,11 @@ impl<'a> LiquidateHelper<'a> {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn maybe_liquidate_account(
|
||||
pub async fn can_liquidate_account(
|
||||
mango_client: &MangoClient,
|
||||
account_fetcher: &chain_data::AccountFetcher,
|
||||
pubkey: &Pubkey,
|
||||
config: &Config,
|
||||
) -> anyhow::Result<bool> {
|
||||
let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio);
|
||||
|
||||
let account = account_fetcher.fetch_mango_account(pubkey)?;
|
||||
let health_cache = mango_client
|
||||
.health_cache(&account)
|
||||
|
@ -675,6 +702,18 @@ pub async fn maybe_liquidate_account(
|
|||
"possible candidate",
|
||||
);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn maybe_liquidate_account(
|
||||
mango_client: &MangoClient,
|
||||
account_fetcher: &chain_data::AccountFetcher,
|
||||
pubkey: &Pubkey,
|
||||
config: &Config,
|
||||
) -> anyhow::Result<bool> {
|
||||
let liqor_min_health_ratio = I80F48::from_num(config.min_health_ratio);
|
||||
|
||||
// Fetch a fresh account and re-compute
|
||||
// This is -- unfortunately -- needed because the websocket streams seem to not
|
||||
// be great at providing timely updates to the account data.
|
||||
|
|
|
@ -0,0 +1,238 @@
|
|||
use crate::cli_args::Cli;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::unwrappable_oracle_error::UnwrappableOracleError;
|
||||
use crate::{liquidate, LiqErrorType, SharedState};
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use itertools::Itertools;
|
||||
use mango_v4::state::TokenIndex;
|
||||
use mango_v4_client::error_tracking::ErrorTracking;
|
||||
use mango_v4_client::{chain_data, MangoClient, MangoClientError};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, trace, warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LiquidationState {
|
||||
pub mango_client: Arc<MangoClient>,
|
||||
pub account_fetcher: Arc<chain_data::AccountFetcher>,
|
||||
pub liquidation_config: liquidate::Config,
|
||||
|
||||
pub errors: Arc<RwLock<ErrorTracking<Pubkey, LiqErrorType>>>,
|
||||
pub oracle_errors: Arc<RwLock<ErrorTracking<TokenIndex, LiqErrorType>>>,
|
||||
}
|
||||
|
||||
impl LiquidationState {
|
||||
async fn find_candidates(
|
||||
&mut self,
|
||||
accounts_iter: impl Iterator<Item = &Pubkey>,
|
||||
action: impl Fn(Pubkey) -> anyhow::Result<()>,
|
||||
) -> anyhow::Result<u64> {
|
||||
let mut found_counter = 0u64;
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
||||
{
|
||||
let mut rng = rand::thread_rng();
|
||||
accounts.shuffle(&mut rng);
|
||||
}
|
||||
|
||||
for pubkey in accounts {
|
||||
if self.should_skip_execution(pubkey) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let result =
|
||||
liquidate::can_liquidate_account(&self.mango_client, &self.account_fetcher, pubkey)
|
||||
.await;
|
||||
|
||||
self.log_or_ignore_error(&result, pubkey);
|
||||
|
||||
if result.unwrap_or(false) {
|
||||
action(*pubkey)?;
|
||||
found_counter = found_counter + 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(found_counter)
|
||||
}
|
||||
|
||||
fn should_skip_execution(&mut self, pubkey: &Pubkey) -> bool {
|
||||
let now = Instant::now();
|
||||
let error_tracking = &mut self.errors;
|
||||
|
||||
// Skip a pubkey if there've been too many errors recently
|
||||
if let Some(error_entry) =
|
||||
error_tracking
|
||||
.read()
|
||||
.unwrap()
|
||||
.had_too_many_errors(LiqErrorType::Liq, pubkey, now)
|
||||
{
|
||||
trace!(
|
||||
%pubkey,
|
||||
error_entry.count,
|
||||
"skip checking account for liquidation, had errors recently",
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn log_or_ignore_error<T>(&mut self, result: &anyhow::Result<T>, pubkey: &Pubkey) {
|
||||
let error_tracking = &mut self.errors;
|
||||
|
||||
if let Err(err) = result.as_ref() {
|
||||
if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() {
|
||||
if self
|
||||
.oracle_errors
|
||||
.read()
|
||||
.unwrap()
|
||||
.had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now())
|
||||
.is_none()
|
||||
{
|
||||
warn!(
|
||||
"{:?} recording oracle error for token {} {}",
|
||||
chrono::offset::Utc::now(),
|
||||
ti_name,
|
||||
ti
|
||||
);
|
||||
}
|
||||
|
||||
self.oracle_errors
|
||||
.write()
|
||||
.unwrap()
|
||||
.record(LiqErrorType::Liq, &ti, err.to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
// Keep track of pubkeys that had errors
|
||||
error_tracking
|
||||
.write()
|
||||
.unwrap()
|
||||
.record(LiqErrorType::Liq, pubkey, err.to_string());
|
||||
|
||||
// Not all errors need to be raised to the user's attention.
|
||||
let mut is_error = true;
|
||||
|
||||
// Simulation errors due to liqee precondition failures on the liquidation instructions
|
||||
// will commonly happen if our liquidator is late or if there are chain forks.
|
||||
match err.downcast_ref::<MangoClientError>() {
|
||||
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
|
||||
if logs.iter().any(|line| {
|
||||
line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
|
||||
}) {
|
||||
is_error = false;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
if is_error {
|
||||
error!("liquidating account {}: {:?}", pubkey, err);
|
||||
} else {
|
||||
trace!("liquidating account {}: {:?}", pubkey, err);
|
||||
}
|
||||
} else {
|
||||
error_tracking
|
||||
.write()
|
||||
.unwrap()
|
||||
.clear(LiqErrorType::Liq, pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
|
||||
if self.should_skip_execution(pubkey) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let result = liquidate::maybe_liquidate_account(
|
||||
&self.mango_client,
|
||||
&self.account_fetcher,
|
||||
pubkey,
|
||||
&self.liquidation_config,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.log_or_ignore_error(&result, pubkey);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_liquidation_job(
|
||||
cli: &Cli,
|
||||
shared_state: &Arc<RwLock<SharedState>>,
|
||||
tx_trigger_sender: async_channel::Sender<()>,
|
||||
mut liquidation: Box<LiquidationState>,
|
||||
metrics: &Metrics,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn({
|
||||
let mut interval =
|
||||
mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms));
|
||||
let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into());
|
||||
let mut metric_liquidation_start_end =
|
||||
metrics.register_latency("liquidation_start_end".into());
|
||||
|
||||
let mut liquidation_start_time = None;
|
||||
|
||||
let shared_state = shared_state.clone();
|
||||
async move {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let account_addresses = {
|
||||
let mut state = shared_state.write().unwrap();
|
||||
if !state.one_snapshot_done {
|
||||
// discard first latency info as it will skew data too much
|
||||
state.oldest_chain_event_reception_time = None;
|
||||
continue;
|
||||
}
|
||||
if state.oldest_chain_event_reception_time.is_none()
|
||||
&& liquidation_start_time.is_none()
|
||||
{
|
||||
// no new update, skip computing
|
||||
continue;
|
||||
}
|
||||
|
||||
state.mango_accounts.iter().cloned().collect_vec()
|
||||
};
|
||||
|
||||
liquidation.errors.write().unwrap().update();
|
||||
liquidation.oracle_errors.write().unwrap().update();
|
||||
|
||||
if liquidation_start_time.is_none() {
|
||||
liquidation_start_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
let found_candidates = liquidation
|
||||
.find_candidates(account_addresses.iter(), |p| {
|
||||
if shared_state
|
||||
.write()
|
||||
.unwrap()
|
||||
.liquidation_candidates_accounts
|
||||
.insert(p)
|
||||
{
|
||||
tx_trigger_sender.try_send(())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if found_candidates > 0 {
|
||||
tracing::debug!("found {} candidates for liquidation", found_candidates);
|
||||
}
|
||||
|
||||
let mut state = shared_state.write().unwrap();
|
||||
let reception_time = state.oldest_chain_event_reception_time.unwrap();
|
||||
let current_time = Instant::now();
|
||||
|
||||
state.oldest_chain_event_reception_time = None;
|
||||
|
||||
metric_liquidation_check.push(current_time - reception_time);
|
||||
metric_liquidation_start_end.push(current_time - liquidation_start_time.unwrap());
|
||||
liquidation_start_time = None;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
|
@ -4,33 +4,41 @@ use std::sync::{Arc, RwLock};
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use anchor_client::Cluster;
|
||||
use anyhow::Context;
|
||||
use clap::Parser;
|
||||
use futures_util::StreamExt;
|
||||
use mango_v4::state::{PerpMarketIndex, TokenIndex};
|
||||
use mango_v4_client::AsyncChannelSendUnlessFull;
|
||||
use mango_v4_client::{
|
||||
account_update_stream, chain_data, error_tracking::ErrorTracking, keypair_from_cli,
|
||||
snapshot_source, websocket_source, Client, MangoClient, MangoClientError, MangoGroupContext,
|
||||
snapshot_source, websocket_source, Client, MangoClient, MangoGroupContext,
|
||||
TransactionBuilderConfig,
|
||||
};
|
||||
|
||||
use crate::cli_args::{BoolArg, Cli, CliDotenv};
|
||||
use crate::liquidation_state::LiquidationState;
|
||||
use crate::rebalance::Rebalancer;
|
||||
use crate::tcs_state::TcsState;
|
||||
use crate::token_swap_info::TokenSwapInfoUpdater;
|
||||
use itertools::Itertools;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::program_stubs::{set_syscall_stubs, SyscallStubs};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signer::Signer;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::*;
|
||||
|
||||
pub mod cli_args;
|
||||
pub mod liquidate;
|
||||
mod liquidation_state;
|
||||
pub mod metrics;
|
||||
pub mod rebalance;
|
||||
mod tcs_state;
|
||||
pub mod telemetry;
|
||||
pub mod token_swap_info;
|
||||
pub mod trigger_tcs;
|
||||
mod tx_sender;
|
||||
mod unwrappable_oracle_error;
|
||||
pub mod util;
|
||||
|
||||
use crate::unwrappable_oracle_error::UnwrappableOracleError;
|
||||
use crate::util::{is_mango_account, is_mint_info, is_perp_market};
|
||||
|
||||
// jemalloc seems to be better at keeping the memory footprint reasonable over
|
||||
|
@ -87,7 +95,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
// Client setup
|
||||
//
|
||||
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
|
||||
let rpc_url = cli.rpc_url;
|
||||
let rpc_url = cli.rpc_url.clone();
|
||||
let ws_url = rpc_url.replace("https", "wss");
|
||||
let rpc_timeout = Duration::from_secs(10);
|
||||
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
|
||||
|
@ -110,7 +118,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.override_send_transaction_urls(cli.override_send_transaction_url)
|
||||
.override_send_transaction_urls(cli.override_send_transaction_url.clone())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
@ -127,6 +135,11 @@ async fn main() -> anyhow::Result<()> {
|
|||
.await?;
|
||||
let mango_group = mango_account.fixed.group;
|
||||
|
||||
let signer_is_owner = mango_account.fixed.owner == liqor_owner.pubkey();
|
||||
if cli.rebalance == BoolArg::True && !signer_is_owner {
|
||||
warn!("rebalancing on delegated accounts will be unable to free token positions reliably, withdraw dust manually");
|
||||
}
|
||||
|
||||
let group_context = MangoGroupContext::new_from_rpc(client.rpc_async(), mango_group).await?;
|
||||
|
||||
let mango_oracles = group_context
|
||||
|
@ -230,17 +243,18 @@ async fn main() -> anyhow::Result<()> {
|
|||
compute_limit_for_liq_ix: cli.compute_limit_for_liquidation,
|
||||
max_cu_per_transaction: 1_000_000,
|
||||
refresh_timeout: Duration::from_secs(cli.liquidation_refresh_timeout_secs as u64),
|
||||
only_allowed_tokens: cli_args::cli_to_hashset::<TokenIndex>(cli.only_allow_tokens),
|
||||
forbidden_tokens: cli_args::cli_to_hashset::<TokenIndex>(cli.forbidden_tokens),
|
||||
only_allowed_tokens: cli_args::cli_to_hashset::<TokenIndex>(cli.only_allow_tokens.clone()),
|
||||
forbidden_tokens: cli_args::cli_to_hashset::<TokenIndex>(cli.forbidden_tokens.clone()),
|
||||
only_allowed_perp_markets: cli_args::cli_to_hashset::<PerpMarketIndex>(
|
||||
cli.liquidation_only_allow_perp_markets,
|
||||
cli.liquidation_only_allow_perp_markets.clone(),
|
||||
),
|
||||
forbidden_perp_markets: cli_args::cli_to_hashset::<PerpMarketIndex>(
|
||||
cli.liquidation_forbidden_perp_markets,
|
||||
cli.liquidation_forbidden_perp_markets.clone(),
|
||||
),
|
||||
};
|
||||
|
||||
let tcs_config = trigger_tcs::Config {
|
||||
refresh_timeout: Duration::from_secs(cli.tcs_refresh_timeout_secs),
|
||||
min_health_ratio: cli.min_health_ratio,
|
||||
max_trigger_quote_amount: (cli.tcs_max_trigger_amount * 1e6) as u64,
|
||||
compute_limit_for_trigger: cli.compute_limit_for_tcs,
|
||||
|
@ -257,17 +271,19 @@ async fn main() -> anyhow::Result<()> {
|
|||
forbidden_tokens: liq_config.forbidden_tokens.clone(),
|
||||
};
|
||||
|
||||
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30));
|
||||
let (rebalance_trigger_sender, rebalance_trigger_receiver) = async_channel::bounded::<()>(1);
|
||||
let (tx_tcs_trigger_sender, tx_tcs_trigger_receiver) = async_channel::unbounded::<()>();
|
||||
let (tx_liq_trigger_sender, tx_liq_trigger_receiver) = async_channel::unbounded::<()>();
|
||||
let rebalance_config = rebalance::Config {
|
||||
enabled: cli.rebalance == BoolArg::True,
|
||||
slippage_bps: cli.rebalance_slippage_bps,
|
||||
borrow_settle_excess: (1f64 + cli.rebalance_borrow_settle_excess).max(1f64),
|
||||
refresh_timeout: Duration::from_secs(cli.rebalance_refresh_timeout_secs),
|
||||
jupiter_version: cli.jupiter_version.into(),
|
||||
skip_tokens: cli.rebalance_skip_tokens.unwrap_or_default(),
|
||||
skip_tokens: cli.rebalance_skip_tokens.clone().unwrap_or(Vec::new()),
|
||||
alternate_jupiter_route_tokens: cli
|
||||
.rebalance_alternate_jupiter_route_tokens
|
||||
.clone()
|
||||
.unwrap_or_default(),
|
||||
alternate_sanctum_route_tokens: cli
|
||||
.rebalance_alternate_sanctum_route_tokens
|
||||
|
@ -286,23 +302,39 @@ async fn main() -> anyhow::Result<()> {
|
|||
sanctum_supported_mints: HashSet::<Pubkey>::new(),
|
||||
});
|
||||
|
||||
let mut liquidation = Box::new(LiquidationState {
|
||||
let liquidation = Box::new(LiquidationState {
|
||||
mango_client: mango_client.clone(),
|
||||
account_fetcher: account_fetcher.clone(),
|
||||
liquidation_config: liq_config,
|
||||
errors: Arc::new(RwLock::new(
|
||||
ErrorTracking::builder()
|
||||
.skip_threshold(2)
|
||||
.skip_threshold_for_type(LiqErrorType::Liq, 5)
|
||||
.skip_duration(Duration::from_secs(120))
|
||||
.build()?,
|
||||
)),
|
||||
oracle_errors: Arc::new(RwLock::new(
|
||||
ErrorTracking::builder()
|
||||
.skip_threshold(1)
|
||||
.skip_duration(Duration::from_secs(
|
||||
cli.skip_oracle_error_in_logs_duration_secs,
|
||||
))
|
||||
.build()?,
|
||||
)),
|
||||
});
|
||||
|
||||
let tcs = Box::new(TcsState {
|
||||
mango_client: mango_client.clone(),
|
||||
account_fetcher,
|
||||
liquidation_config: liq_config,
|
||||
trigger_tcs_config: tcs_config,
|
||||
token_swap_info: token_swap_info_updater.clone(),
|
||||
errors: ErrorTracking::builder()
|
||||
.skip_threshold(2)
|
||||
.skip_threshold_for_type(LiqErrorType::Liq, 5)
|
||||
.skip_duration(Duration::from_secs(120))
|
||||
.build()?,
|
||||
oracle_errors: ErrorTracking::builder()
|
||||
.skip_threshold(1)
|
||||
.skip_duration(Duration::from_secs(
|
||||
cli.skip_oracle_error_in_logs_duration_secs,
|
||||
))
|
||||
.build()?,
|
||||
errors: Arc::new(RwLock::new(
|
||||
ErrorTracking::builder()
|
||||
.skip_threshold(2)
|
||||
.skip_threshold_for_type(LiqErrorType::Liq, 5)
|
||||
.skip_duration(Duration::from_secs(120))
|
||||
.build()?,
|
||||
)),
|
||||
});
|
||||
|
||||
info!("main loop");
|
||||
|
@ -403,126 +435,87 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
});
|
||||
|
||||
let mut optional_jobs = vec![];
|
||||
|
||||
// Could be refactored to only start the below jobs when the first snapshot is done.
|
||||
// But need to take care to abort if the above job aborts beforehand.
|
||||
if cli.rebalance == BoolArg::True {
|
||||
let rebalance_job =
|
||||
spawn_rebalance_job(&shared_state, rebalance_trigger_receiver, rebalancer);
|
||||
optional_jobs.push(rebalance_job);
|
||||
}
|
||||
|
||||
let rebalance_job = tokio::spawn({
|
||||
let shared_state = shared_state.clone();
|
||||
async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = rebalance_interval.tick() => {}
|
||||
_ = rebalance_trigger_receiver.recv() => {}
|
||||
}
|
||||
if !shared_state.read().unwrap().one_snapshot_done {
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = rebalancer.zero_all_non_quote().await {
|
||||
error!("failed to rebalance liqor: {:?}", err);
|
||||
if cli.liquidation_enabled == BoolArg::True {
|
||||
let liquidation_job = liquidation_state::spawn_liquidation_job(
|
||||
&cli,
|
||||
&shared_state,
|
||||
tx_liq_trigger_sender.clone(),
|
||||
liquidation.clone(),
|
||||
&metrics,
|
||||
);
|
||||
optional_jobs.push(liquidation_job);
|
||||
}
|
||||
|
||||
// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
|
||||
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
|
||||
// off. For now, hard sleep on error to avoid the most frequent error cases.
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if cli.take_tcs == BoolArg::True {
|
||||
let tcs_job = tcs_state::spawn_tcs_job(
|
||||
&cli,
|
||||
&shared_state,
|
||||
tx_tcs_trigger_sender.clone(),
|
||||
tcs.clone(),
|
||||
&metrics,
|
||||
);
|
||||
optional_jobs.push(tcs_job);
|
||||
}
|
||||
|
||||
let liquidation_job = tokio::spawn({
|
||||
let mut interval =
|
||||
mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms));
|
||||
let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into());
|
||||
let mut metric_liquidation_start_end =
|
||||
metrics.register_latency("liquidation_start_end".into());
|
||||
if cli.liquidation_enabled == BoolArg::True || cli.take_tcs == BoolArg::True {
|
||||
let mut tx_sender_jobs = tx_sender::spawn_tx_senders_job(
|
||||
cli.max_parallel_operations,
|
||||
cli.liquidation_enabled == BoolArg::True,
|
||||
tx_liq_trigger_receiver,
|
||||
tx_tcs_trigger_receiver,
|
||||
tx_tcs_trigger_sender,
|
||||
rebalance_trigger_sender,
|
||||
shared_state.clone(),
|
||||
liquidation,
|
||||
tcs,
|
||||
);
|
||||
optional_jobs.append(&mut tx_sender_jobs);
|
||||
}
|
||||
|
||||
let mut liquidation_start_time = None;
|
||||
let mut tcs_start_time = None;
|
||||
if cli.telemetry == BoolArg::True {
|
||||
optional_jobs.push(spawn_telemetry_job(&cli, mango_client.clone()));
|
||||
}
|
||||
|
||||
let shared_state = shared_state.clone();
|
||||
async move {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let token_swap_info_job =
|
||||
spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater);
|
||||
let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone());
|
||||
|
||||
let account_addresses = {
|
||||
let mut state = shared_state.write().unwrap();
|
||||
if !state.one_snapshot_done {
|
||||
// discard first latency info as it will skew data too much
|
||||
state.oldest_chain_event_reception_time = None;
|
||||
continue;
|
||||
}
|
||||
if state.oldest_chain_event_reception_time.is_none()
|
||||
&& liquidation_start_time.is_none()
|
||||
{
|
||||
// no new update, skip computing
|
||||
continue;
|
||||
}
|
||||
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
|
||||
data_job,
|
||||
token_swap_info_job,
|
||||
check_changes_for_abort_job,
|
||||
snapshot_job,
|
||||
]
|
||||
.into_iter()
|
||||
.chain(optional_jobs)
|
||||
.chain(prio_jobs.into_iter())
|
||||
.collect();
|
||||
jobs.next().await;
|
||||
|
||||
state.mango_accounts.iter().cloned().collect_vec()
|
||||
};
|
||||
error!("a critical job aborted, exiting");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
liquidation.errors.update();
|
||||
liquidation.oracle_errors.update();
|
||||
|
||||
if liquidation_start_time.is_none() {
|
||||
liquidation_start_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
let liquidated = liquidation
|
||||
.maybe_liquidate_one(account_addresses.iter())
|
||||
.await;
|
||||
|
||||
if !liquidated {
|
||||
// This will be incorrect if we liquidate the last checked account
|
||||
// (We will wait for next full run, skewing latency metrics)
|
||||
// Probability is very low, might not need to be fixed
|
||||
|
||||
let mut state = shared_state.write().unwrap();
|
||||
let reception_time = state.oldest_chain_event_reception_time.unwrap();
|
||||
let current_time = Instant::now();
|
||||
|
||||
state.oldest_chain_event_reception_time = None;
|
||||
|
||||
metric_liquidation_check.push(current_time - reception_time);
|
||||
metric_liquidation_start_end
|
||||
.push(current_time - liquidation_start_time.unwrap());
|
||||
liquidation_start_time = None;
|
||||
}
|
||||
|
||||
let mut took_tcs = false;
|
||||
if !liquidated && cli.take_tcs == BoolArg::True {
|
||||
tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now()));
|
||||
|
||||
took_tcs = liquidation
|
||||
.maybe_take_token_conditional_swap(account_addresses.iter())
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
error!("error during maybe_take_token_conditional_swap: {err}");
|
||||
false
|
||||
});
|
||||
|
||||
if !took_tcs {
|
||||
let current_time = Instant::now();
|
||||
let mut metric_tcs_start_end =
|
||||
metrics.register_latency("tcs_start_end".into());
|
||||
metric_tcs_start_end.push(current_time - tcs_start_time.unwrap());
|
||||
tcs_start_time = None;
|
||||
}
|
||||
}
|
||||
|
||||
if liquidated || took_tcs {
|
||||
rebalance_trigger_sender.send_unless_full(()).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let token_swap_info_job = tokio::spawn({
|
||||
fn spawn_token_swap_refresh_job(
|
||||
cli: &Cli,
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
token_swap_info_updater: Arc<TokenSwapInfoUpdater>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn({
|
||||
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(
|
||||
cli.token_swap_refresh_interval_secs,
|
||||
));
|
||||
let mut startup_wait = mango_v4_client::delay_interval(Duration::from_secs(1));
|
||||
let shared_state = shared_state.clone();
|
||||
async move {
|
||||
loop {
|
||||
if !shared_state.read().unwrap().one_snapshot_done {
|
||||
|
@ -546,42 +539,56 @@ async fn main() -> anyhow::Result<()> {
|
|||
token_swap_info_updater.log_all();
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
let check_changes_for_abort_job =
|
||||
tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort(
|
||||
mango_client.clone(),
|
||||
Duration::from_secs(300),
|
||||
));
|
||||
fn spawn_context_change_watchdog_job(mango_client: Arc<MangoClient>) -> JoinHandle<()> {
|
||||
tokio::spawn(MangoClient::loop_check_for_context_changes_and_abort(
|
||||
mango_client,
|
||||
Duration::from_secs(300),
|
||||
))
|
||||
}
|
||||
|
||||
if cli.telemetry == BoolArg::True {
|
||||
tokio::spawn(telemetry::report_regularly(
|
||||
mango_client,
|
||||
cli.min_health_ratio,
|
||||
));
|
||||
}
|
||||
fn spawn_telemetry_job(cli: &Cli, mango_client: Arc<MangoClient>) -> JoinHandle<()> {
|
||||
tokio::spawn(telemetry::report_regularly(
|
||||
mango_client,
|
||||
cli.min_health_ratio,
|
||||
))
|
||||
}
|
||||
|
||||
use cli_args::{BoolArg, Cli, CliDotenv};
|
||||
use futures::StreamExt;
|
||||
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
|
||||
data_job,
|
||||
rebalance_job,
|
||||
liquidation_job,
|
||||
token_swap_info_job,
|
||||
check_changes_for_abort_job,
|
||||
snapshot_job,
|
||||
]
|
||||
.into_iter()
|
||||
.chain(prio_jobs.into_iter())
|
||||
.collect();
|
||||
jobs.next().await;
|
||||
fn spawn_rebalance_job(
|
||||
shared_state: &Arc<RwLock<SharedState>>,
|
||||
rebalance_trigger_receiver: async_channel::Receiver<()>,
|
||||
rebalancer: Arc<Rebalancer>,
|
||||
) -> JoinHandle<()> {
|
||||
let mut rebalance_interval = tokio::time::interval(Duration::from_secs(30));
|
||||
|
||||
error!("a critical job aborted, exiting");
|
||||
Ok(())
|
||||
tokio::spawn({
|
||||
let shared_state = shared_state.clone();
|
||||
async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = rebalance_interval.tick() => {}
|
||||
_ = rebalance_trigger_receiver.recv() => {}
|
||||
}
|
||||
if !shared_state.read().unwrap().one_snapshot_done {
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = rebalancer.zero_all_non_quote().await {
|
||||
error!("failed to rebalance liqor: {:?}", err);
|
||||
|
||||
// Workaround: We really need a sequence enforcer in the liquidator since we don't want to
|
||||
// accidentally send a similar tx again when we incorrectly believe an earlier one got forked
|
||||
// off. For now, hard sleep on error to avoid the most frequent error cases.
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SharedState {
|
||||
pub struct SharedState {
|
||||
/// Addresses of the MangoAccounts belonging to the mango program.
|
||||
/// Needed to check health of them all when the cache updates.
|
||||
mango_accounts: HashSet<Pubkey>,
|
||||
|
@ -591,6 +598,18 @@ struct SharedState {
|
|||
|
||||
/// Oldest chain event not processed yet
|
||||
oldest_chain_event_reception_time: Option<Instant>,
|
||||
|
||||
/// Liquidation candidates (locally identified as liquidatable)
|
||||
liquidation_candidates_accounts: indexmap::set::IndexSet<Pubkey>,
|
||||
|
||||
/// Interesting TCS that should be triggered
|
||||
interesting_tcs: indexmap::set::IndexSet<(Pubkey, u64, u64)>,
|
||||
|
||||
/// Liquidation currently being processed by a worker
|
||||
processing_liquidation: HashSet<Pubkey>,
|
||||
|
||||
// TCS currently being processed by a worker
|
||||
processing_tcs: HashSet<(Pubkey, u64, u64)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
|
@ -614,218 +633,6 @@ impl std::fmt::Display for LiqErrorType {
|
|||
}
|
||||
}
|
||||
|
||||
struct LiquidationState {
|
||||
mango_client: Arc<MangoClient>,
|
||||
account_fetcher: Arc<chain_data::AccountFetcher>,
|
||||
token_swap_info: Arc<token_swap_info::TokenSwapInfoUpdater>,
|
||||
liquidation_config: liquidate::Config,
|
||||
trigger_tcs_config: trigger_tcs::Config,
|
||||
|
||||
errors: ErrorTracking<Pubkey, LiqErrorType>,
|
||||
oracle_errors: ErrorTracking<TokenIndex, LiqErrorType>,
|
||||
}
|
||||
|
||||
impl LiquidationState {
|
||||
async fn maybe_liquidate_one<'b>(
|
||||
&mut self,
|
||||
accounts_iter: impl Iterator<Item = &'b Pubkey>,
|
||||
) -> bool {
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
let mut accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
||||
{
|
||||
let mut rng = rand::thread_rng();
|
||||
accounts.shuffle(&mut rng);
|
||||
}
|
||||
|
||||
for pubkey in accounts {
|
||||
if self
|
||||
.maybe_liquidate_and_log_error(pubkey)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
|
||||
let now = Instant::now();
|
||||
let error_tracking = &mut self.errors;
|
||||
|
||||
// Skip a pubkey if there've been too many errors recently
|
||||
if let Some(error_entry) =
|
||||
error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now)
|
||||
{
|
||||
trace!(
|
||||
%pubkey,
|
||||
error_entry.count,
|
||||
"skip checking account for liquidation, had errors recently",
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let result = liquidate::maybe_liquidate_account(
|
||||
&self.mango_client,
|
||||
&self.account_fetcher,
|
||||
pubkey,
|
||||
&self.liquidation_config,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(err) = result.as_ref() {
|
||||
if let Some((ti, ti_name)) = err.try_unwrap_oracle_error() {
|
||||
if self
|
||||
.oracle_errors
|
||||
.had_too_many_errors(LiqErrorType::Liq, &ti, Instant::now())
|
||||
.is_none()
|
||||
{
|
||||
warn!(
|
||||
"{:?} recording oracle error for token {} {}",
|
||||
chrono::offset::Utc::now(),
|
||||
ti_name,
|
||||
ti
|
||||
);
|
||||
}
|
||||
|
||||
self.oracle_errors
|
||||
.record(LiqErrorType::Liq, &ti, err.to_string());
|
||||
return result;
|
||||
}
|
||||
|
||||
// Keep track of pubkeys that had errors
|
||||
error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string());
|
||||
|
||||
// Not all errors need to be raised to the user's attention.
|
||||
let mut is_error = true;
|
||||
|
||||
// Simulation errors due to liqee precondition failures on the liquidation instructions
|
||||
// will commonly happen if our liquidator is late or if there are chain forks.
|
||||
match err.downcast_ref::<MangoClientError>() {
|
||||
Some(MangoClientError::SendTransactionPreflightFailure { logs, .. }) => {
|
||||
if logs.iter().any(|line| {
|
||||
line.contains("HealthMustBeNegative") || line.contains("IsNotBankrupt")
|
||||
}) {
|
||||
is_error = false;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
if is_error {
|
||||
error!("liquidating account {}: {:?}", pubkey, err);
|
||||
} else {
|
||||
trace!("liquidating account {}: {:?}", pubkey, err);
|
||||
}
|
||||
} else {
|
||||
error_tracking.clear(LiqErrorType::Liq, pubkey);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
async fn maybe_take_token_conditional_swap(
|
||||
&mut self,
|
||||
accounts_iter: impl Iterator<Item = &Pubkey>,
|
||||
) -> anyhow::Result<bool> {
|
||||
let accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
||||
|
||||
let now = Instant::now();
|
||||
let now_ts: u64 = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)?
|
||||
.as_secs();
|
||||
|
||||
let tcs_context = trigger_tcs::Context {
|
||||
mango_client: self.mango_client.clone(),
|
||||
account_fetcher: self.account_fetcher.clone(),
|
||||
token_swap_info: self.token_swap_info.clone(),
|
||||
config: self.trigger_tcs_config.clone(),
|
||||
jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()),
|
||||
now_ts,
|
||||
};
|
||||
|
||||
// Find interesting (pubkey, tcsid, volume)
|
||||
let mut interesting_tcs = Vec::with_capacity(accounts.len());
|
||||
for pubkey in accounts.iter() {
|
||||
if let Some(error_entry) =
|
||||
self.errors
|
||||
.had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now)
|
||||
{
|
||||
trace!(
|
||||
%pubkey,
|
||||
error_entry.count,
|
||||
"skip checking account for tcs, had errors recently",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
match tcs_context.find_interesting_tcs_for_account(pubkey) {
|
||||
Ok(v) => {
|
||||
self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey);
|
||||
if v.is_empty() {
|
||||
self.errors
|
||||
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
} else if v.iter().all(|it| it.is_ok()) {
|
||||
self.errors
|
||||
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
} else {
|
||||
for it in v.iter() {
|
||||
if let Err(e) = it {
|
||||
self.errors.record(
|
||||
LiqErrorType::TcsCollectionPartial,
|
||||
pubkey,
|
||||
e.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok()));
|
||||
}
|
||||
Err(e) => {
|
||||
self.errors
|
||||
.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
if interesting_tcs.is_empty() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let (txsigs, mut changed_pubkeys) = tcs_context
|
||||
.execute_tcs(&mut interesting_tcs, &mut self.errors)
|
||||
.await?;
|
||||
for pubkey in changed_pubkeys.iter() {
|
||||
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
}
|
||||
if txsigs.is_empty() {
|
||||
return Ok(false);
|
||||
}
|
||||
changed_pubkeys.push(self.mango_client.mango_account_address);
|
||||
|
||||
// Force a refresh of affected accounts
|
||||
let slot = self
|
||||
.account_fetcher
|
||||
.transaction_max_slot(&txsigs)
|
||||
.await
|
||||
.context("transaction_max_slot")?;
|
||||
if let Err(e) = self
|
||||
.account_fetcher
|
||||
.refresh_accounts_via_rpc_until_slot(
|
||||
&changed_pubkeys,
|
||||
slot,
|
||||
self.liquidation_config.refresh_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
info!(slot, "could not refresh after tcs execution: {}", e);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
|
||||
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600));
|
||||
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
use crate::cli_args::Cli;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::token_swap_info::TokenSwapInfoUpdater;
|
||||
use crate::{trigger_tcs, LiqErrorType, SharedState};
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use anyhow::Context;
|
||||
use itertools::Itertools;
|
||||
use mango_v4_client::error_tracking::ErrorTracking;
|
||||
use mango_v4_client::{chain_data, MangoClient};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
pub fn spawn_tcs_job(
|
||||
cli: &Cli,
|
||||
shared_state: &Arc<RwLock<SharedState>>,
|
||||
tx_trigger_sender: async_channel::Sender<()>,
|
||||
mut tcs: Box<TcsState>,
|
||||
metrics: &Metrics,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn({
|
||||
let mut interval =
|
||||
mango_v4_client::delay_interval(Duration::from_millis(cli.tcs_check_interval_ms));
|
||||
let mut tcs_start_time = None;
|
||||
let mut metric_tcs_start_end = metrics.register_latency("tcs_start_end".into());
|
||||
let shared_state = shared_state.clone();
|
||||
|
||||
async move {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let account_addresses = {
|
||||
let state = shared_state.write().unwrap();
|
||||
if !state.one_snapshot_done {
|
||||
continue;
|
||||
}
|
||||
state.mango_accounts.iter().cloned().collect_vec()
|
||||
};
|
||||
|
||||
tcs.errors.write().unwrap().update();
|
||||
|
||||
tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now()));
|
||||
|
||||
let found_candidates = tcs
|
||||
.find_candidates(account_addresses.iter(), |candidate| {
|
||||
if shared_state
|
||||
.write()
|
||||
.unwrap()
|
||||
.interesting_tcs
|
||||
.insert(candidate)
|
||||
{
|
||||
tx_trigger_sender.try_send(())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
error!("error during find_candidate: {err}");
|
||||
0
|
||||
});
|
||||
|
||||
if found_candidates > 0 {
|
||||
tracing::debug!("found {} candidates for triggering", found_candidates);
|
||||
}
|
||||
|
||||
let current_time = Instant::now();
|
||||
metric_tcs_start_end.push(current_time - tcs_start_time.unwrap());
|
||||
tcs_start_time = None;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TcsState {
|
||||
pub mango_client: Arc<MangoClient>,
|
||||
pub account_fetcher: Arc<chain_data::AccountFetcher>,
|
||||
pub token_swap_info: Arc<TokenSwapInfoUpdater>,
|
||||
pub trigger_tcs_config: trigger_tcs::Config,
|
||||
|
||||
pub errors: Arc<RwLock<ErrorTracking<Pubkey, LiqErrorType>>>,
|
||||
}
|
||||
|
||||
impl TcsState {
|
||||
async fn find_candidates(
|
||||
&mut self,
|
||||
accounts_iter: impl Iterator<Item = &Pubkey>,
|
||||
action: impl Fn((Pubkey, u64, u64)) -> anyhow::Result<()>,
|
||||
) -> anyhow::Result<usize> {
|
||||
let accounts = accounts_iter.collect::<Vec<&Pubkey>>();
|
||||
|
||||
let now = Instant::now();
|
||||
let now_ts: u64 = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)?
|
||||
.as_secs();
|
||||
|
||||
let tcs_context = trigger_tcs::Context {
|
||||
mango_client: self.mango_client.clone(),
|
||||
account_fetcher: self.account_fetcher.clone(),
|
||||
token_swap_info: self.token_swap_info.clone(),
|
||||
config: self.trigger_tcs_config.clone(),
|
||||
jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()),
|
||||
now_ts,
|
||||
};
|
||||
|
||||
let mut found_counter = 0;
|
||||
|
||||
// Find interesting (pubkey, tcsid, volume)
|
||||
for pubkey in accounts.iter() {
|
||||
if let Some(error_entry) = self.errors.read().unwrap().had_too_many_errors(
|
||||
LiqErrorType::TcsCollectionHard,
|
||||
pubkey,
|
||||
now,
|
||||
) {
|
||||
trace!(
|
||||
%pubkey,
|
||||
error_entry.count,
|
||||
"skip checking account for tcs, had errors recently",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let candidates = tcs_context.find_interesting_tcs_for_account(pubkey);
|
||||
let mut error_guard = self.errors.write().unwrap();
|
||||
|
||||
match candidates {
|
||||
Ok(v) => {
|
||||
error_guard.clear(LiqErrorType::TcsCollectionHard, pubkey);
|
||||
if v.is_empty() {
|
||||
error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
error_guard.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
} else if v.iter().all(|it| it.is_ok()) {
|
||||
error_guard.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
} else {
|
||||
for it in v.iter() {
|
||||
if let Err(e) = it {
|
||||
error_guard.record(
|
||||
LiqErrorType::TcsCollectionPartial,
|
||||
pubkey,
|
||||
e.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
for interesting_candidate_res in v.iter() {
|
||||
if let Ok(interesting_candidate) = interesting_candidate_res {
|
||||
action(*interesting_candidate).expect("failed to send TCS candidate");
|
||||
found_counter += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error_guard.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(found_counter);
|
||||
}
|
||||
|
||||
pub async fn maybe_take_token_conditional_swap(
|
||||
&mut self,
|
||||
mut interesting_tcs: Vec<(Pubkey, u64, u64)>,
|
||||
) -> anyhow::Result<bool> {
|
||||
let now_ts: u64 = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)?
|
||||
.as_secs();
|
||||
|
||||
let tcs_context = trigger_tcs::Context {
|
||||
mango_client: self.mango_client.clone(),
|
||||
account_fetcher: self.account_fetcher.clone(),
|
||||
token_swap_info: self.token_swap_info.clone(),
|
||||
config: self.trigger_tcs_config.clone(),
|
||||
jupiter_quote_cache: Arc::new(trigger_tcs::JupiterQuoteCache::default()),
|
||||
now_ts,
|
||||
};
|
||||
|
||||
if interesting_tcs.is_empty() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let (txsigs, mut changed_pubkeys) = tcs_context
|
||||
.execute_tcs(&mut interesting_tcs, self.errors.clone())
|
||||
.await?;
|
||||
for pubkey in changed_pubkeys.iter() {
|
||||
self.errors
|
||||
.write()
|
||||
.unwrap()
|
||||
.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
}
|
||||
if txsigs.is_empty() {
|
||||
return Ok(false);
|
||||
}
|
||||
changed_pubkeys.push(self.mango_client.mango_account_address);
|
||||
|
||||
// Force a refresh of affected accounts
|
||||
let slot = self
|
||||
.account_fetcher
|
||||
.transaction_max_slot(&txsigs)
|
||||
.await
|
||||
.context("transaction_max_slot")?;
|
||||
if let Err(e) = self
|
||||
.account_fetcher
|
||||
.refresh_accounts_via_rpc_until_slot(
|
||||
&changed_pubkeys,
|
||||
slot,
|
||||
self.trigger_tcs_config.refresh_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
info!(slot, "could not refresh after tcs execution: {}", e);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::HashSet;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
|
@ -15,6 +16,7 @@ use mango_v4::{
|
|||
use mango_v4_client::{chain_data, swap, MangoClient, TransactionBuilder};
|
||||
|
||||
use anyhow::Context as AnyhowContext;
|
||||
use mango_v4::accounts_ix::HealthCheckKind::MaintRatio;
|
||||
use solana_sdk::signature::Signature;
|
||||
use tracing::*;
|
||||
use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
||||
|
@ -55,6 +57,7 @@ pub enum Mode {
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct Config {
|
||||
pub refresh_timeout: Duration,
|
||||
pub min_health_ratio: f64,
|
||||
pub max_trigger_quote_amount: u64,
|
||||
pub compute_limit_for_trigger: u32,
|
||||
|
@ -999,7 +1002,7 @@ impl Context {
|
|||
pub async fn execute_tcs(
|
||||
&self,
|
||||
tcs: &mut [(Pubkey, u64, u64)],
|
||||
error_tracking: &mut ErrorTracking<Pubkey, LiqErrorType>,
|
||||
error_tracking: Arc<RwLock<ErrorTracking<Pubkey, LiqErrorType>>>,
|
||||
) -> anyhow::Result<(Vec<Signature>, Vec<Pubkey>)> {
|
||||
use rand::distributions::{Distribution, WeightedError, WeightedIndex};
|
||||
|
||||
|
@ -1048,7 +1051,7 @@ impl Context {
|
|||
}
|
||||
Err(e) => {
|
||||
trace!(%result.pubkey, "preparation error {:?}", e);
|
||||
error_tracking.record(
|
||||
error_tracking.write().unwrap().record(
|
||||
LiqErrorType::TcsExecution,
|
||||
&result.pubkey,
|
||||
e.to_string(),
|
||||
|
@ -1092,7 +1095,7 @@ impl Context {
|
|||
};
|
||||
|
||||
// start the new one
|
||||
if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking) {
|
||||
if let Some(job) = self.prepare_job(&pubkey, tcs_id, volume, error_tracking.clone()) {
|
||||
pending_volume += volume;
|
||||
pending.push(job);
|
||||
}
|
||||
|
@ -1129,7 +1132,11 @@ impl Context {
|
|||
Ok(v) => Some((pubkey, v)),
|
||||
Err(err) => {
|
||||
trace!(%pubkey, "execution error {:?}", err);
|
||||
error_tracking.record(LiqErrorType::TcsExecution, &pubkey, err.to_string());
|
||||
error_tracking.write().unwrap().record(
|
||||
LiqErrorType::TcsExecution,
|
||||
&pubkey,
|
||||
err.to_string(),
|
||||
);
|
||||
None
|
||||
}
|
||||
});
|
||||
|
@ -1144,12 +1151,14 @@ impl Context {
|
|||
pubkey: &Pubkey,
|
||||
tcs_id: u64,
|
||||
volume: u64,
|
||||
error_tracking: &ErrorTracking<Pubkey, LiqErrorType>,
|
||||
error_tracking: Arc<RwLock<ErrorTracking<Pubkey, LiqErrorType>>>,
|
||||
) -> Option<Pin<Box<dyn Future<Output = PreparationResult> + Send>>> {
|
||||
// Skip a pubkey if there've been too many errors recently
|
||||
if let Some(error_entry) =
|
||||
error_tracking.had_too_many_errors(LiqErrorType::TcsExecution, pubkey, Instant::now())
|
||||
{
|
||||
if let Some(error_entry) = error_tracking.read().unwrap().had_too_many_errors(
|
||||
LiqErrorType::TcsExecution,
|
||||
pubkey,
|
||||
Instant::now(),
|
||||
) {
|
||||
trace!(
|
||||
"skip checking for tcs on account {pubkey}, had {} errors recently",
|
||||
error_entry.count
|
||||
|
@ -1224,6 +1233,27 @@ impl Context {
|
|||
.instructions
|
||||
.append(&mut trigger_ixs.instructions);
|
||||
|
||||
let (_, tcs) = liqee.token_conditional_swap_by_id(pending.tcs_id)?;
|
||||
let affected_tokens = allowed_tokens
|
||||
.iter()
|
||||
.chain(&[tcs.buy_token_index, tcs.sell_token_index])
|
||||
.copied()
|
||||
.collect_vec();
|
||||
let liqor = &self.mango_client.mango_account().await?;
|
||||
tx_builder.instructions.append(
|
||||
&mut self
|
||||
.mango_client
|
||||
.health_check_instruction(
|
||||
liqor,
|
||||
self.config.min_health_ratio,
|
||||
affected_tokens,
|
||||
vec![],
|
||||
MaintRatio,
|
||||
)
|
||||
.await?
|
||||
.instructions,
|
||||
);
|
||||
|
||||
let txsig = tx_builder
|
||||
.send_and_confirm(&self.mango_client.client)
|
||||
.await?;
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
use crate::liquidation_state::LiquidationState;
|
||||
use crate::tcs_state::TcsState;
|
||||
use crate::SharedState;
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use async_channel::{Receiver, Sender};
|
||||
use mango_v4_client::AsyncChannelSendUnlessFull;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, trace};
|
||||
|
||||
enum WorkerTask {
|
||||
Liquidation(Pubkey),
|
||||
Tcs(Vec<(Pubkey, u64, u64)>),
|
||||
|
||||
// Given two workers: #0=LIQ_only, #1=LIQ+TCS
|
||||
// If they are both busy, and the scanning jobs find a new TCS and a new LIQ candidates and enqueue them in the channel
|
||||
// Then if #1 wake up first, it will consume the LIQ candidate (LIQ always have priority)
|
||||
// Then when #0 wake up, it will not find any LIQ candidate, and would not do anything (it won't take a TCS)
|
||||
// But if we do nothing, #1 would never wake up again (no new task in channel)
|
||||
// So we use this `GiveUpTcs` that will be handled by #0 by queuing a new signal the channel and will wake up #1 again
|
||||
GiveUpTcs,
|
||||
|
||||
// Can happen if TCS is batched (2 TCS enqueued, 2 workers waken, but first one take both tasks)
|
||||
NoWork,
|
||||
}
|
||||
|
||||
pub fn spawn_tx_senders_job(
|
||||
max_parallel_operations: u64,
|
||||
enable_liquidation: bool,
|
||||
tx_liq_trigger_receiver: Receiver<()>,
|
||||
tx_tcs_trigger_receiver: Receiver<()>,
|
||||
tx_tcs_trigger_sender: Sender<()>,
|
||||
rebalance_trigger_sender: Sender<()>,
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
liquidation: Box<LiquidationState>,
|
||||
tcs: Box<TcsState>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
if max_parallel_operations < 1 {
|
||||
error!("max_parallel_operations must be >= 1");
|
||||
std::process::exit(1)
|
||||
}
|
||||
|
||||
let reserve_one_worker_for_liquidation = max_parallel_operations > 1 && enable_liquidation;
|
||||
|
||||
let workers: Vec<JoinHandle<()>> = (0..max_parallel_operations)
|
||||
.map(|worker_id| {
|
||||
tokio::spawn({
|
||||
let shared_state = shared_state.clone();
|
||||
let receiver_liq = tx_liq_trigger_receiver.clone();
|
||||
let receiver_tcs = tx_tcs_trigger_receiver.clone();
|
||||
let sender_tcs = tx_tcs_trigger_sender.clone();
|
||||
let rebalance_trigger_sender = rebalance_trigger_sender.clone();
|
||||
let liquidation = liquidation.clone();
|
||||
let tcs = tcs.clone();
|
||||
async move {
|
||||
worker_loop(
|
||||
shared_state,
|
||||
receiver_liq,
|
||||
receiver_tcs,
|
||||
sender_tcs,
|
||||
rebalance_trigger_sender,
|
||||
liquidation,
|
||||
tcs,
|
||||
worker_id,
|
||||
reserve_one_worker_for_liquidation && worker_id == 0,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
workers
|
||||
}
|
||||
|
||||
async fn worker_loop(
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
liq_receiver: Receiver<()>,
|
||||
tcs_receiver: Receiver<()>,
|
||||
tcs_sender: Sender<()>,
|
||||
rebalance_trigger_sender: Sender<()>,
|
||||
mut liquidation: Box<LiquidationState>,
|
||||
mut tcs: Box<TcsState>,
|
||||
id: u64,
|
||||
only_liquidation: bool,
|
||||
) {
|
||||
loop {
|
||||
debug!(
|
||||
"Worker #{} waiting for task (only_liq={})",
|
||||
id, only_liquidation
|
||||
);
|
||||
|
||||
let _ = if only_liquidation {
|
||||
liq_receiver.recv().await.expect("receive failed")
|
||||
} else {
|
||||
tokio::select!(
|
||||
_ = liq_receiver.recv() => {},
|
||||
_ = tcs_receiver.recv() => {},
|
||||
)
|
||||
};
|
||||
|
||||
// a task must be available to process
|
||||
// find it in global shared state, and mark it as processing
|
||||
let task = worker_pull_task(&shared_state, id, only_liquidation)
|
||||
.expect("Worker woke up but has nothing to do");
|
||||
|
||||
// execute the task
|
||||
let need_rebalancing = match &task {
|
||||
WorkerTask::Liquidation(l) => worker_execute_liquidation(&mut liquidation, *l).await,
|
||||
WorkerTask::Tcs(t) => worker_execute_tcs(&mut tcs, t.clone()).await,
|
||||
WorkerTask::GiveUpTcs => worker_give_up_tcs(&tcs_sender).await,
|
||||
WorkerTask::NoWork => false,
|
||||
};
|
||||
|
||||
if need_rebalancing {
|
||||
rebalance_trigger_sender.send_unless_full(()).unwrap();
|
||||
}
|
||||
|
||||
// remove from shared state
|
||||
worker_finalize_task(&shared_state, id, task, need_rebalancing);
|
||||
}
|
||||
}
|
||||
|
||||
async fn worker_give_up_tcs(sender: &Sender<()>) -> bool {
|
||||
sender.send(()).await.expect("sending task failed");
|
||||
false
|
||||
}
|
||||
|
||||
async fn worker_execute_tcs(tcs: &mut Box<TcsState>, candidates: Vec<(Pubkey, u64, u64)>) -> bool {
|
||||
tcs.maybe_take_token_conditional_swap(candidates)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
async fn worker_execute_liquidation(
|
||||
liquidation: &mut Box<LiquidationState>,
|
||||
candidate: Pubkey,
|
||||
) -> bool {
|
||||
liquidation
|
||||
.maybe_liquidate_and_log_error(&candidate)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn worker_pull_task(
|
||||
shared_state: &Arc<RwLock<SharedState>>,
|
||||
id: u64,
|
||||
only_liquidation: bool,
|
||||
) -> anyhow::Result<WorkerTask> {
|
||||
let mut writer = shared_state.write().unwrap();
|
||||
|
||||
// print out list of all task for debugging
|
||||
for x in &writer.liquidation_candidates_accounts {
|
||||
if !writer.processing_liquidation.contains(x) {
|
||||
trace!(" - LIQ {:?}", x);
|
||||
}
|
||||
}
|
||||
|
||||
// next liq task to execute
|
||||
if let Some(liq_candidate) = writer
|
||||
.liquidation_candidates_accounts
|
||||
.iter()
|
||||
.find(|x| !writer.processing_liquidation.contains(x))
|
||||
.copied()
|
||||
{
|
||||
debug!("worker #{} got a liq candidate -> {}", id, liq_candidate);
|
||||
writer.processing_liquidation.insert(liq_candidate);
|
||||
return Ok(WorkerTask::Liquidation(liq_candidate));
|
||||
}
|
||||
|
||||
let tcs_todo = writer.interesting_tcs.len() - writer.processing_tcs.len();
|
||||
|
||||
if only_liquidation {
|
||||
debug!("worker #{} giving up TCS (todo count: {})", id, tcs_todo);
|
||||
return Ok(WorkerTask::GiveUpTcs);
|
||||
}
|
||||
|
||||
for x in &writer.interesting_tcs {
|
||||
if !writer.processing_tcs.contains(x) {
|
||||
trace!(" - TCS {:?}", x);
|
||||
}
|
||||
}
|
||||
|
||||
// next tcs task to execute
|
||||
let max_tcs_batch_size = 20;
|
||||
let tcs_candidates: Vec<(Pubkey, u64, u64)> = writer
|
||||
.interesting_tcs
|
||||
.iter()
|
||||
.filter(|x| !writer.processing_tcs.contains(x))
|
||||
.take(max_tcs_batch_size)
|
||||
.copied()
|
||||
.collect();
|
||||
|
||||
for tcs_candidate in &tcs_candidates {
|
||||
debug!(
|
||||
"worker #{} got a tcs candidate -> {:?} (out of {})",
|
||||
id,
|
||||
tcs_candidate,
|
||||
writer.interesting_tcs.len()
|
||||
);
|
||||
writer.processing_tcs.insert(tcs_candidate.clone());
|
||||
}
|
||||
|
||||
if tcs_candidates.len() > 0 {
|
||||
Ok(WorkerTask::Tcs(tcs_candidates))
|
||||
} else {
|
||||
debug!("worker #{} got nothing", id);
|
||||
Ok(WorkerTask::NoWork)
|
||||
}
|
||||
}
|
||||
|
||||
fn worker_finalize_task(
|
||||
shared_state: &Arc<RwLock<SharedState>>,
|
||||
id: u64,
|
||||
task: WorkerTask,
|
||||
done: bool,
|
||||
) {
|
||||
let mut writer = shared_state.write().unwrap();
|
||||
match task {
|
||||
WorkerTask::Liquidation(liq) => {
|
||||
debug!(
|
||||
"worker #{} - checked liq {:?} with success ? {}",
|
||||
id, liq, done
|
||||
);
|
||||
writer.liquidation_candidates_accounts.shift_remove(&liq);
|
||||
writer.processing_liquidation.remove(&liq);
|
||||
}
|
||||
WorkerTask::Tcs(tcs_list) => {
|
||||
for tcs in tcs_list {
|
||||
debug!(
|
||||
"worker #{} - checked tcs {:?} with success ? {}",
|
||||
id, tcs, done
|
||||
);
|
||||
writer.interesting_tcs.shift_remove(&tcs);
|
||||
writer.processing_tcs.remove(&tcs);
|
||||
}
|
||||
}
|
||||
WorkerTask::GiveUpTcs => {}
|
||||
WorkerTask::NoWork => {}
|
||||
}
|
||||
}
|
|
@ -2170,7 +2170,10 @@ impl MangoClient {
|
|||
}
|
||||
|
||||
pub fn jupiter_v6(&self) -> swap::jupiter_v6::JupiterV6 {
|
||||
swap::jupiter_v6::JupiterV6 { mango_client: self }
|
||||
swap::jupiter_v6::JupiterV6 {
|
||||
mango_client: self,
|
||||
timeout_duration: self.client.config.jupiter_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sanctum(&self) -> swap::sanctum::Sanctum {
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -133,6 +134,7 @@ impl TryFrom<&AccountMeta> for solana_sdk::instruction::AccountMeta {
|
|||
|
||||
pub struct JupiterV6<'a> {
|
||||
pub mango_client: &'a MangoClient,
|
||||
pub timeout_duration: Duration,
|
||||
}
|
||||
|
||||
impl<'a> JupiterV6<'a> {
|
||||
|
@ -198,6 +200,7 @@ impl<'a> JupiterV6<'a> {
|
|||
.http_client
|
||||
.get(format!("{}/quote", config.jupiter_v6_url))
|
||||
.query(&query_args)
|
||||
.timeout(self.timeout_duration)
|
||||
.send()
|
||||
.await
|
||||
.context("quote request to jupiter")?;
|
||||
|
@ -284,6 +287,7 @@ impl<'a> JupiterV6<'a> {
|
|||
destination_token_account: None, // default to user ata
|
||||
quote_response: quote.clone(),
|
||||
})
|
||||
.timeout(self.timeout_duration)
|
||||
.send()
|
||||
.await
|
||||
.context("swap transaction request to jupiter")?;
|
||||
|
|
|
@ -21,19 +21,29 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
|||
/// Push to an async_channel::Sender and ignore if the channel is full
|
||||
pub trait AsyncChannelSendUnlessFull<T> {
|
||||
/// Send a message if the channel isn't full
|
||||
fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError<T>>;
|
||||
fn send_unless_full(&self, msg: T) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
impl<T> AsyncChannelSendUnlessFull<T> for async_channel::Sender<T> {
|
||||
fn send_unless_full(&self, msg: T) -> Result<(), async_channel::SendError<T>> {
|
||||
fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
|
||||
use async_channel::*;
|
||||
match self.try_send(msg) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(TrySendError::Closed(msg)) => Err(async_channel::SendError(msg)),
|
||||
Err(TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")),
|
||||
Err(TrySendError::Full(_)) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T> AsyncChannelSendUnlessFull<T> for tokio::sync::mpsc::Sender<T> {
|
||||
fn send_unless_full(&self, msg: T) -> anyhow::Result<()> {
|
||||
use tokio::sync::mpsc::*;
|
||||
match self.try_send(msg) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(error::TrySendError::Closed(_)) => Err(anyhow::format_err!("channel is closed")),
|
||||
Err(error::TrySendError::Full(_)) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Like tokio::time::interval(), but with Delay as default MissedTickBehavior
|
||||
///
|
||||
|
|
|
@ -7,8 +7,6 @@ use mango_v4::accounts_ix::{HealthCheck, HealthCheckKind};
|
|||
use mango_v4::error::MangoError;
|
||||
use solana_sdk::transport::TransportError;
|
||||
|
||||
// TODO FAS
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check() -> Result<(), TransportError> {
|
||||
let context = TestContext::new().await;
|
||||
|
|
Loading…
Reference in New Issue