From 7cbc25a6fe8d3766af67ce23afdba2822dbddd55 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Sat, 28 Nov 2020 21:50:11 -0800 Subject: [PATCH] Add --unhealthy_threshold option --- watchtower/src/main.rs | 68 +++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/watchtower/src/main.rs b/watchtower/src/main.rs index 07aa3b48d3..785a998ac7 100644 --- a/watchtower/src/main.rs +++ b/watchtower/src/main.rs @@ -28,12 +28,12 @@ use { }; struct Config { + address_labels: HashMap, interval: Duration, json_rpc_url: String, - validator_identity_pubkeys: Vec, - no_duplicate_notifications: bool, monitor_active_stake: bool, - address_labels: HashMap, + unhealthy_threshold: usize, + validator_identity_pubkeys: Vec, } fn get_config() -> Config { @@ -87,6 +87,14 @@ fn get_config() -> Config { .default_value("60") .help("Wait interval seconds between checking the cluster"), ) + .arg( + Arg::with_name("unhealthy_threshold") + .long("unhealthy-threshold") + .value_name("COUNT") + .takes_value(true) + .default_value("1") + .help("How many consecutive failures must occur to trigger a notification") + ) .arg( Arg::with_name("validator_identities") .long("validator-identity") @@ -97,10 +105,10 @@ fn get_config() -> Config { .help("Validator identities to monitor for delinquency") ) .arg( + // Deprecated parameter, now always enabled Arg::with_name("no_duplicate_notifications") .long("no-duplicate-notifications") - .takes_value(false) - .help("Subsequent identical notifications will be suppressed"), + .hidden(true) ) .arg( Arg::with_name("monitor_active_stake") @@ -117,6 +125,7 @@ fn get_config() -> Config { }; let interval = Duration::from_secs(value_t_or_exit!(matches, "interval", u64)); + let unhealthy_threshold = value_t_or_exit!(matches, "unhealthy_threshold", usize); let json_rpc_url = value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone()); let validator_identity_pubkeys: Vec<_> = pubkeys_of(&matches, "validator_identities") @@ -124,16 +133,15 @@ fn get_config() -> Config { .into_iter() .collect(); - let no_duplicate_notifications = matches.is_present("no_duplicate_notifications"); let monitor_active_stake = matches.is_present("monitor_active_stake"); let config = Config { + address_labels: config.address_labels, interval, json_rpc_url, - validator_identity_pubkeys, - no_duplicate_notifications, monitor_active_stake, - address_labels: config.address_labels, + unhealthy_threshold, + validator_identity_pubkeys, }; info!("RPC URL: {}", config.json_rpc_url); @@ -179,6 +187,7 @@ fn main() -> Result<(), Box> { let mut last_transaction_count = 0; let mut last_recent_blockhash = Hash::default(); let mut last_notification_msg = "".into(); + let mut num_consecutive_failures = 0; let mut last_success = Instant::now(); loop { @@ -243,7 +252,7 @@ fn main() -> Result<(), Box> { )); } - let mut errors = vec![]; + let mut validator_errors = vec![]; for validator_identity in config.validator_identity_pubkeys.iter() { let formatted_validator_identity = format_labeled_address( &validator_identity.to_string(), @@ -254,13 +263,14 @@ fn main() -> Result<(), Box> { .iter() .any(|vai| vai.node_pubkey == *validator_identity.to_string()) { - errors.push(format!("{} delinquent", formatted_validator_identity)); + validator_errors + .push(format!("{} delinquent", formatted_validator_identity)); } else if !vote_accounts .current .iter() .any(|vai| vai.node_pubkey == *validator_identity.to_string()) { - errors.push(format!("{} missing", formatted_validator_identity)); + validator_errors.push(format!("{} missing", formatted_validator_identity)); } if let Some(balance) = validator_balances.get(&validator_identity) { @@ -275,8 +285,8 @@ fn main() -> Result<(), Box> { } } - if !errors.is_empty() { - failures.push(("delinquent", errors.join(","))); + if !validator_errors.is_empty() { + failures.push(("delinquent", validator_errors.join(","))); } for failure in failures.iter() { @@ -284,25 +294,34 @@ fn main() -> Result<(), Box> { } failures.into_iter().next() // Only report the first failure if any } - Err(err) => Some(("rpc", err.to_string())), + Err(err) => Some(("rpc-error", err.to_string())), }; - datapoint_info!("watchtower-sanity", ("ok", failure.is_none(), bool)); if let Some((failure_test_name, failure_error_message)) = &failure { let notification_msg = format!( "solana-watchtower: Error: {}: {}", failure_test_name, failure_error_message ); - if !config.no_duplicate_notifications || last_notification_msg != notification_msg { - notifier.send(¬ification_msg); + num_consecutive_failures += 1; + if num_consecutive_failures > config.unhealthy_threshold { + datapoint_info!("watchtower-sanity", ("ok", false, bool)); + if last_notification_msg != notification_msg { + notifier.send(¬ification_msg); + } + datapoint_error!( + "watchtower-sanity-failure", + ("test", failure_test_name, String), + ("err", failure_error_message, String) + ); + last_notification_msg = notification_msg; + } else { + info!( + "Failure {} of {}: {}", + num_consecutive_failures, config.unhealthy_threshold, notification_msg + ); } - datapoint_error!( - "watchtower-sanity-failure", - ("test", failure_test_name, String), - ("err", failure_error_message, String) - ); - last_notification_msg = notification_msg; } else { + datapoint_info!("watchtower-sanity", ("ok", true, bool)); if !last_notification_msg.is_empty() { let alarm_duration = Instant::now().duration_since(last_success); let alarm_duration = alarm_duration - config.interval; // Subtract the period before the first error @@ -317,6 +336,7 @@ fn main() -> Result<(), Box> { } last_notification_msg = "".into(); last_success = Instant::now(); + num_consecutive_failures = 0; } sleep(config.interval); }