Add --unhealthy_threshold option

Michael Vines 2020-11-28 21:50:11 -08:00
parent d2a1ec3a05
commit 7cbc25a6fe
1 changed file with 44 additions and 24 deletions
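In short: the watchtower loop now counts consecutive failed checks and only sends a notification once that count exceeds the new `--unhealthy-threshold` value (default 1). Duplicate-notification suppression becomes unconditional, so `--no-duplicate-notifications` survives only as a hidden, deprecated flag. A minimal sketch of the gating pattern this diff introduces, with hypothetical `check_cluster()` / `notify()` stand-ins for the real RPC checks and notifier:

```rust
// Sketch only: mirrors the new num_consecutive_failures / unhealthy_threshold gating.
// check_cluster() and notify() are hypothetical placeholders, not the real watchtower code.
fn check_cluster() -> Result<(), String> {
    Ok(()) // placeholder; the real code queries the cluster over RPC
}

fn notify(msg: &str) {
    eprintln!("notification: {}", msg); // placeholder for the real notifier
}

fn watch(unhealthy_threshold: usize) {
    let mut num_consecutive_failures = 0;
    loop {
        match check_cluster() {
            Err(failure_msg) => {
                num_consecutive_failures += 1;
                if num_consecutive_failures > unhealthy_threshold {
                    // Threshold exceeded: alert (duplicates are suppressed in the real code)
                    notify(&failure_msg);
                } else {
                    // Below the threshold: only log the failure
                    eprintln!(
                        "Failure {} of {}: {}",
                        num_consecutive_failures, unhealthy_threshold, failure_msg
                    );
                }
            }
            Ok(()) => {
                // A healthy iteration clears the failure streak
                num_consecutive_failures = 0;
            }
        }
        // Stand-in for the real config.interval wait between checks
        std::thread::sleep(std::time::Duration::from_secs(60));
    }
}

fn main() {
    watch(1); // default threshold; loops forever in this sketch
}
```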


@@ -28,12 +28,12 @@ use {
};
struct Config {
address_labels: HashMap<String, String>,
interval: Duration,
json_rpc_url: String,
validator_identity_pubkeys: Vec<Pubkey>,
no_duplicate_notifications: bool,
monitor_active_stake: bool,
address_labels: HashMap<String, String>,
unhealthy_threshold: usize,
validator_identity_pubkeys: Vec<Pubkey>,
}
fn get_config() -> Config {
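For orientation, after this hunk the Config struct presumably reads as below: `no_duplicate_notifications` is gone, `unhealthy_threshold: usize` is new, and `address_labels` / `validator_identity_pubkeys` move to the end of the struct. This is a reconstruction from the hunk, not a verbatim copy of the repository, and it assumes the usual `std` and `solana_sdk::pubkey::Pubkey` imports:

```rust
use std::{collections::HashMap, time::Duration};
use solana_sdk::pubkey::Pubkey; // assumed import for the Pubkey type

struct Config {
    interval: Duration,
    json_rpc_url: String,
    monitor_active_stake: bool,
    address_labels: HashMap<String, String>,
    unhealthy_threshold: usize,
    validator_identity_pubkeys: Vec<Pubkey>,
}
```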
@@ -87,6 +87,14 @@ fn get_config() -> Config {
.default_value("60")
.help("Wait interval seconds between checking the cluster"),
)
.arg(
Arg::with_name("unhealthy_threshold")
.long("unhealthy-threshold")
.value_name("COUNT")
.takes_value(true)
.default_value("1")
.help("How many consecutive failures must occur to trigger a notification")
)
.arg(
Arg::with_name("validator_identities")
.long("validator-identity")
@@ -97,10 +105,10 @@ fn get_config() -> Config {
.help("Validator identities to monitor for delinquency")
)
.arg(
// Deprecated parameter, now always enabled
Arg::with_name("no_duplicate_notifications")
.long("no-duplicate-notifications")
.takes_value(false)
.help("Subsequent identical notifications will be suppressed"),
.hidden(true)
)
.arg(
Arg::with_name("monitor_active_stake")
@@ -117,6 +125,7 @@ fn get_config() -> Config {
};
let interval = Duration::from_secs(value_t_or_exit!(matches, "interval", u64));
let unhealthy_threshold = value_t_or_exit!(matches, "unhealthy_threshold", usize);
let json_rpc_url =
value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone());
let validator_identity_pubkeys: Vec<_> = pubkeys_of(&matches, "validator_identities")
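For reference, a self-contained sketch of how the new flag is declared and then read back as a usize. It assumes clap 2.x (consistent with the `Arg::with_name` / `value_t_or_exit!` style above); `value_t_or_exit!` exits with a readable error if COUNT does not parse:

```rust
use clap::{value_t_or_exit, App, Arg};

fn main() {
    // "threshold-demo" is a hypothetical binary name for this sketch
    let matches = App::new("threshold-demo")
        .arg(
            Arg::with_name("unhealthy_threshold")
                .long("unhealthy-threshold")
                .value_name("COUNT")
                .takes_value(true)
                .default_value("1")
                .help("How many consecutive failures must occur to trigger a notification"),
        )
        .get_matches();

    // Parses COUNT into a usize, or prints an error and exits
    let unhealthy_threshold = value_t_or_exit!(matches, "unhealthy_threshold", usize);
    println!("notify after more than {} consecutive failures", unhealthy_threshold);
}
```

Invoked as, say, `threshold-demo --unhealthy-threshold 3` (or, for the real tool, `solana-watchtower --unhealthy-threshold 3`).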
@@ -124,16 +133,15 @@ fn get_config() -> Config {
.into_iter()
.collect();
let no_duplicate_notifications = matches.is_present("no_duplicate_notifications");
let monitor_active_stake = matches.is_present("monitor_active_stake");
let config = Config {
address_labels: config.address_labels,
interval,
json_rpc_url,
validator_identity_pubkeys,
no_duplicate_notifications,
monitor_active_stake,
address_labels: config.address_labels,
unhealthy_threshold,
validator_identity_pubkeys,
};
info!("RPC URL: {}", config.json_rpc_url);
@@ -179,6 +187,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
let mut last_transaction_count = 0;
let mut last_recent_blockhash = Hash::default();
let mut last_notification_msg = "".into();
let mut num_consecutive_failures = 0;
let mut last_success = Instant::now();
loop {
@@ -243,7 +252,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
));
}
let mut errors = vec![];
let mut validator_errors = vec![];
for validator_identity in config.validator_identity_pubkeys.iter() {
let formatted_validator_identity = format_labeled_address(
&validator_identity.to_string(),
@@ -254,13 +263,14 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.iter()
.any(|vai| vai.node_pubkey == *validator_identity.to_string())
{
errors.push(format!("{} delinquent", formatted_validator_identity));
validator_errors
.push(format!("{} delinquent", formatted_validator_identity));
} else if !vote_accounts
.current
.iter()
.any(|vai| vai.node_pubkey == *validator_identity.to_string())
{
errors.push(format!("{} missing", formatted_validator_identity));
validator_errors.push(format!("{} missing", formatted_validator_identity));
}
if let Some(balance) = validator_balances.get(&validator_identity) {
@@ -275,8 +285,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}
}
if !errors.is_empty() {
failures.push(("delinquent", errors.join(",")));
if !validator_errors.is_empty() {
failures.push(("delinquent", validator_errors.join(",")));
}
for failure in failures.iter() {
@@ -284,25 +294,34 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}
failures.into_iter().next() // Only report the first failure if any
}
Err(err) => Some(("rpc", err.to_string())),
Err(err) => Some(("rpc-error", err.to_string())),
};
datapoint_info!("watchtower-sanity", ("ok", failure.is_none(), bool));
if let Some((failure_test_name, failure_error_message)) = &failure {
let notification_msg = format!(
"solana-watchtower: Error: {}: {}",
failure_test_name, failure_error_message
);
if !config.no_duplicate_notifications || last_notification_msg != notification_msg {
notifier.send(&notification_msg);
num_consecutive_failures += 1;
if num_consecutive_failures > config.unhealthy_threshold {
datapoint_info!("watchtower-sanity", ("ok", false, bool));
if last_notification_msg != notification_msg {
notifier.send(&notification_msg);
}
datapoint_error!(
"watchtower-sanity-failure",
("test", failure_test_name, String),
("err", failure_error_message, String)
);
last_notification_msg = notification_msg;
} else {
info!(
"Failure {} of {}: {}",
num_consecutive_failures, config.unhealthy_threshold, notification_msg
);
}
datapoint_error!(
"watchtower-sanity-failure",
("test", failure_test_name, String),
("err", failure_error_message, String)
);
last_notification_msg = notification_msg;
} else {
datapoint_info!("watchtower-sanity", ("ok", true, bool));
if !last_notification_msg.is_empty() {
let alarm_duration = Instant::now().duration_since(last_success);
let alarm_duration = alarm_duration - config.interval; // Subtract the period before the first error
@@ -317,6 +336,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
}
last_notification_msg = "".into();
last_success = Instant::now();
num_consecutive_failures = 0;
}
sleep(config.interval);
}
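Note on the semantics above: the comparison is strict (`>`), so with the default threshold of 1 the first failed iteration is only logged ("Failure 1 of 1: ...") with no notification or failure datapoint, and the alert goes out on the second consecutive failure; a larger `--unhealthy-threshold` stretches that out accordingly. Any healthy iteration resets `num_consecutive_failures` to 0, and the pre-existing recovery path, which reports how long the alarm lasted, subtracts one `--interval` from that duration so the wait before the first error is not counted.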