diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 5781c41d3..05391acf4 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -368,29 +368,11 @@ async fn main() -> anyhow::Result<()> { trigger_tcs_config: tcs_config, rebalancer: rebalancer.clone(), token_swap_info: token_swap_info_updater.clone(), - liq_errors: ErrorTracking { - skip_threshold: 5, - skip_duration: Duration::from_secs(120), - ..ErrorTracking::default() - }, - tcs_collection_hard_errors: ErrorTracking { - skip_threshold: 2, - skip_duration: Duration::from_secs(120), - ..ErrorTracking::default() - }, - tcs_collection_partial_errors: ErrorTracking { - skip_threshold: 2, - skip_duration: Duration::from_secs(120), - ..ErrorTracking::default() - }, - tcs_execution_errors: ErrorTracking { - skip_threshold: 2, - skip_duration: Duration::from_secs(120), - ..ErrorTracking::default() - }, - persistent_error_report_interval: Duration::from_secs(300), - persistent_error_min_duration: Duration::from_secs(300), - last_persistent_error_report: Instant::now(), + errors: ErrorTracking::builder() + .skip_threshold(2) + .skip_threshold_for_type(LiqErrorType::Liq, 5) + .skip_duration(Duration::from_secs(120)) + .build()?, }); info!("main loop"); @@ -482,6 +464,8 @@ async fn main() -> anyhow::Result<()> { state.mango_accounts.iter().cloned().collect_vec() }; + liquidation.errors.update(); + if must_rebalance || last_rebalance.elapsed() > rebalance_delay { if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await { error!("failed to rebalance liqor: {:?}", err); @@ -490,8 +474,6 @@ async fn main() -> anyhow::Result<()> { last_rebalance = Instant::now(); } - liquidation.log_persistent_errors(); - let liquidated = liquidation .maybe_liquidate_one(account_addresses.iter()) .await @@ -533,14 +515,7 @@ async fn main() -> anyhow::Result<()> { let mut min_delay = tokio::time::interval(Duration::from_secs(1)); for token_index in token_indexes { min_delay.tick().await; - match token_swap_info_updater.update_one(token_index).await { - Ok(()) => {} - Err(err) => { - warn!( - "failed to update token swap info for token {token_index}: {err:?}", - ); - } - } + token_swap_info_updater.update_one(token_index).await; } token_swap_info_updater.log_all(); } @@ -585,6 +560,27 @@ struct SharedState { one_snapshot_done: bool, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum LiqErrorType { + Liq, + /// Errors that suggest we maybe should skip trying to collect tcs for that pubkey + TcsCollectionHard, + /// Recording errors when some tcs have errors during collection but others don't + TcsCollectionPartial, + TcsExecution, +} + +impl std::fmt::Display for LiqErrorType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Liq => write!(f, "liq"), + Self::TcsCollectionHard => write!(f, "tcs-collection-hard"), + Self::TcsCollectionPartial => write!(f, "tcs-collection-partial"), + Self::TcsExecution => write!(f, "tcs-execution"), + } + } +} + struct LiquidationState { mango_client: Arc, account_fetcher: Arc, @@ -593,15 +589,7 @@ struct LiquidationState { liquidation_config: liquidate::Config, trigger_tcs_config: trigger_tcs::Config, - liq_errors: ErrorTracking, - /// Errors that suggest we maybe should skip trying to collect tcs for that pubkey - tcs_collection_hard_errors: ErrorTracking, - /// Recording errors when some tcs have errors during collection but others don't - tcs_collection_partial_errors: ErrorTracking, - tcs_execution_errors: ErrorTracking, - persistent_error_report_interval: Duration, - last_persistent_error_report: Instant, - persistent_error_min_duration: Duration, + errors: ErrorTracking, } impl LiquidationState { @@ -632,10 +620,12 @@ impl LiquidationState { async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result { let now = Instant::now(); - let error_tracking = &mut self.liq_errors; + let error_tracking = &mut self.errors; // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) { + if let Some(error_entry) = + error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now) + { trace!( %pubkey, error_entry.count, @@ -654,7 +644,7 @@ impl LiquidationState { if let Err(err) = result.as_ref() { // Keep track of pubkeys that had errors - error_tracking.record_error(pubkey, now, err.to_string()); + error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string()); // Not all errors need to be raised to the user's attention. let mut is_error = true; @@ -677,7 +667,7 @@ impl LiquidationState { trace!("liquidating account {}: {:?}", pubkey, err); } } else { - error_tracking.clear_errors(pubkey); + error_tracking.clear(LiqErrorType::Liq, pubkey); } result @@ -706,9 +696,9 @@ impl LiquidationState { // Find interesting (pubkey, tcsid, volume) let mut interesting_tcs = Vec::with_capacity(accounts.len()); for pubkey in accounts.iter() { - if let Some(error_entry) = self - .tcs_collection_hard_errors - .had_too_many_errors(pubkey, now) + if let Some(error_entry) = + self.errors + .had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now) { trace!( %pubkey, @@ -720,19 +710,20 @@ impl LiquidationState { match tcs_context.find_interesting_tcs_for_account(pubkey) { Ok(v) => { - self.tcs_collection_hard_errors.clear_errors(pubkey); + self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey); if v.is_empty() { - self.tcs_collection_partial_errors.clear_errors(pubkey); - self.tcs_execution_errors.clear_errors(pubkey); + self.errors + .clear(LiqErrorType::TcsCollectionPartial, pubkey); + self.errors.clear(LiqErrorType::TcsExecution, pubkey); } else if v.iter().all(|it| it.is_ok()) { - self.tcs_collection_partial_errors.clear_errors(pubkey); + self.errors + .clear(LiqErrorType::TcsCollectionPartial, pubkey); } else { for it in v.iter() { if let Err(e) = it { - info!("error on tcs find_interesting: {:?}", e); - self.tcs_collection_partial_errors.record_error( + self.errors.record( + LiqErrorType::TcsCollectionPartial, pubkey, - now, e.to_string(), ); } @@ -741,8 +732,8 @@ impl LiquidationState { interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok())); } Err(e) => { - self.tcs_collection_hard_errors - .record_error(pubkey, now, e.to_string()); + self.errors + .record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string()); } } } @@ -751,8 +742,11 @@ impl LiquidationState { } let (txsigs, mut changed_pubkeys) = tcs_context - .execute_tcs(&mut interesting_tcs, &mut self.tcs_execution_errors) + .execute_tcs(&mut interesting_tcs, &mut self.errors) .await?; + for pubkey in changed_pubkeys.iter() { + self.errors.clear(LiqErrorType::TcsExecution, pubkey); + } changed_pubkeys.push(self.mango_client.mango_account_address); // Force a refresh of affected accounts @@ -771,26 +765,6 @@ impl LiquidationState { Ok(true) } - - fn log_persistent_errors(&mut self) { - let now = Instant::now(); - if now.duration_since(self.last_persistent_error_report) - < self.persistent_error_report_interval - { - return; - } - self.last_persistent_error_report = now; - - let min_duration = self.persistent_error_min_duration; - self.liq_errors - .log_persistent_errors("liquidation", min_duration); - self.tcs_execution_errors - .log_persistent_errors("tcs execution", min_duration); - self.tcs_collection_hard_errors - .log_persistent_errors("tcs collection hard", min_duration); - self.tcs_collection_partial_errors - .log_persistent_errors("tcs collection partial", min_duration); - } } fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { diff --git a/bin/liquidator/src/token_swap_info.rs b/bin/liquidator/src/token_swap_info.rs index 33ed288aa..e15fea2fd 100644 --- a/bin/liquidator/src/token_swap_info.rs +++ b/bin/liquidator/src/token_swap_info.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use itertools::Itertools; +use mango_v4_client::error_tracking::ErrorTracking; use tracing::*; use mango_v4::state::TokenIndex; @@ -37,21 +38,31 @@ impl TokenSwapInfo { } } +struct TokenSwapInfoState { + swap_infos: HashMap, + errors: ErrorTracking, +} + /// Track the buy/sell slippage for tokens /// /// Needed to evaluate whether a token conditional swap premium might be good enough /// without having to query each time. pub struct TokenSwapInfoUpdater { mango_client: Arc, - swap_infos: RwLock>, + state: RwLock, config: Config, } +const ERROR_TYPE: &'static str = "tsi"; + impl TokenSwapInfoUpdater { pub fn new(mango_client: Arc, config: Config) -> Self { Self { mango_client, - swap_infos: RwLock::new(HashMap::new()), + state: RwLock::new(TokenSwapInfoState { + swap_infos: HashMap::new(), + errors: ErrorTracking::builder().build().unwrap(), + }), config, } } @@ -61,13 +72,13 @@ impl TokenSwapInfoUpdater { } fn update(&self, token_index: TokenIndex, slippage: TokenSwapInfo) { - let mut lock = self.swap_infos.write().unwrap(); - lock.insert(token_index, slippage); + let mut lock = self.state.write().unwrap(); + lock.swap_infos.insert(token_index, slippage); } pub fn swap_info(&self, token_index: TokenIndex) -> Option { - let lock = self.swap_infos.read().unwrap(); - lock.get(&token_index).cloned() + let lock = self.state.read().unwrap(); + lock.swap_infos.get(&token_index).cloned() } fn in_per_out_price(route: &jupiter::Quote) -> f64 { @@ -76,7 +87,26 @@ impl TokenSwapInfoUpdater { in_amount / out_amount } - pub async fn update_one(&self, token_index: TokenIndex) -> anyhow::Result<()> { + pub async fn update_one(&self, token_index: TokenIndex) { + { + let lock = self.state.read().unwrap(); + if lock + .errors + .had_too_many_errors(ERROR_TYPE, &token_index, std::time::Instant::now()) + .is_some() + { + return; + } + } + + if let Err(err) = self.try_update_one(token_index).await { + let mut lock = self.state.write().unwrap(); + lock.errors + .record(ERROR_TYPE, &token_index, err.to_string()); + } + } + + async fn try_update_one(&self, token_index: TokenIndex) -> anyhow::Result<()> { // since we're only quoting, the slippage does not matter let slippage = 100; @@ -155,6 +185,11 @@ impl TokenSwapInfoUpdater { } pub fn log_all(&self) { + { + let mut lock = self.state.write().unwrap(); + lock.errors.update(); + } + let mut tokens = self .mango_client .context @@ -163,11 +198,12 @@ impl TokenSwapInfoUpdater { .into_iter() .collect_vec(); tokens.sort_by(|a, b| a.0.cmp(&b.0)); - let infos = self.swap_infos.read().unwrap(); + let lock = self.state.read().unwrap(); let mut msg = String::new(); for (token, token_index) in tokens { - let info = infos + let info = lock + .swap_infos .get(&token_index) .map(|info| { format!( diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index a78cbb4f7..00ad4aadd 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -18,7 +18,7 @@ use solana_sdk::{signature::Signature, signer::Signer}; use tracing::*; use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; -use crate::{token_swap_info, util, ErrorTracking}; +use crate::{token_swap_info, util, ErrorTracking, LiqErrorType}; /// When computing the max possible swap for a liqee, assume the price is this fraction worse for them. /// @@ -962,10 +962,9 @@ impl Context { pub async fn execute_tcs( &self, tcs: &mut [(Pubkey, u64, u64)], - error_tracking: &mut ErrorTracking, + error_tracking: &mut ErrorTracking, ) -> anyhow::Result<(Vec, Vec)> { use rand::distributions::{Distribution, WeightedError, WeightedIndex}; - let now = Instant::now(); let max_volume = self.config.max_trigger_quote_amount; let mut pending_volume = 0; @@ -1012,7 +1011,11 @@ impl Context { } Err(e) => { trace!(%result.pubkey, "preparation error {:?}", e); - error_tracking.record_error(&result.pubkey, now, e.to_string()); + error_tracking.record( + LiqErrorType::TcsExecution, + &result.pubkey, + e.to_string(), + ); } } no_new_job = false; @@ -1089,7 +1092,7 @@ impl Context { Ok(v) => Some((pubkey, v)), Err(err) => { trace!(%pubkey, "execution error {:?}", err); - error_tracking.record_error(&pubkey, Instant::now(), err.to_string()); + error_tracking.record(LiqErrorType::TcsExecution, &pubkey, err.to_string()); None } }); @@ -1104,10 +1107,12 @@ impl Context { pubkey: &Pubkey, tcs_id: u64, volume: u64, - error_tracking: &ErrorTracking, + error_tracking: &ErrorTracking, ) -> Option + Send>>> { // Skip a pubkey if there've been too many errors recently - if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, Instant::now()) { + if let Some(error_entry) = + error_tracking.had_too_many_errors(LiqErrorType::TcsExecution, pubkey, Instant::now()) + { trace!( "skip checking for tcs on account {pubkey}, had {} errors recently", error_entry.count diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index ae357518b..06b7b14b3 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use std::time::{Duration, Instant}; +use std::time::Duration; use anchor_client::Cluster; use clap::Parser; @@ -216,15 +216,12 @@ async fn main() -> anyhow::Result<()> { mango_client: mango_client.clone(), account_fetcher: account_fetcher.clone(), config: tcs_start::Config { - persistent_error_min_duration: Duration::from_secs(300), persistent_error_report_interval: Duration::from_secs(300), }, - errors: mango_v4_client::error_tracking::ErrorTracking { - skip_threshold: 2, - skip_duration: Duration::from_secs(60), - ..Default::default() - }, - last_persistent_error_report: Instant::now(), + errors: mango_v4_client::error_tracking::ErrorTracking::builder() + .skip_threshold(2) + .skip_duration(Duration::from_secs(60)) + .build()?, }; info!("main loop"); diff --git a/bin/settler/src/tcs_start.rs b/bin/settler/src/tcs_start.rs index 6c98eb012..d481e3946 100644 --- a/bin/settler/src/tcs_start.rs +++ b/bin/settler/src/tcs_start.rs @@ -12,7 +12,19 @@ use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; pub struct Config { pub persistent_error_report_interval: Duration, - pub persistent_error_min_duration: Duration, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ErrorType { + StartTcs, +} + +impl std::fmt::Display for ErrorType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::StartTcs => write!(f, "start-tcs"), + } + } } pub struct State { @@ -20,8 +32,7 @@ pub struct State { pub account_fetcher: Arc, pub config: Config, - pub errors: ErrorTracking, - pub last_persistent_error_report: Instant, + pub errors: ErrorTracking, } impl State { @@ -33,23 +44,10 @@ impl State { } self.run_pass_inner(&accounts).await?; - self.log_persistent_errors(); + self.errors.update(); Ok(()) } - fn log_persistent_errors(&mut self) { - let now = Instant::now(); - if now.duration_since(self.last_persistent_error_report) - < self.config.persistent_error_report_interval - { - return; - } - self.last_persistent_error_report = now; - - let min_duration = self.config.persistent_error_min_duration; - self.errors.log_persistent_errors("start_tcs", min_duration); - } - async fn run_pass_inner(&mut self, accounts: &[Pubkey]) -> anyhow::Result<()> { let now_ts: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); let now = Instant::now(); @@ -63,7 +61,11 @@ impl State { if account.fixed.group != mango_client.group() { continue; } - if self.errors.had_too_many_errors(account_key, now).is_some() { + if self + .errors + .had_too_many_errors(ErrorType::StartTcs, account_key, now) + .is_some() + { continue; } @@ -73,9 +75,9 @@ impl State { Ok(true) => {} Ok(false) => continue, Err(e) => { - self.errors.record_error( + self.errors.record( + ErrorType::StartTcs, account_key, - now, format!("error in is_tcs_startable: tcsid={}, {e:?}", tcs.id), ); } @@ -85,7 +87,7 @@ impl State { } if !had_tcs { - self.errors.clear_errors(account_key); + self.errors.clear(ErrorType::StartTcs, account_key); } } @@ -97,9 +99,9 @@ impl State { let ixs = match self.make_start_ix(pubkey, *tcs_id).await { Ok(v) => v, Err(e) => { - self.errors.record_error( + self.errors.record( + ErrorType::StartTcs, pubkey, - now, format!("error in make_start_ix: tcsid={tcs_id}, {e:?}"), ); continue; @@ -133,9 +135,9 @@ impl State { .iter() .filter_map(|(pk, tcs_id)| (pk == pubkey).then_some(tcs_id)) .collect_vec(); - self.errors.record_error( + self.errors.record( + ErrorType::StartTcs, pubkey, - now, format!("error sending transaction: tcsids={tcs_ids:?}, {e:?}"), ); } @@ -147,7 +149,7 @@ impl State { // clear errors on pubkeys with successes for pubkey in ix_targets.iter().map(|(pk, _)| pk).unique() { - self.errors.clear_errors(pubkey); + self.errors.clear(ErrorType::StartTcs, pubkey); } } diff --git a/lib/client/src/error_tracking.rs b/lib/client/src/error_tracking.rs index 73efb80c7..a466287ac 100644 --- a/lib/client/src/error_tracking.rs +++ b/lib/client/src/error_tracking.rs @@ -1,4 +1,3 @@ -use anchor_lang::prelude::Pubkey; use std::{ collections::HashMap, time::{Duration, Instant}, @@ -6,68 +5,177 @@ use std::{ use tracing::*; #[derive(Clone)] -pub struct AccountErrorState { - pub messages: Vec, +pub struct ErrorState { + pub errors: Vec, pub count: u64, pub last_at: Instant, } -#[derive(Default)] -pub struct ErrorTracking { - pub accounts: HashMap, +#[derive(Clone)] +struct ErrorTypeState { + state_by_key: HashMap, + + // override global + skip_threshold: Option, + skip_duration: Option, +} + +impl Default for ErrorTypeState { + fn default() -> Self { + Self { + state_by_key: Default::default(), + skip_threshold: None, + skip_duration: None, + } + } +} + +#[derive(Builder)] +pub struct ErrorTracking { + #[builder(setter(custom))] + errors_by_type: HashMap>, + + /// number of errors of a type after which had_too_many_errors returns true + #[builder(default = "2")] pub skip_threshold: u64, + + /// duration that had_too_many_errors returns true for after skip_threshold is reached + #[builder(default = "Duration::from_secs(60)")] pub skip_duration: Duration, + + #[builder(default = "3")] + pub unique_messages_to_keep: usize, + + /// after what time of no-errors may error info be wiped? + #[builder(default = "Duration::from_secs(300)")] + pub keep_duration: Duration, + + #[builder(setter(skip), default = "Instant::now()")] + last_log: Instant, + + #[builder(default = "Duration::from_secs(300)")] + pub log_interval: Duration, } -impl ErrorTracking { - pub fn had_too_many_errors(&self, pubkey: &Pubkey, now: Instant) -> Option { - if let Some(error_entry) = self.accounts.get(pubkey) { - if error_entry.count >= self.skip_threshold - && now.duration_since(error_entry.last_at) < self.skip_duration - { - Some(error_entry.clone()) - } else { - None - } - } else { - None +impl ErrorTrackingBuilder +where + ErrorType: Copy + std::hash::Hash + std::cmp::Eq + std::fmt::Display, +{ + pub fn skip_threshold_for_type(&mut self, error_type: ErrorType, threshold: u64) -> &mut Self { + if self.errors_by_type.is_none() { + self.errors_by_type = Some(Default::default()); } + let errors_by_type = self.errors_by_type.as_mut().unwrap(); + errors_by_type.entry(error_type).or_default().skip_threshold = Some(threshold); + self + } +} + +impl ErrorTracking +where + Key: Clone + std::hash::Hash + std::cmp::Eq + std::fmt::Display, + ErrorType: Copy + std::hash::Hash + std::cmp::Eq + std::fmt::Display, +{ + pub fn builder() -> ErrorTrackingBuilder { + ErrorTrackingBuilder::default() } - pub fn record_error(&mut self, pubkey: &Pubkey, now: Instant, message: String) { - let error_entry = self.accounts.entry(*pubkey).or_insert(AccountErrorState { - messages: Vec::with_capacity(1), - count: 0, - last_at: now, - }); - error_entry.count += 1; - error_entry.last_at = now; - if !error_entry.messages.contains(&message) { - error_entry.messages.push(message); - } - if error_entry.messages.len() > 5 { - error_entry.messages.remove(0); - } + fn should_skip( + &self, + state: &ErrorState, + error_type_state: &ErrorTypeState, + now: Instant, + ) -> bool { + let skip_threshold = error_type_state + .skip_threshold + .unwrap_or(self.skip_threshold); + let skip_duration = error_type_state.skip_duration.unwrap_or(self.skip_duration); + state.count >= skip_threshold && now.duration_since(state.last_at) < skip_duration } - pub fn clear_errors(&mut self, pubkey: &Pubkey) { - self.accounts.remove(pubkey); + pub fn had_too_many_errors( + &self, + error_type: ErrorType, + key: &Key, + now: Instant, + ) -> Option { + let error_type_state = self.errors_by_type.get(&error_type)?; + let state = error_type_state.state_by_key.get(key)?; + self.should_skip(state, error_type_state, now) + .then(|| state.clone()) } - #[instrument(skip_all, fields(%error_type))] - #[allow(unused_variables)] - pub fn log_persistent_errors(&self, error_type: &str, min_duration: Duration) { + pub fn record(&mut self, error_type: ErrorType, key: &Key, message: String) { let now = Instant::now(); - for (pubkey, errors) in self.accounts.iter() { - if now.duration_since(errors.last_at) < min_duration { - continue; + let state = self + .errors_by_type + .entry(error_type) + .or_default() + .state_by_key + .entry(key.clone()) + .or_insert(ErrorState { + errors: Vec::with_capacity(1), + count: 0, + last_at: now, + }); + state.count += 1; + state.last_at = now; + if let Some(pos) = state.errors.iter().position(|m| m == &message) { + state.errors.remove(pos); + } + state.errors.push(message); + if state.errors.len() > self.unique_messages_to_keep { + state.errors.remove(0); + } + + // log when skip threshold is reached the first time + if state.count == self.skip_threshold { + trace!(%error_type, %key, count = state.count, messages = ?state.errors, "had repeated errors, skipping..."); + } + } + + pub fn clear(&mut self, error_type: ErrorType, key: &Key) { + if let Some(error_type_state) = self.errors_by_type.get_mut(&error_type) { + error_type_state.state_by_key.remove(key); + } + } + + pub fn wipe_old(&mut self) { + let now = Instant::now(); + for error_type_state in self.errors_by_type.values_mut() { + error_type_state + .state_by_key + .retain(|_, state| now.duration_since(state.last_at) < self.keep_duration); + } + } + + /// Wipes old errors and occasionally logs errors that caused skipping + pub fn update(&mut self) { + let now = Instant::now(); + if now.duration_since(self.last_log) < self.log_interval { + return; + } + self.last_log = now; + self.wipe_old(); + self.log_error_skips(); + } + + /// Log all errors that cause skipping + pub fn log_error_skips(&self) { + let now = Instant::now(); + for (error_type, error_type_state) in self.errors_by_type.iter() { + let span = info_span!("log_error_skips", %error_type); + let _enter = span.enter(); + + for (key, state) in error_type_state.state_by_key.iter() { + if self.should_skip(state, error_type_state, now) { + info!( + %key, + count = state.count, + messages = ?state.errors, + ); + } } - info!( - %pubkey, - count = errors.count, - messages = ?errors.messages, - "has persistent errors", - ); } } }