From f34b8643c74b355dc24a63eb4e55e8d886de4085 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 5 Feb 2021 22:39:23 -0800 Subject: [PATCH] Add |solana-validator monitor| subcommand (#15118) --- Cargo.lock | 4 +- validator/Cargo.toml | 4 +- validator/src/bin/solana-test-validator.rs | 214 ++++--------------- validator/src/dashboard.rs | 228 +++++++++++++++++++++ validator/src/lib.rs | 60 +++++- validator/src/main.rs | 40 +++- 6 files changed, 375 insertions(+), 175 deletions(-) create mode 100644 validator/src/dashboard.rs diff --git a/Cargo.lock b/Cargo.lock index 72496cce35..96540cd898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5372,7 +5372,9 @@ dependencies = [ "log 0.4.11", "num_cpus", "rand 0.7.3", - "serde_json", + "serde", + "serde_derive", + "serde_yaml", "signal-hook", "solana-clap-utils", "solana-cli-config", diff --git a/validator/Cargo.toml b/validator/Cargo.toml index e7afaa96d4..727edc60c3 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -21,7 +21,9 @@ indicatif = "0.15.0" log = "0.4.11" num_cpus = "1.13.0" rand = "0.7.0" -serde_json = "1.0.56" +serde = "1.0.112" +serde_derive = "1.0.103" +serde_yaml = "0.8.13" solana-clap-utils = { path = "../clap-utils", version = "1.6.0" } solana-cli-config = { path = "../cli-config", version = "1.6.0" } solana-client = { path = "../client", version = "1.6.0" } diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 1049a354f9..13f6674126 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -1,8 +1,6 @@ use { clap::{value_t, value_t_or_exit, App, Arg}, - console::style, fd_lock::FdLock, - indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}, solana_clap_utils::{ input_parsers::{pubkey_of, pubkeys_of}, input_validators::{ @@ -10,20 +8,21 @@ use { normalize_to_url_if_moniker, }, }, - solana_client::{client_error, rpc_client::RpcClient, rpc_request}, + solana_client::rpc_client::RpcClient, solana_core::rpc::JsonRpcConfig, solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT}, solana_sdk::{ account::Account, - clock::{Slot, DEFAULT_TICKS_PER_SLOT, MS_PER_TICK}, - commitment_config::CommitmentConfig, - native_token::{sol_to_lamports, Sol}, + clock::Slot, + native_token::sol_to_lamports, pubkey::Pubkey, rpc_port, signature::{read_keypair_file, write_keypair_file, Keypair, Signer}, system_program, }, - solana_validator::{redirect_stderr_to_file, test_validator::*}, + solana_validator::{ + dashboard::Dashboard, record_start, redirect_stderr_to_file, test_validator::*, + }, std::{ collections::HashSet, fs, io, @@ -31,8 +30,7 @@ use { path::{Path, PathBuf}, process::exit, sync::mpsc::channel, - thread, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{SystemTime, UNIX_EPOCH}, }, }; @@ -43,21 +41,6 @@ enum Output { Dashboard, } -/// Creates a new process bar for processing that will take an unknown amount of time -fn new_spinner_progress_bar() -> ProgressBar { - let progress_bar = ProgressBar::new(42); - progress_bar.set_draw_target(ProgressDrawTarget::stdout()); - progress_bar - .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); - progress_bar.enable_steady_tick(100); - progress_bar -} - -/// Pretty print a "name value" -fn println_name_value(name: &str, value: &str) { - println!("{} {}", style(name).bold(), value); -} - fn main() { let default_rpc_port = rpc_port::DEFAULT_RPC_PORT.to_string(); @@ -289,7 +272,7 @@ fn main() { let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap()); let _ledger_lock = ledger_fd_lock.try_lock().unwrap_or_else(|_| { println!( - "Error: Unable to lock {} directory. Check if another solana-test-validator is running", + "Error: Unable to lock {} directory. Check if another validator is running", ledger_path.display() ); exit(1); @@ -342,6 +325,7 @@ fn main() { }, ); } + let faucet_keypair = read_keypair_file(faucet_keypair_file.to_str().unwrap()).unwrap_or_else(|err| { println!( @@ -351,26 +335,38 @@ fn main() { ); exit(1); }); + let faucet_pubkey = faucet_keypair.pubkey(); - let validator_start = Instant::now(); + if let Some(faucet_addr) = &faucet_addr { + let (sender, receiver) = channel(); + run_local_faucet_with_port(faucet_keypair, sender, None, faucet_addr.port()); + let _ = receiver.recv().expect("run faucet").unwrap_or_else(|err| { + println!("Error: failed to start faucet: {}", err); + exit(1); + }); + } + + record_start( + &ledger_path, + Some(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + rpc_port, + )), + ) + .unwrap_or_else(|err| println!("Error: failed to record validator start: {}", err)); + + let dashboard = if output == Output::Dashboard { + Some(Dashboard::new(&ledger_path, Some(&validator_log_symlink)).unwrap()) + } else { + None + }; let test_validator = { - let _progress_bar = if output == Output::Dashboard { - println_name_value("Mint address:", &mint_address.to_string()); - println_name_value("Ledger location:", &format!("{}", ledger_path.display())); - println_name_value("Log:", &format!("{}", validator_log_symlink.display())); - let progress_bar = new_spinner_progress_bar(); - progress_bar.set_message("Initializing..."); - Some(progress_bar) - } else { - None - }; - let mut genesis = TestValidatorGenesis::default(); genesis .ledger_path(&ledger_path) .add_account( - faucet_keypair.pubkey(), + faucet_pubkey, Account::new(faucet_lamports, 0, &system_program::id()), ) .rpc_config(JsonRpcConfig { @@ -396,143 +392,19 @@ fn main() { genesis.warp_slot(warp_slot); } genesis.start_with_mint_address(mint_address) - } - .unwrap_or_else(|err| { - println!("Error: failed to start validator: {}", err); - exit(1); - }); + }; - if let Some(faucet_addr) = &faucet_addr { - let (sender, receiver) = channel(); - run_local_faucet_with_port(faucet_keypair, sender, None, faucet_addr.port()); - let _ = receiver.recv().expect("run faucet").unwrap_or_else(|err| { - println!("Error: failed to start faucet: {}", err); + match test_validator { + Ok(_test_validator) => match dashboard { + Some(dashboard) => dashboard.run(), + None => std::thread::park(), + }, + Err(err) => { + drop(dashboard); + println!("Error: failed to start validator: {}", err); exit(1); - }); - } - - if output == Output::Dashboard { - let rpc_client = test_validator.rpc_client().0; - let identity = &rpc_client.get_identity().expect("get_identity"); - println_name_value("Identity:", &identity.to_string()); - println_name_value( - "Version:", - &rpc_client.get_version().expect("get_version").solana_core, - ); - println_name_value("JSON RPC URL:", &test_validator.rpc_url()); - println_name_value( - "JSON RPC PubSub Websocket:", - &test_validator.rpc_pubsub_url(), - ); - println_name_value("Gossip Address:", &test_validator.gossip().to_string()); - println_name_value("TPU Address:", &test_validator.tpu().to_string()); - if let Some(faucet_addr) = &faucet_addr { - println_name_value( - "Faucet Address:", - &format!("{}:{}", &test_validator.gossip().ip(), faucet_addr.port()), - ); - } - - let progress_bar = new_spinner_progress_bar(); - - fn get_validator_stats( - rpc_client: &RpcClient, - identity: &Pubkey, - ) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> { - let processed_slot = - rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?; - let confirmed_slot = - rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?; - let finalized_slot = - rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?; - let transaction_count = - rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?; - let identity_balance = rpc_client - .get_balance_with_commitment(identity, CommitmentConfig::confirmed())? - .value; - - let health = match rpc_client.get_health() { - Ok(()) => "ok".to_string(), - Err(err) => { - if let client_error::ClientErrorKind::RpcError( - rpc_request::RpcError::RpcResponseError { - code: _, - message: _, - data: - rpc_request::RpcResponseErrorData::NodeUnhealthy { - num_slots_behind: Some(num_slots_behind), - }, - }, - ) = &err.kind - { - format!("{} slots behind", num_slots_behind) - } else { - "unhealthy".to_string() - } - } - }; - - Ok(( - processed_slot, - confirmed_slot, - finalized_slot, - transaction_count, - Sol(identity_balance), - health, - )) - } - - loop { - let snapshot_slot = rpc_client.get_snapshot_slot().ok(); - - for _i in 0..10 { - match get_validator_stats(&rpc_client, &identity) { - Ok(( - processed_slot, - confirmed_slot, - finalized_slot, - transaction_count, - identity_balance, - health, - )) => { - let uptime = chrono::Duration::from_std(validator_start.elapsed()).unwrap(); - - progress_bar.set_message(&format!( - "{:02}:{:02}:{:02} \ - {}| \ - Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \ - Snapshot Slot: {} | \ - Transactions: {} | {}", - uptime.num_hours(), - uptime.num_minutes() % 60, - uptime.num_seconds() % 60, - if health == "ok" { - "".to_string() - } else { - format!("| {} ", style(health).bold().red()) - }, - processed_slot, - confirmed_slot, - finalized_slot, - snapshot_slot - .map(|s| s.to_string()) - .unwrap_or_else(|| "-".to_string()), - transaction_count, - identity_balance - )); - } - Err(err) => { - progress_bar.set_message(&format!("{}", err)); - } - } - thread::sleep(Duration::from_millis( - MS_PER_TICK * DEFAULT_TICKS_PER_SLOT / 2, - )); - } } } - - std::thread::park(); } fn remove_directory_contents(ledger_path: &Path) -> Result<(), io::Error> { diff --git a/validator/src/dashboard.rs b/validator/src/dashboard.rs new file mode 100644 index 0000000000..f18364c0d2 --- /dev/null +++ b/validator/src/dashboard.rs @@ -0,0 +1,228 @@ +use { + crate::{get_validator_rpc_addr, get_validator_start_time}, + console::style, + indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}, + solana_client::{ + client_error, rpc_client::RpcClient, rpc_request, rpc_response::RpcContactInfo, + }, + solana_sdk::{ + clock::{Slot, DEFAULT_TICKS_PER_SLOT, MS_PER_TICK}, + commitment_config::CommitmentConfig, + native_token::Sol, + pubkey::Pubkey, + }, + std::{ + io, + path::{Path, PathBuf}, + thread, + time::Duration, + }, +}; + +/// Creates a new process bar for processing that will take an unknown amount of time +fn new_spinner_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar.set_draw_target(ProgressDrawTarget::stdout()); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} + +/// Pretty print a "name value" +fn println_name_value(name: &str, value: &str) { + println!("{} {}", style(name).bold(), value); +} + +pub struct Dashboard { + progress_bar: ProgressBar, + ledger_path: PathBuf, +} + +impl Dashboard { + pub fn new(ledger_path: &Path, log_path: Option<&Path>) -> Result { + println_name_value("Ledger location:", &format!("{}", ledger_path.display())); + if let Some(log_path) = log_path { + println_name_value("Log:", &format!("{}", log_path.display())); + } + + let rpc_addr = get_validator_rpc_addr(&ledger_path)?; + if rpc_addr.is_none() { + return Err(io::Error::new(io::ErrorKind::Other, "RPC not available")); + } + + let progress_bar = new_spinner_progress_bar(); + progress_bar.set_message("Initializing..."); + + Ok(Self { + progress_bar, + //ledger_path: ledger_path.clone().to_path_buf(), + ledger_path: ledger_path.to_path_buf(), + }) + } + + pub fn run(self) -> ! { + let Self { + progress_bar, + ledger_path, + } = self; + + progress_bar.set_message("Connecting..."); + + let rpc_addr = get_validator_rpc_addr(&ledger_path).unwrap().unwrap(); + let rpc_client = RpcClient::new_socket(rpc_addr); + + // Wait until RPC starts responding... + loop { + match rpc_client.get_identity() { + Ok(_) => break, + Err(err) => { + progress_bar.set_message(&format!("{}", err)); + thread::sleep(Duration::from_millis(500)); + } + } + } + + drop(progress_bar); + + let identity = &rpc_client.get_identity().expect("get_identity"); + + println_name_value("Identity:", &identity.to_string()); + + fn get_contact_info(rpc_client: &RpcClient, identity: &Pubkey) -> Option { + rpc_client + .get_cluster_nodes() + .ok() + .unwrap_or_default() + .into_iter() + .find(|node| node.pubkey == identity.to_string()) + } + + if let Some(contact_info) = get_contact_info(&rpc_client, &identity) { + println_name_value( + "Version:", + &contact_info.version.unwrap_or_else(|| "?".to_string()), + ); + if let Some(gossip) = contact_info.gossip { + println_name_value("Gossip Address:", &gossip.to_string()); + } + if let Some(tpu) = contact_info.tpu { + println_name_value("TPU Address:", &tpu.to_string()); + } + if let Some(rpc) = contact_info.rpc { + println_name_value("JSON RPC URL:", &format!("http://{}", rpc.to_string())); + } + } + + let progress_bar = new_spinner_progress_bar(); + + fn get_validator_stats( + rpc_client: &RpcClient, + identity: &Pubkey, + ) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> { + let processed_slot = + rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?; + let confirmed_slot = + rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?; + let finalized_slot = + rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?; + let transaction_count = + rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?; + let identity_balance = rpc_client + .get_balance_with_commitment(identity, CommitmentConfig::confirmed())? + .value; + + let health = match rpc_client.get_health() { + Ok(()) => "ok".to_string(), + Err(err) => { + if let client_error::ClientErrorKind::RpcError( + rpc_request::RpcError::RpcResponseError { + code: _, + message: _, + data: + rpc_request::RpcResponseErrorData::NodeUnhealthy { + num_slots_behind: Some(num_slots_behind), + }, + }, + ) = &err.kind + { + format!("{} slots behind", num_slots_behind) + } else { + "unhealthy".to_string() + } + } + }; + + Ok(( + processed_slot, + confirmed_slot, + finalized_slot, + transaction_count, + Sol(identity_balance), + health, + )) + } + + let mut start_time = get_validator_start_time(&ledger_path).ok(); + loop { + let snapshot_slot = rpc_client.get_snapshot_slot().ok(); + + for _i in 0..10 { + match get_validator_stats(&rpc_client, &identity) { + Ok(( + processed_slot, + confirmed_slot, + finalized_slot, + transaction_count, + identity_balance, + health, + )) => { + let uptime = match start_time { + Some(start_time) => { + let uptime = + chrono::Duration::from_std(start_time.elapsed().unwrap()) + .unwrap(); + + format!( + "{:02}:{:02}:{:02} ", + uptime.num_hours(), + uptime.num_minutes() % 60, + uptime.num_seconds() % 60 + ) + } + None => " ".to_string(), + }; + + progress_bar.set_message(&format!( + "{}{}| \ + Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \ + Snapshot Slot: {} | \ + Transactions: {} | {}", + uptime, + if health == "ok" { + "".to_string() + } else { + format!("| {} ", style(health).bold().red()) + }, + processed_slot, + confirmed_slot, + finalized_slot, + snapshot_slot + .map(|s| s.to_string()) + .unwrap_or_else(|| "-".to_string()), + transaction_count, + identity_balance + )); + } + Err(err) => { + start_time = get_validator_start_time(&ledger_path).ok(); + progress_bar.set_message(&format!("{}", err)); + } + } + thread::sleep(Duration::from_millis( + MS_PER_TICK * DEFAULT_TICKS_PER_SLOT / 2, + )); + } + } + } +} diff --git a/validator/src/lib.rs b/validator/src/lib.rs index e9793281e4..2177613190 100644 --- a/validator/src/lib.rs +++ b/validator/src/lib.rs @@ -1,9 +1,21 @@ pub use solana_core::test_validator; use { log::*, - std::{env, process::exit, thread::JoinHandle}, + serde_derive::{Deserialize, Serialize}, + std::{ + env, + fs::{self, File}, + io::{self, Write}, + net::SocketAddr, + path::Path, + process::exit, + thread::JoinHandle, + time::{Duration, SystemTime}, + }, }; +pub mod dashboard; + #[cfg(unix)] fn redirect_stderr(filename: &str) { use std::{fs::OpenOptions, os::unix::io::AsRawFd}; @@ -75,3 +87,49 @@ pub fn port_validator(port: String) -> Result<(), String> { .map(|_| ()) .map_err(|e| format!("{:?}", e)) } + +#[derive(Serialize, Deserialize, Clone, Debug)] +struct ProcessInfo { + rpc_addr: Option, // RPC port to contact the validator at + start_time: u64, // Seconds since the UNIX_EPOCH for when the validator was started +} + +pub fn record_start(ledger_path: &Path, rpc_addr: Option<&SocketAddr>) -> Result<(), io::Error> { + if !ledger_path.exists() { + fs::create_dir_all(&ledger_path)?; + } + + let start_info = ProcessInfo { + rpc_addr: rpc_addr.cloned(), + start_time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + let serialized = serde_yaml::to_string(&start_info) + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; + + let mut file = File::create(ledger_path.join("process-info.yml"))?; + file.write_all(&serialized.into_bytes())?; + Ok(()) +} + +fn get_validator_process_info( + ledger_path: &Path, +) -> Result<(Option, SystemTime), io::Error> { + let file = File::open(ledger_path.join("process-info.yml"))?; + let config: ProcessInfo = serde_yaml::from_reader(file) + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; + + let start_time = SystemTime::UNIX_EPOCH + Duration::from_secs(config.start_time); + Ok((config.rpc_addr, start_time)) +} + +pub fn get_validator_rpc_addr(ledger_path: &Path) -> Result, io::Error> { + get_validator_process_info(ledger_path).map(|process_info| process_info.0) +} + +pub fn get_validator_start_time(ledger_path: &Path) -> Result { + get_validator_process_info(ledger_path).map(|process_info| process_info.1) +} diff --git a/validator/src/main.rs b/validator/src/main.rs index e0d3c9794a..a1dc21a20d 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2,6 +2,7 @@ use clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand, }; +use fd_lock::FdLock; use log::*; use rand::{seq::SliceRandom, thread_rng, Rng}; use solana_clap_utils::{ @@ -41,7 +42,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signer}, }; -use solana_validator::redirect_stderr_to_file; +use solana_validator::{dashboard::Dashboard, record_start, redirect_stderr_to_file}; use std::{ collections::HashSet, env, @@ -61,6 +62,7 @@ use std::{ #[derive(Debug, PartialEq)] enum Operation { Initialize, + Monitor, Run, } @@ -793,6 +795,7 @@ pub fn main() { let matches = App::new(crate_name!()).about(crate_description!()) .version(solana_version::version!()) .setting(AppSettings::VersionlessSubcommands) + .setting(AppSettings::InferSubcommands) .arg( Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name) .long(SKIP_SEED_PHRASE_VALIDATION_ARG.long) @@ -1446,11 +1449,16 @@ pub fn main() { SubCommand::with_name("run") .about("Run the validator") ) + .subcommand( + SubCommand::with_name("monitor") + .about("Monitor the validator") + ) .get_matches(); let operation = match matches.subcommand().0 { "" | "run" => Operation::Run, "init" => Operation::Initialize, + "monitor" => Operation::Monitor, _ => unreachable!(), }; @@ -1795,6 +1803,36 @@ pub fn main() { }) }); + if operation == Operation::Monitor { + let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| { + println!( + "Error: Unable to connect to validator at {}: {:?}", + ledger_path.display(), + err, + ); + exit(1); + }); + dashboard.run(); + } + + let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap()); + let _ledger_lock = ledger_fd_lock.try_lock().unwrap_or_else(|_| { + println!( + "Error: Unable to lock {} directory. Check if another validator is running", + ledger_path.display() + ); + exit(1); + }); + + record_start( + &ledger_path, + validator_config + .rpc_addrs + .as_ref() + .map(|(rpc_addr, _)| rpc_addr), + ) + .unwrap_or_else(|err| println!("Error: failed to record validator start: {}", err)); + let logfile = { let logfile = matches .value_of("logfile")