Add wait-for-restart-window subcommand

This commit is contained in:
Michael Vines 2021-02-19 11:31:16 -08:00
parent 6f3964c8cb
commit 0dc482e987
3 changed files with 219 additions and 36 deletions

View File

@ -1,7 +1,10 @@
use {
crate::{get_validator_rpc_addr, get_validator_start_time},
crate::{
get_validator_rpc_addr, get_validator_start_time, new_spinner_progress_bar,
println_name_value,
},
console::style,
indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle},
indicatif::ProgressBar,
solana_client::{
client_error, rpc_client::RpcClient, rpc_request, rpc_response::RpcContactInfo,
},
@ -19,21 +22,6 @@ use {
},
};
/// Creates a new process bar for processing that will take an unknown amount of time
fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar.set_draw_target(ProgressDrawTarget::stdout());
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
/// Pretty print a "name value"
fn println_name_value(name: &str, value: &str) {
println!("{} {}", style(name).bold(), value);
}
pub struct Dashboard {
progress_bar: ProgressBar,
ledger_path: PathBuf,
@ -56,7 +44,6 @@ impl Dashboard {
Ok(Self {
progress_bar,
//ledger_path: ledger_path.clone().to_path_buf(),
ledger_path: ledger_path.to_path_buf(),
})
}

View File

@ -1,6 +1,8 @@
#![allow(clippy::integer_arithmetic)]
pub use solana_core::test_validator;
use {
console::style,
indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle},
log::*,
serde_derive::{Deserialize, Serialize},
std::{
@ -134,3 +136,18 @@ pub fn get_validator_rpc_addr(ledger_path: &Path) -> Result<Option<SocketAddr>,
pub fn get_validator_start_time(ledger_path: &Path) -> Result<SystemTime, io::Error> {
get_validator_process_info(ledger_path).map(|process_info| process_info.1)
}
/// Creates a new process bar for processing that will take an unknown amount of time
pub fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar.set_draw_target(ProgressDrawTarget::stdout());
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
/// Pretty print a "name value"
pub fn println_name_value(name: &str, value: &str) {
println!("{} {}", style(name).bold(), value);
}

View File

@ -3,6 +3,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App,
AppSettings, Arg, ArgMatches, SubCommand,
};
use console::style;
use fd_lock::FdLock;
use log::*;
use rand::{seq::SliceRandom, thread_rng, Rng};
@ -37,16 +38,19 @@ use solana_runtime::{
snapshot_utils::get_highest_snapshot_archive_path,
};
use solana_sdk::{
clock::Slot,
clock::{Slot, DEFAULT_S_PER_SLOT},
commitment_config::CommitmentConfig,
genesis_config::GenesisConfig,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use solana_validator::{dashboard::Dashboard, record_start, redirect_stderr_to_file};
use solana_validator::{
dashboard::Dashboard, get_validator_rpc_addr, new_spinner_progress_bar, println_name_value,
record_start, redirect_stderr_to_file,
};
use std::{
collections::HashSet,
collections::{HashSet, VecDeque},
env,
fs::{self, File},
net::{IpAddr, SocketAddr, TcpListener, UdpSocket},
@ -58,7 +62,7 @@ use std::{
Arc,
},
thread::sleep,
time::{Duration, Instant},
time::{Duration, Instant, SystemTime},
};
#[derive(Debug, PartialEq)]
@ -66,6 +70,148 @@ enum Operation {
Initialize,
Monitor,
Run,
WaitForRestartWindow { min_idle_time_in_minutes: usize },
}
fn wait_for_restart_window(
ledger_path: &Path,
min_idle_time_in_minutes: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let min_idle_slots = (min_idle_time_in_minutes as f64 * 60. / DEFAULT_S_PER_SLOT) as Slot;
let rpc_addr = get_validator_rpc_addr(&ledger_path).map_err(|err| {
format!(
"Unable to read validator RPC address from {}: {}",
ledger_path.display(),
err
)
})?;
let rpc_client = match rpc_addr {
None => return Err("RPC not available".into()),
Some(rpc_addr) => RpcClient::new_socket(rpc_addr),
};
let identity = rpc_client.get_identity()?;
println_name_value("Identity:", &identity.to_string());
println_name_value(
"Minimum Idle Time:",
&format!(
"{} slots (~{} minutes)",
min_idle_slots, min_idle_time_in_minutes
),
);
let mut current_epoch = None;
let mut leader_schedule = VecDeque::new();
let mut restart_snapshot = None;
let progress_bar = new_spinner_progress_bar();
let monitor_start_time = SystemTime::now();
loop {
let snapshot_slot = rpc_client.get_snapshot_slot().ok();
let epoch_info = rpc_client.get_epoch_info_with_commitment(CommitmentConfig::processed())?;
let healthy = rpc_client.get_health().ok().is_some();
if match current_epoch {
None => true,
Some(current_epoch) => current_epoch != epoch_info.epoch,
} {
progress_bar.set_message(&format!(
"Fetching leader schedule for epoch {}...",
epoch_info.epoch
));
let first_slot_in_epoch = epoch_info.absolute_slot - epoch_info.slot_index;
leader_schedule = rpc_client
.get_leader_schedule(Some(first_slot_in_epoch))?
.ok_or_else(|| {
format!(
"Unable to get leader schedule from slot {}",
first_slot_in_epoch
)
})?
.get(&identity.to_string())
.cloned()
.unwrap_or_default()
.into_iter()
.map(|slot_index| first_slot_in_epoch.saturating_add(slot_index as u64))
.collect::<VecDeque<_>>();
current_epoch = Some(epoch_info.epoch);
}
let status = {
if !healthy {
style("Node is unhealthy").red().to_string()
} else {
// Wait until a hole in the leader schedule before restarting the node
let in_leader_schedule_hole =
if epoch_info.slot_index + min_idle_slots as u64 > epoch_info.slots_in_epoch {
Err("Current epoch is almost complete".to_string())
} else {
while leader_schedule
.get(0)
.map(|slot_index| *slot_index < epoch_info.absolute_slot)
.unwrap_or(false)
{
leader_schedule.pop_front();
}
match leader_schedule.get(0) {
None => {
Ok(()) // Validator has no leader slots
}
Some(next_leader_slot) => {
let idle_slots =
next_leader_slot.saturating_sub(epoch_info.absolute_slot);
if idle_slots >= min_idle_slots {
Ok(())
} else {
Err(format!(
"Validator will be leader for soon. Next leader slot is {}",
next_leader_slot
))
}
}
}
};
match in_leader_schedule_hole {
Ok(_) => {
if restart_snapshot == None {
restart_snapshot = snapshot_slot;
}
if restart_snapshot == snapshot_slot {
"Waiting for a new snapshot".to_string()
} else {
break; // Restart!
}
}
Err(why) => why,
}
}
};
progress_bar.set_message(&format!(
"{} | Processed Slot: {} | {}",
{
let elapsed =
chrono::Duration::from_std(monitor_start_time.elapsed().unwrap()).unwrap();
format!(
"{:02}:{:02}:{:02}",
elapsed.num_hours(),
elapsed.num_minutes() % 60,
elapsed.num_seconds() % 60
)
},
epoch_info.absolute_slot,
status
));
std::thread::sleep(Duration::from_secs(1))
}
drop(progress_bar);
println!("{}", style("Ready to restart").green());
Ok(())
}
fn port_range_validator(port_range: String) -> Result<(), String> {
@ -1482,12 +1628,33 @@ pub fn main() {
SubCommand::with_name("monitor")
.about("Monitor the validator")
)
.subcommand(
SubCommand::with_name("wait-for-restart-window")
.about("Monitor the validator for a good time to restart")
.arg(
Arg::with_name("min_idle_time_in_minutes")
.takes_value(true)
.index(1)
.validator(is_parsable::<usize>)
.default_value("10")
.help("Minimum time that the validator should not be leader")
)
.after_help("Note: If this command exits with a non-zero status \
then this not a good time for a restart")
)
.get_matches();
let operation = match matches.subcommand().0 {
"" | "run" => Operation::Run,
"init" => Operation::Initialize,
"monitor" => Operation::Monitor,
let operation = match matches.subcommand() {
("", _) | ("run", _) => Operation::Run,
("init", _) => Operation::Initialize,
("monitor", _) => Operation::Monitor,
("wait-for-restart-window", Some(subcommand_matches)) => Operation::WaitForRestartWindow {
min_idle_time_in_minutes: value_t_or_exit!(
subcommand_matches,
"min_idle_time_in_minutes",
usize
),
},
_ => unreachable!(),
};
@ -1841,16 +2008,28 @@ pub fn main() {
})
});
if operation == Operation::Monitor {
let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| {
println!(
"Error: Unable to connect to validator at {}: {:?}",
ledger_path.display(),
err,
);
exit(1);
});
dashboard.run();
match operation {
Operation::Monitor => {
let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| {
println!(
"Error: Unable to connect to validator at {}: {:?}",
ledger_path.display(),
err,
);
exit(1);
});
dashboard.run();
}
Operation::WaitForRestartWindow {
min_idle_time_in_minutes,
} => {
wait_for_restart_window(&ledger_path, min_idle_time_in_minutes).unwrap_or_else(|err| {
println!("{}", err);
exit(1);
});
exit(0);
}
Operation::Initialize | Operation::Run => {}
}
let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap());