diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 7c1ff3119..283a8d9c9 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -213,7 +213,7 @@ async fn main() -> anyhow::Result<()> { mango_group, get_multiple_accounts_count: cli.get_multiple_accounts_count, parallel_rpc_requests: cli.parallel_rpc_requests, - snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs), + snapshot_interval: Duration::from_secs(cli.snapshot_interval_secs), min_slot: first_websocket_slot + 10, }, mango_oracles, @@ -289,16 +289,27 @@ async fn main() -> anyhow::Result<()> { token_swap_info: token_swap_info_updater.clone(), liq_errors: ErrorTracking { skip_threshold: 5, - skip_duration: std::time::Duration::from_secs(120), - reset_duration: std::time::Duration::from_secs(360), + skip_duration: Duration::from_secs(120), ..ErrorTracking::default() }, - tcs_errors: ErrorTracking { + tcs_collection_hard_errors: ErrorTracking { skip_threshold: 2, - skip_duration: std::time::Duration::from_secs(120), - reset_duration: std::time::Duration::from_secs(360), + skip_duration: Duration::from_secs(120), ..ErrorTracking::default() }, + tcs_collection_partial_errors: ErrorTracking { + skip_threshold: 2, + skip_duration: Duration::from_secs(120), + ..ErrorTracking::default() + }, + tcs_execution_errors: ErrorTracking { + skip_threshold: 2, + skip_duration: Duration::from_secs(120), + ..ErrorTracking::default() + }, + persistent_error_report_interval: Duration::from_secs(300), + persistent_error_min_duration: Duration::from_secs(300), + last_persistent_error_report: Instant::now(), }); let (liquidation_trigger_sender, liquidation_trigger_receiver) = @@ -442,6 +453,8 @@ async fn main() -> anyhow::Result<()> { state.health_check_accounts = vec![]; } + liquidation.log_persistent_errors(); + let liquidated = liquidation .maybe_liquidate_one_and_rebalance(account_addresses.iter()) .await @@ -464,6 +477,7 @@ async fn main() -> anyhow::Result<()> { let shared_state = shared_state.clone(); async move { loop { + interval.tick().await; if !shared_state.read().unwrap().one_snapshot_done { continue; } @@ -475,6 +489,7 @@ async fn main() -> anyhow::Result<()> { .copied() .collect_vec(); for token_index in token_indexes { + min_delay.tick().await; match token_swap_info_updater.update_one(token_index).await { Ok(()) => {} Err(err) => { @@ -484,11 +499,8 @@ async fn main() -> anyhow::Result<()> { ); } } - min_delay.tick().await; } token_swap_info_updater.log_all(); - - interval.tick().await; } } }); @@ -532,17 +544,17 @@ struct SharedState { } #[derive(Clone)] -pub struct AccountErrorState { +struct AccountErrorState { + pub messages: Vec, pub count: u64, - pub last_at: std::time::Instant, + pub last_at: Instant, } #[derive(Default)] -pub struct ErrorTracking { +struct ErrorTracking { accounts: HashMap, skip_threshold: u64, - skip_duration: std::time::Duration, - reset_duration: std::time::Duration, + skip_duration: Duration, } impl ErrorTracking { @@ -560,21 +572,42 @@ impl ErrorTracking { } } - pub fn record_error(&mut self, pubkey: &Pubkey, now: Instant) { + pub fn record_error(&mut self, pubkey: &Pubkey, now: Instant, message: String) { let error_entry = self.accounts.entry(*pubkey).or_insert(AccountErrorState { + messages: Vec::with_capacity(1), count: 0, last_at: now, }); - if now.duration_since(error_entry.last_at) > self.reset_duration { - error_entry.count = 0; - } error_entry.count += 1; error_entry.last_at = now; + if !error_entry.messages.contains(&message) { + error_entry.messages.push(message); + } + if error_entry.messages.len() > 5 { + error_entry.messages.remove(0); + } } pub fn clear_errors(&mut self, pubkey: &Pubkey) { self.accounts.remove(pubkey); } + + #[instrument(skip_all, fields(%error_type))] + #[allow(unused_variables)] + pub fn log_persistent_errors(&self, error_type: &str, min_duration: Duration) { + let now = Instant::now(); + for (pubkey, errors) in self.accounts.iter() { + if now.duration_since(errors.last_at) < min_duration { + continue; + } + info!( + %pubkey, + count = errors.count, + messages = ?errors.messages, + "has persistent errors", + ); + } + } } struct LiquidationState { @@ -584,8 +617,16 @@ struct LiquidationState { token_swap_info: Arc, liquidation_config: liquidate::Config, trigger_tcs_config: trigger_tcs::Config, + liq_errors: ErrorTracking, - tcs_errors: ErrorTracking, + /// Errors that suggest we maybe should skip trying to collect tcs for that pubkey + tcs_collection_hard_errors: ErrorTracking, + /// Recording errors when some tcs have errors during collection but others don't + tcs_collection_partial_errors: ErrorTracking, + tcs_execution_errors: ErrorTracking, + persistent_error_report_interval: Duration, + last_persistent_error_report: Instant, + persistent_error_min_duration: Duration, } impl LiquidationState { @@ -623,14 +664,15 @@ impl LiquidationState { } async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { - let now = std::time::Instant::now(); + let now = Instant::now(); let error_tracking = &mut self.liq_errors; // Skip a pubkey if there've been too many errors recently if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) { trace!( - "skip checking account {pubkey}, had {} errors recently", - error_entry.count + %pubkey, + error_entry.count, + "skip checking account for liquidation, had errors recently", ); return Ok(false); } @@ -645,7 +687,7 @@ impl LiquidationState { if let Err(err) = result.as_ref() { // Keep track of pubkeys that had errors - error_tracking.record_error(pubkey, now); + error_tracking.record_error(pubkey, now, err.to_string()); // Not all errors need to be raised to the user's attention. let mut is_error = true; @@ -680,7 +722,7 @@ impl LiquidationState { ) -> anyhow::Result<()> { let accounts = accounts_iter.collect::>(); - let now = std::time::Instant::now(); + let now = Instant::now(); let now_ts: u64 = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_secs() @@ -689,16 +731,49 @@ impl LiquidationState { // Find interesting (pubkey, tcsid, volume) let mut interesting_tcs = Vec::with_capacity(accounts.len()); for pubkey in accounts.iter() { - if let Ok(mut v) = trigger_tcs::find_interesting_tcs_for_account( + if let Some(error_entry) = self + .tcs_collection_hard_errors + .had_too_many_errors(pubkey, now) + { + trace!( + %pubkey, + error_entry.count, + "skip checking account for tcs, had errors recently", + ); + continue; + } + + match trigger_tcs::find_interesting_tcs_for_account( pubkey, &self.mango_client, &self.account_fetcher, &self.token_swap_info, - &self.tcs_errors, - now, now_ts, ) { - interesting_tcs.append(&mut v); + Ok(v) => { + self.tcs_collection_hard_errors.clear_errors(pubkey); + if v.is_empty() { + self.tcs_collection_partial_errors.clear_errors(pubkey); + self.tcs_execution_errors.clear_errors(pubkey); + } else if v.iter().all(|it| it.is_ok()) { + self.tcs_collection_partial_errors.clear_errors(pubkey); + } else { + for it in v.iter() { + if let Err(e) = it { + self.tcs_collection_partial_errors.record_error( + pubkey, + now, + e.to_string(), + ); + } + } + } + interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok())); + } + Err(e) => { + self.tcs_collection_hard_errors + .record_error(pubkey, now, e.to_string()); + } } } if interesting_tcs.is_empty() { @@ -750,8 +825,8 @@ impl LiquidationState { pubkey: &Pubkey, tcs_id: u64, ) -> anyhow::Result { - let now = std::time::Instant::now(); - let error_tracking = &mut self.tcs_errors; + let now = Instant::now(); + let error_tracking = &mut self.tcs_execution_errors; // Skip a pubkey if there've been too many errors recently if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) { @@ -774,7 +849,7 @@ impl LiquidationState { if let Err(err) = result.as_ref() { // Keep track of pubkeys that had errors - error_tracking.record_error(pubkey, now); + error_tracking.record_error(pubkey, now, err.to_string()); // Not all errors need to be raised to the user's attention. let mut is_error = true; @@ -803,10 +878,30 @@ impl LiquidationState { result } + + fn log_persistent_errors(&mut self) { + let now = Instant::now(); + if now.duration_since(self.last_persistent_error_report) + < self.persistent_error_report_interval + { + return; + } + self.last_persistent_error_report = now; + + let min_duration = self.persistent_error_min_duration; + self.liq_errors + .log_persistent_errors("liquidation", min_duration); + self.tcs_execution_errors + .log_persistent_errors("tcs execution", min_duration); + self.tcs_collection_hard_errors + .log_persistent_errors("tcs collection hard", min_duration); + self.tcs_collection_partial_errors + .log_persistent_errors("tcs collection partial", min_duration); + } } fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); + let mut interval = tokio::time::interval(Duration::from_secs(600)); let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into()); let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into()); diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index 2509669df..1a397e08d 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; use itertools::Itertools; use mango_v4::{ @@ -10,7 +10,7 @@ use mango_v4_client::{chain_data, health_cache, JupiterSwapMode, MangoClient, Ma use tracing::*; use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; -use crate::{token_swap_info, util, ErrorTracking}; +use crate::{token_swap_info, util}; // The liqee health ratio to aim for when executing tcs orders that are bigger // than the liqee can support. @@ -267,7 +267,7 @@ async fn execute_token_conditional_swap( } #[allow(clippy::too_many_arguments)] -#[instrument(skip_all, fields(%pubkey, tcs_id))] +#[instrument(skip_all, fields(%pubkey, %tcs_id))] pub async fn remove_expired_token_conditional_swap( mango_client: &MangoClient, pubkey: &Pubkey, @@ -380,13 +380,8 @@ pub fn find_interesting_tcs_for_account( mango_client: &MangoClient, account_fetcher: &chain_data::AccountFetcher, token_swap_info: &token_swap_info::TokenSwapInfoUpdater, - error_tracking: &ErrorTracking, - now: Instant, now_ts: u64, -) -> anyhow::Result> { - if error_tracking.had_too_many_errors(pubkey, now).is_some() { - anyhow::bail!("too many errors"); - } +) -> anyhow::Result>> { let liqee = account_fetcher.fetch_mango_account(pubkey)?; let interesting_tcs = liqee.active_token_conditional_swaps().filter_map(|tcs| { @@ -397,12 +392,12 @@ pub fn find_interesting_tcs_for_account( token_swap_info, now_ts, ) { - Ok(false) | Err(_) => None, Ok(true) => { - let volume = - tcs_max_volume(&liqee, mango_client, account_fetcher, tcs).unwrap_or(1); - Some((*pubkey, tcs.id, volume)) + let volume_result = tcs_max_volume(&liqee, mango_client, account_fetcher, tcs); + Some(volume_result.map(|v| (*pubkey, tcs.id, v))) } + Ok(false) => None, + Err(e) => Some(Err(e)), } }); Ok(interesting_tcs.collect_vec())