Merge pull request #827 from blockworks-foundation/ckamm/persistent-errors
Liquidator: more useful logs by tracking repeated errors better
This commit is contained in:
commit
7d177fdb26
|
@ -368,29 +368,11 @@ async fn main() -> anyhow::Result<()> {
|
|||
trigger_tcs_config: tcs_config,
|
||||
rebalancer: rebalancer.clone(),
|
||||
token_swap_info: token_swap_info_updater.clone(),
|
||||
liq_errors: ErrorTracking {
|
||||
skip_threshold: 5,
|
||||
skip_duration: Duration::from_secs(120),
|
||||
..ErrorTracking::default()
|
||||
},
|
||||
tcs_collection_hard_errors: ErrorTracking {
|
||||
skip_threshold: 2,
|
||||
skip_duration: Duration::from_secs(120),
|
||||
..ErrorTracking::default()
|
||||
},
|
||||
tcs_collection_partial_errors: ErrorTracking {
|
||||
skip_threshold: 2,
|
||||
skip_duration: Duration::from_secs(120),
|
||||
..ErrorTracking::default()
|
||||
},
|
||||
tcs_execution_errors: ErrorTracking {
|
||||
skip_threshold: 2,
|
||||
skip_duration: Duration::from_secs(120),
|
||||
..ErrorTracking::default()
|
||||
},
|
||||
persistent_error_report_interval: Duration::from_secs(300),
|
||||
persistent_error_min_duration: Duration::from_secs(300),
|
||||
last_persistent_error_report: Instant::now(),
|
||||
errors: ErrorTracking::builder()
|
||||
.skip_threshold(2)
|
||||
.skip_threshold_for_type(LiqErrorType::Liq, 5)
|
||||
.skip_duration(Duration::from_secs(120))
|
||||
.build()?,
|
||||
});
|
||||
|
||||
info!("main loop");
|
||||
|
@ -482,6 +464,8 @@ async fn main() -> anyhow::Result<()> {
|
|||
state.mango_accounts.iter().cloned().collect_vec()
|
||||
};
|
||||
|
||||
liquidation.errors.update();
|
||||
|
||||
if must_rebalance || last_rebalance.elapsed() > rebalance_delay {
|
||||
if let Err(err) = liquidation.rebalancer.zero_all_non_quote().await {
|
||||
error!("failed to rebalance liqor: {:?}", err);
|
||||
|
@ -490,8 +474,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
last_rebalance = Instant::now();
|
||||
}
|
||||
|
||||
liquidation.log_persistent_errors();
|
||||
|
||||
let liquidated = liquidation
|
||||
.maybe_liquidate_one(account_addresses.iter())
|
||||
.await
|
||||
|
@ -533,14 +515,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let mut min_delay = tokio::time::interval(Duration::from_secs(1));
|
||||
for token_index in token_indexes {
|
||||
min_delay.tick().await;
|
||||
match token_swap_info_updater.update_one(token_index).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to update token swap info for token {token_index}: {err:?}",
|
||||
);
|
||||
}
|
||||
}
|
||||
token_swap_info_updater.update_one(token_index).await;
|
||||
}
|
||||
token_swap_info_updater.log_all();
|
||||
}
|
||||
|
@ -585,6 +560,27 @@ struct SharedState {
|
|||
one_snapshot_done: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum LiqErrorType {
|
||||
Liq,
|
||||
/// Errors that suggest we maybe should skip trying to collect tcs for that pubkey
|
||||
TcsCollectionHard,
|
||||
/// Recording errors when some tcs have errors during collection but others don't
|
||||
TcsCollectionPartial,
|
||||
TcsExecution,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LiqErrorType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Liq => write!(f, "liq"),
|
||||
Self::TcsCollectionHard => write!(f, "tcs-collection-hard"),
|
||||
Self::TcsCollectionPartial => write!(f, "tcs-collection-partial"),
|
||||
Self::TcsExecution => write!(f, "tcs-execution"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LiquidationState {
|
||||
mango_client: Arc<MangoClient>,
|
||||
account_fetcher: Arc<chain_data::AccountFetcher>,
|
||||
|
@ -593,15 +589,7 @@ struct LiquidationState {
|
|||
liquidation_config: liquidate::Config,
|
||||
trigger_tcs_config: trigger_tcs::Config,
|
||||
|
||||
liq_errors: ErrorTracking,
|
||||
/// Errors that suggest we maybe should skip trying to collect tcs for that pubkey
|
||||
tcs_collection_hard_errors: ErrorTracking,
|
||||
/// Recording errors when some tcs have errors during collection but others don't
|
||||
tcs_collection_partial_errors: ErrorTracking,
|
||||
tcs_execution_errors: ErrorTracking,
|
||||
persistent_error_report_interval: Duration,
|
||||
last_persistent_error_report: Instant,
|
||||
persistent_error_min_duration: Duration,
|
||||
errors: ErrorTracking<Pubkey, LiqErrorType>,
|
||||
}
|
||||
|
||||
impl LiquidationState {
|
||||
|
@ -632,10 +620,12 @@ impl LiquidationState {
|
|||
|
||||
async fn maybe_liquidate_and_log_error(&mut self, pubkey: &Pubkey) -> anyhow::Result<bool> {
|
||||
let now = Instant::now();
|
||||
let error_tracking = &mut self.liq_errors;
|
||||
let error_tracking = &mut self.errors;
|
||||
|
||||
// Skip a pubkey if there've been too many errors recently
|
||||
if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, now) {
|
||||
if let Some(error_entry) =
|
||||
error_tracking.had_too_many_errors(LiqErrorType::Liq, pubkey, now)
|
||||
{
|
||||
trace!(
|
||||
%pubkey,
|
||||
error_entry.count,
|
||||
|
@ -654,7 +644,7 @@ impl LiquidationState {
|
|||
|
||||
if let Err(err) = result.as_ref() {
|
||||
// Keep track of pubkeys that had errors
|
||||
error_tracking.record_error(pubkey, now, err.to_string());
|
||||
error_tracking.record(LiqErrorType::Liq, pubkey, err.to_string());
|
||||
|
||||
// Not all errors need to be raised to the user's attention.
|
||||
let mut is_error = true;
|
||||
|
@ -677,7 +667,7 @@ impl LiquidationState {
|
|||
trace!("liquidating account {}: {:?}", pubkey, err);
|
||||
}
|
||||
} else {
|
||||
error_tracking.clear_errors(pubkey);
|
||||
error_tracking.clear(LiqErrorType::Liq, pubkey);
|
||||
}
|
||||
|
||||
result
|
||||
|
@ -706,9 +696,9 @@ impl LiquidationState {
|
|||
// Find interesting (pubkey, tcsid, volume)
|
||||
let mut interesting_tcs = Vec::with_capacity(accounts.len());
|
||||
for pubkey in accounts.iter() {
|
||||
if let Some(error_entry) = self
|
||||
.tcs_collection_hard_errors
|
||||
.had_too_many_errors(pubkey, now)
|
||||
if let Some(error_entry) =
|
||||
self.errors
|
||||
.had_too_many_errors(LiqErrorType::TcsCollectionHard, pubkey, now)
|
||||
{
|
||||
trace!(
|
||||
%pubkey,
|
||||
|
@ -720,19 +710,20 @@ impl LiquidationState {
|
|||
|
||||
match tcs_context.find_interesting_tcs_for_account(pubkey) {
|
||||
Ok(v) => {
|
||||
self.tcs_collection_hard_errors.clear_errors(pubkey);
|
||||
self.errors.clear(LiqErrorType::TcsCollectionHard, pubkey);
|
||||
if v.is_empty() {
|
||||
self.tcs_collection_partial_errors.clear_errors(pubkey);
|
||||
self.tcs_execution_errors.clear_errors(pubkey);
|
||||
self.errors
|
||||
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
} else if v.iter().all(|it| it.is_ok()) {
|
||||
self.tcs_collection_partial_errors.clear_errors(pubkey);
|
||||
self.errors
|
||||
.clear(LiqErrorType::TcsCollectionPartial, pubkey);
|
||||
} else {
|
||||
for it in v.iter() {
|
||||
if let Err(e) = it {
|
||||
info!("error on tcs find_interesting: {:?}", e);
|
||||
self.tcs_collection_partial_errors.record_error(
|
||||
self.errors.record(
|
||||
LiqErrorType::TcsCollectionPartial,
|
||||
pubkey,
|
||||
now,
|
||||
e.to_string(),
|
||||
);
|
||||
}
|
||||
|
@ -741,8 +732,8 @@ impl LiquidationState {
|
|||
interesting_tcs.extend(v.iter().filter_map(|it| it.as_ref().ok()));
|
||||
}
|
||||
Err(e) => {
|
||||
self.tcs_collection_hard_errors
|
||||
.record_error(pubkey, now, e.to_string());
|
||||
self.errors
|
||||
.record(LiqErrorType::TcsCollectionHard, pubkey, e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -751,8 +742,11 @@ impl LiquidationState {
|
|||
}
|
||||
|
||||
let (txsigs, mut changed_pubkeys) = tcs_context
|
||||
.execute_tcs(&mut interesting_tcs, &mut self.tcs_execution_errors)
|
||||
.execute_tcs(&mut interesting_tcs, &mut self.errors)
|
||||
.await?;
|
||||
for pubkey in changed_pubkeys.iter() {
|
||||
self.errors.clear(LiqErrorType::TcsExecution, pubkey);
|
||||
}
|
||||
changed_pubkeys.push(self.mango_client.mango_account_address);
|
||||
|
||||
// Force a refresh of affected accounts
|
||||
|
@ -771,26 +765,6 @@ impl LiquidationState {
|
|||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn log_persistent_errors(&mut self) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(self.last_persistent_error_report)
|
||||
< self.persistent_error_report_interval
|
||||
{
|
||||
return;
|
||||
}
|
||||
self.last_persistent_error_report = now;
|
||||
|
||||
let min_duration = self.persistent_error_min_duration;
|
||||
self.liq_errors
|
||||
.log_persistent_errors("liquidation", min_duration);
|
||||
self.tcs_execution_errors
|
||||
.log_persistent_errors("tcs execution", min_duration);
|
||||
self.tcs_collection_hard_errors
|
||||
.log_persistent_errors("tcs collection hard", min_duration);
|
||||
self.tcs_collection_partial_errors
|
||||
.log_persistent_errors("tcs collection partial", min_duration);
|
||||
}
|
||||
}
|
||||
|
||||
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
|
||||
|
|
|
@ -2,6 +2,7 @@ use std::collections::HashMap;
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use itertools::Itertools;
|
||||
use mango_v4_client::error_tracking::ErrorTracking;
|
||||
use tracing::*;
|
||||
|
||||
use mango_v4::state::TokenIndex;
|
||||
|
@ -37,21 +38,31 @@ impl TokenSwapInfo {
|
|||
}
|
||||
}
|
||||
|
||||
struct TokenSwapInfoState {
|
||||
swap_infos: HashMap<TokenIndex, TokenSwapInfo>,
|
||||
errors: ErrorTracking<TokenIndex, &'static str>,
|
||||
}
|
||||
|
||||
/// Track the buy/sell slippage for tokens
|
||||
///
|
||||
/// Needed to evaluate whether a token conditional swap premium might be good enough
|
||||
/// without having to query each time.
|
||||
pub struct TokenSwapInfoUpdater {
|
||||
mango_client: Arc<MangoClient>,
|
||||
swap_infos: RwLock<HashMap<TokenIndex, TokenSwapInfo>>,
|
||||
state: RwLock<TokenSwapInfoState>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
const ERROR_TYPE: &'static str = "tsi";
|
||||
|
||||
impl TokenSwapInfoUpdater {
|
||||
pub fn new(mango_client: Arc<MangoClient>, config: Config) -> Self {
|
||||
Self {
|
||||
mango_client,
|
||||
swap_infos: RwLock::new(HashMap::new()),
|
||||
state: RwLock::new(TokenSwapInfoState {
|
||||
swap_infos: HashMap::new(),
|
||||
errors: ErrorTracking::builder().build().unwrap(),
|
||||
}),
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
@ -61,13 +72,13 @@ impl TokenSwapInfoUpdater {
|
|||
}
|
||||
|
||||
fn update(&self, token_index: TokenIndex, slippage: TokenSwapInfo) {
|
||||
let mut lock = self.swap_infos.write().unwrap();
|
||||
lock.insert(token_index, slippage);
|
||||
let mut lock = self.state.write().unwrap();
|
||||
lock.swap_infos.insert(token_index, slippage);
|
||||
}
|
||||
|
||||
pub fn swap_info(&self, token_index: TokenIndex) -> Option<TokenSwapInfo> {
|
||||
let lock = self.swap_infos.read().unwrap();
|
||||
lock.get(&token_index).cloned()
|
||||
let lock = self.state.read().unwrap();
|
||||
lock.swap_infos.get(&token_index).cloned()
|
||||
}
|
||||
|
||||
fn in_per_out_price(route: &jupiter::Quote) -> f64 {
|
||||
|
@ -76,7 +87,26 @@ impl TokenSwapInfoUpdater {
|
|||
in_amount / out_amount
|
||||
}
|
||||
|
||||
pub async fn update_one(&self, token_index: TokenIndex) -> anyhow::Result<()> {
|
||||
pub async fn update_one(&self, token_index: TokenIndex) {
|
||||
{
|
||||
let lock = self.state.read().unwrap();
|
||||
if lock
|
||||
.errors
|
||||
.had_too_many_errors(ERROR_TYPE, &token_index, std::time::Instant::now())
|
||||
.is_some()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = self.try_update_one(token_index).await {
|
||||
let mut lock = self.state.write().unwrap();
|
||||
lock.errors
|
||||
.record(ERROR_TYPE, &token_index, err.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_update_one(&self, token_index: TokenIndex) -> anyhow::Result<()> {
|
||||
// since we're only quoting, the slippage does not matter
|
||||
let slippage = 100;
|
||||
|
||||
|
@ -155,6 +185,11 @@ impl TokenSwapInfoUpdater {
|
|||
}
|
||||
|
||||
pub fn log_all(&self) {
|
||||
{
|
||||
let mut lock = self.state.write().unwrap();
|
||||
lock.errors.update();
|
||||
}
|
||||
|
||||
let mut tokens = self
|
||||
.mango_client
|
||||
.context
|
||||
|
@ -163,11 +198,12 @@ impl TokenSwapInfoUpdater {
|
|||
.into_iter()
|
||||
.collect_vec();
|
||||
tokens.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
let infos = self.swap_infos.read().unwrap();
|
||||
let lock = self.state.read().unwrap();
|
||||
|
||||
let mut msg = String::new();
|
||||
for (token, token_index) in tokens {
|
||||
let info = infos
|
||||
let info = lock
|
||||
.swap_infos
|
||||
.get(&token_index)
|
||||
.map(|info| {
|
||||
format!(
|
||||
|
|
|
@ -18,7 +18,7 @@ use solana_sdk::{signature::Signature, signer::Signer};
|
|||
use tracing::*;
|
||||
use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
||||
|
||||
use crate::{token_swap_info, util, ErrorTracking};
|
||||
use crate::{token_swap_info, util, ErrorTracking, LiqErrorType};
|
||||
|
||||
/// When computing the max possible swap for a liqee, assume the price is this fraction worse for them.
|
||||
///
|
||||
|
@ -962,10 +962,9 @@ impl Context {
|
|||
pub async fn execute_tcs(
|
||||
&self,
|
||||
tcs: &mut [(Pubkey, u64, u64)],
|
||||
error_tracking: &mut ErrorTracking,
|
||||
error_tracking: &mut ErrorTracking<Pubkey, LiqErrorType>,
|
||||
) -> anyhow::Result<(Vec<Signature>, Vec<Pubkey>)> {
|
||||
use rand::distributions::{Distribution, WeightedError, WeightedIndex};
|
||||
let now = Instant::now();
|
||||
|
||||
let max_volume = self.config.max_trigger_quote_amount;
|
||||
let mut pending_volume = 0;
|
||||
|
@ -1012,7 +1011,11 @@ impl Context {
|
|||
}
|
||||
Err(e) => {
|
||||
trace!(%result.pubkey, "preparation error {:?}", e);
|
||||
error_tracking.record_error(&result.pubkey, now, e.to_string());
|
||||
error_tracking.record(
|
||||
LiqErrorType::TcsExecution,
|
||||
&result.pubkey,
|
||||
e.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
no_new_job = false;
|
||||
|
@ -1089,7 +1092,7 @@ impl Context {
|
|||
Ok(v) => Some((pubkey, v)),
|
||||
Err(err) => {
|
||||
trace!(%pubkey, "execution error {:?}", err);
|
||||
error_tracking.record_error(&pubkey, Instant::now(), err.to_string());
|
||||
error_tracking.record(LiqErrorType::TcsExecution, &pubkey, err.to_string());
|
||||
None
|
||||
}
|
||||
});
|
||||
|
@ -1104,10 +1107,12 @@ impl Context {
|
|||
pubkey: &Pubkey,
|
||||
tcs_id: u64,
|
||||
volume: u64,
|
||||
error_tracking: &ErrorTracking,
|
||||
error_tracking: &ErrorTracking<Pubkey, LiqErrorType>,
|
||||
) -> Option<Pin<Box<dyn Future<Output = PreparationResult> + Send>>> {
|
||||
// Skip a pubkey if there've been too many errors recently
|
||||
if let Some(error_entry) = error_tracking.had_too_many_errors(pubkey, Instant::now()) {
|
||||
if let Some(error_entry) =
|
||||
error_tracking.had_too_many_errors(LiqErrorType::TcsExecution, pubkey, Instant::now())
|
||||
{
|
||||
trace!(
|
||||
"skip checking for tcs on account {pubkey}, had {} errors recently",
|
||||
error_entry.count
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
|
||||
use anchor_client::Cluster;
|
||||
use clap::Parser;
|
||||
|
@ -216,15 +216,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
mango_client: mango_client.clone(),
|
||||
account_fetcher: account_fetcher.clone(),
|
||||
config: tcs_start::Config {
|
||||
persistent_error_min_duration: Duration::from_secs(300),
|
||||
persistent_error_report_interval: Duration::from_secs(300),
|
||||
},
|
||||
errors: mango_v4_client::error_tracking::ErrorTracking {
|
||||
skip_threshold: 2,
|
||||
skip_duration: Duration::from_secs(60),
|
||||
..Default::default()
|
||||
},
|
||||
last_persistent_error_report: Instant::now(),
|
||||
errors: mango_v4_client::error_tracking::ErrorTracking::builder()
|
||||
.skip_threshold(2)
|
||||
.skip_duration(Duration::from_secs(60))
|
||||
.build()?,
|
||||
};
|
||||
|
||||
info!("main loop");
|
||||
|
|
|
@ -12,7 +12,19 @@ use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
|||
|
||||
pub struct Config {
|
||||
pub persistent_error_report_interval: Duration,
|
||||
pub persistent_error_min_duration: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum ErrorType {
|
||||
StartTcs,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ErrorType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::StartTcs => write!(f, "start-tcs"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct State {
|
||||
|
@ -20,8 +32,7 @@ pub struct State {
|
|||
pub account_fetcher: Arc<chain_data::AccountFetcher>,
|
||||
pub config: Config,
|
||||
|
||||
pub errors: ErrorTracking,
|
||||
pub last_persistent_error_report: Instant,
|
||||
pub errors: ErrorTracking<Pubkey, ErrorType>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
|
@ -33,23 +44,10 @@ impl State {
|
|||
}
|
||||
|
||||
self.run_pass_inner(&accounts).await?;
|
||||
self.log_persistent_errors();
|
||||
self.errors.update();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn log_persistent_errors(&mut self) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(self.last_persistent_error_report)
|
||||
< self.config.persistent_error_report_interval
|
||||
{
|
||||
return;
|
||||
}
|
||||
self.last_persistent_error_report = now;
|
||||
|
||||
let min_duration = self.config.persistent_error_min_duration;
|
||||
self.errors.log_persistent_errors("start_tcs", min_duration);
|
||||
}
|
||||
|
||||
async fn run_pass_inner(&mut self, accounts: &[Pubkey]) -> anyhow::Result<()> {
|
||||
let now_ts: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
|
||||
let now = Instant::now();
|
||||
|
@ -63,7 +61,11 @@ impl State {
|
|||
if account.fixed.group != mango_client.group() {
|
||||
continue;
|
||||
}
|
||||
if self.errors.had_too_many_errors(account_key, now).is_some() {
|
||||
if self
|
||||
.errors
|
||||
.had_too_many_errors(ErrorType::StartTcs, account_key, now)
|
||||
.is_some()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -73,9 +75,9 @@ impl State {
|
|||
Ok(true) => {}
|
||||
Ok(false) => continue,
|
||||
Err(e) => {
|
||||
self.errors.record_error(
|
||||
self.errors.record(
|
||||
ErrorType::StartTcs,
|
||||
account_key,
|
||||
now,
|
||||
format!("error in is_tcs_startable: tcsid={}, {e:?}", tcs.id),
|
||||
);
|
||||
}
|
||||
|
@ -85,7 +87,7 @@ impl State {
|
|||
}
|
||||
|
||||
if !had_tcs {
|
||||
self.errors.clear_errors(account_key);
|
||||
self.errors.clear(ErrorType::StartTcs, account_key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,9 +99,9 @@ impl State {
|
|||
let ixs = match self.make_start_ix(pubkey, *tcs_id).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
self.errors.record_error(
|
||||
self.errors.record(
|
||||
ErrorType::StartTcs,
|
||||
pubkey,
|
||||
now,
|
||||
format!("error in make_start_ix: tcsid={tcs_id}, {e:?}"),
|
||||
);
|
||||
continue;
|
||||
|
@ -133,9 +135,9 @@ impl State {
|
|||
.iter()
|
||||
.filter_map(|(pk, tcs_id)| (pk == pubkey).then_some(tcs_id))
|
||||
.collect_vec();
|
||||
self.errors.record_error(
|
||||
self.errors.record(
|
||||
ErrorType::StartTcs,
|
||||
pubkey,
|
||||
now,
|
||||
format!("error sending transaction: tcsids={tcs_ids:?}, {e:?}"),
|
||||
);
|
||||
}
|
||||
|
@ -147,7 +149,7 @@ impl State {
|
|||
|
||||
// clear errors on pubkeys with successes
|
||||
for pubkey in ix_targets.iter().map(|(pk, _)| pk).unique() {
|
||||
self.errors.clear_errors(pubkey);
|
||||
self.errors.clear(ErrorType::StartTcs, pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use anchor_lang::prelude::Pubkey;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
time::{Duration, Instant},
|
||||
|
@ -6,68 +5,177 @@ use std::{
|
|||
use tracing::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AccountErrorState {
|
||||
pub messages: Vec<String>,
|
||||
pub struct ErrorState {
|
||||
pub errors: Vec<String>,
|
||||
pub count: u64,
|
||||
pub last_at: Instant,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ErrorTracking {
|
||||
pub accounts: HashMap<Pubkey, AccountErrorState>,
|
||||
#[derive(Clone)]
|
||||
struct ErrorTypeState<Key> {
|
||||
state_by_key: HashMap<Key, ErrorState>,
|
||||
|
||||
// override global
|
||||
skip_threshold: Option<u64>,
|
||||
skip_duration: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<Key> Default for ErrorTypeState<Key> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
state_by_key: Default::default(),
|
||||
skip_threshold: None,
|
||||
skip_duration: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Builder)]
|
||||
pub struct ErrorTracking<Key, ErrorType> {
|
||||
#[builder(setter(custom))]
|
||||
errors_by_type: HashMap<ErrorType, ErrorTypeState<Key>>,
|
||||
|
||||
/// number of errors of a type after which had_too_many_errors returns true
|
||||
#[builder(default = "2")]
|
||||
pub skip_threshold: u64,
|
||||
|
||||
/// duration that had_too_many_errors returns true for after skip_threshold is reached
|
||||
#[builder(default = "Duration::from_secs(60)")]
|
||||
pub skip_duration: Duration,
|
||||
|
||||
#[builder(default = "3")]
|
||||
pub unique_messages_to_keep: usize,
|
||||
|
||||
/// after what time of no-errors may error info be wiped?
|
||||
#[builder(default = "Duration::from_secs(300)")]
|
||||
pub keep_duration: Duration,
|
||||
|
||||
#[builder(setter(skip), default = "Instant::now()")]
|
||||
last_log: Instant,
|
||||
|
||||
#[builder(default = "Duration::from_secs(300)")]
|
||||
pub log_interval: Duration,
|
||||
}
|
||||
|
||||
impl ErrorTracking {
|
||||
pub fn had_too_many_errors(&self, pubkey: &Pubkey, now: Instant) -> Option<AccountErrorState> {
|
||||
if let Some(error_entry) = self.accounts.get(pubkey) {
|
||||
if error_entry.count >= self.skip_threshold
|
||||
&& now.duration_since(error_entry.last_at) < self.skip_duration
|
||||
{
|
||||
Some(error_entry.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
impl<Key, ErrorType> ErrorTrackingBuilder<Key, ErrorType>
|
||||
where
|
||||
ErrorType: Copy + std::hash::Hash + std::cmp::Eq + std::fmt::Display,
|
||||
{
|
||||
pub fn skip_threshold_for_type(&mut self, error_type: ErrorType, threshold: u64) -> &mut Self {
|
||||
if self.errors_by_type.is_none() {
|
||||
self.errors_by_type = Some(Default::default());
|
||||
}
|
||||
let errors_by_type = self.errors_by_type.as_mut().unwrap();
|
||||
errors_by_type.entry(error_type).or_default().skip_threshold = Some(threshold);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<Key, ErrorType> ErrorTracking<Key, ErrorType>
|
||||
where
|
||||
Key: Clone + std::hash::Hash + std::cmp::Eq + std::fmt::Display,
|
||||
ErrorType: Copy + std::hash::Hash + std::cmp::Eq + std::fmt::Display,
|
||||
{
|
||||
pub fn builder() -> ErrorTrackingBuilder<Key, ErrorType> {
|
||||
ErrorTrackingBuilder::default()
|
||||
}
|
||||
|
||||
pub fn record_error(&mut self, pubkey: &Pubkey, now: Instant, message: String) {
|
||||
let error_entry = self.accounts.entry(*pubkey).or_insert(AccountErrorState {
|
||||
messages: Vec::with_capacity(1),
|
||||
count: 0,
|
||||
last_at: now,
|
||||
});
|
||||
error_entry.count += 1;
|
||||
error_entry.last_at = now;
|
||||
if !error_entry.messages.contains(&message) {
|
||||
error_entry.messages.push(message);
|
||||
}
|
||||
if error_entry.messages.len() > 5 {
|
||||
error_entry.messages.remove(0);
|
||||
}
|
||||
fn should_skip(
|
||||
&self,
|
||||
state: &ErrorState,
|
||||
error_type_state: &ErrorTypeState<Key>,
|
||||
now: Instant,
|
||||
) -> bool {
|
||||
let skip_threshold = error_type_state
|
||||
.skip_threshold
|
||||
.unwrap_or(self.skip_threshold);
|
||||
let skip_duration = error_type_state.skip_duration.unwrap_or(self.skip_duration);
|
||||
state.count >= skip_threshold && now.duration_since(state.last_at) < skip_duration
|
||||
}
|
||||
|
||||
pub fn clear_errors(&mut self, pubkey: &Pubkey) {
|
||||
self.accounts.remove(pubkey);
|
||||
pub fn had_too_many_errors(
|
||||
&self,
|
||||
error_type: ErrorType,
|
||||
key: &Key,
|
||||
now: Instant,
|
||||
) -> Option<ErrorState> {
|
||||
let error_type_state = self.errors_by_type.get(&error_type)?;
|
||||
let state = error_type_state.state_by_key.get(key)?;
|
||||
self.should_skip(state, error_type_state, now)
|
||||
.then(|| state.clone())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(%error_type))]
|
||||
#[allow(unused_variables)]
|
||||
pub fn log_persistent_errors(&self, error_type: &str, min_duration: Duration) {
|
||||
pub fn record(&mut self, error_type: ErrorType, key: &Key, message: String) {
|
||||
let now = Instant::now();
|
||||
for (pubkey, errors) in self.accounts.iter() {
|
||||
if now.duration_since(errors.last_at) < min_duration {
|
||||
continue;
|
||||
let state = self
|
||||
.errors_by_type
|
||||
.entry(error_type)
|
||||
.or_default()
|
||||
.state_by_key
|
||||
.entry(key.clone())
|
||||
.or_insert(ErrorState {
|
||||
errors: Vec::with_capacity(1),
|
||||
count: 0,
|
||||
last_at: now,
|
||||
});
|
||||
state.count += 1;
|
||||
state.last_at = now;
|
||||
if let Some(pos) = state.errors.iter().position(|m| m == &message) {
|
||||
state.errors.remove(pos);
|
||||
}
|
||||
state.errors.push(message);
|
||||
if state.errors.len() > self.unique_messages_to_keep {
|
||||
state.errors.remove(0);
|
||||
}
|
||||
|
||||
// log when skip threshold is reached the first time
|
||||
if state.count == self.skip_threshold {
|
||||
trace!(%error_type, %key, count = state.count, messages = ?state.errors, "had repeated errors, skipping...");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear(&mut self, error_type: ErrorType, key: &Key) {
|
||||
if let Some(error_type_state) = self.errors_by_type.get_mut(&error_type) {
|
||||
error_type_state.state_by_key.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wipe_old(&mut self) {
|
||||
let now = Instant::now();
|
||||
for error_type_state in self.errors_by_type.values_mut() {
|
||||
error_type_state
|
||||
.state_by_key
|
||||
.retain(|_, state| now.duration_since(state.last_at) < self.keep_duration);
|
||||
}
|
||||
}
|
||||
|
||||
/// Wipes old errors and occasionally logs errors that caused skipping
|
||||
pub fn update(&mut self) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(self.last_log) < self.log_interval {
|
||||
return;
|
||||
}
|
||||
self.last_log = now;
|
||||
self.wipe_old();
|
||||
self.log_error_skips();
|
||||
}
|
||||
|
||||
/// Log all errors that cause skipping
|
||||
pub fn log_error_skips(&self) {
|
||||
let now = Instant::now();
|
||||
for (error_type, error_type_state) in self.errors_by_type.iter() {
|
||||
let span = info_span!("log_error_skips", %error_type);
|
||||
let _enter = span.enter();
|
||||
|
||||
for (key, state) in error_type_state.state_by_key.iter() {
|
||||
if self.should_skip(state, error_type_state, now) {
|
||||
info!(
|
||||
%key,
|
||||
count = state.count,
|
||||
messages = ?state.errors,
|
||||
);
|
||||
}
|
||||
}
|
||||
info!(
|
||||
%pubkey,
|
||||
count = errors.count,
|
||||
messages = ?errors.messages,
|
||||
"has persistent errors",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue