Add |solana-validator monitor| subcommand (#15118)

This commit is contained in:
Michael Vines 2021-02-05 22:39:23 -08:00 committed by GitHub
parent 819d829c41
commit f34b8643c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 375 additions and 175 deletions

4
Cargo.lock generated
View File

@ -5372,7 +5372,9 @@ dependencies = [
"log 0.4.11",
"num_cpus",
"rand 0.7.3",
"serde_json",
"serde",
"serde_derive",
"serde_yaml",
"signal-hook",
"solana-clap-utils",
"solana-cli-config",

View File

@ -21,7 +21,9 @@ indicatif = "0.15.0"
log = "0.4.11"
num_cpus = "1.13.0"
rand = "0.7.0"
serde_json = "1.0.56"
serde = "1.0.112"
serde_derive = "1.0.103"
serde_yaml = "0.8.13"
solana-clap-utils = { path = "../clap-utils", version = "1.6.0" }
solana-cli-config = { path = "../cli-config", version = "1.6.0" }
solana-client = { path = "../client", version = "1.6.0" }

View File

@ -1,8 +1,6 @@
use {
clap::{value_t, value_t_or_exit, App, Arg},
console::style,
fd_lock::FdLock,
indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle},
solana_clap_utils::{
input_parsers::{pubkey_of, pubkeys_of},
input_validators::{
@ -10,20 +8,21 @@ use {
normalize_to_url_if_moniker,
},
},
solana_client::{client_error, rpc_client::RpcClient, rpc_request},
solana_client::rpc_client::RpcClient,
solana_core::rpc::JsonRpcConfig,
solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT},
solana_sdk::{
account::Account,
clock::{Slot, DEFAULT_TICKS_PER_SLOT, MS_PER_TICK},
commitment_config::CommitmentConfig,
native_token::{sol_to_lamports, Sol},
clock::Slot,
native_token::sol_to_lamports,
pubkey::Pubkey,
rpc_port,
signature::{read_keypair_file, write_keypair_file, Keypair, Signer},
system_program,
},
solana_validator::{redirect_stderr_to_file, test_validator::*},
solana_validator::{
dashboard::Dashboard, record_start, redirect_stderr_to_file, test_validator::*,
},
std::{
collections::HashSet,
fs, io,
@ -31,8 +30,7 @@ use {
path::{Path, PathBuf},
process::exit,
sync::mpsc::channel,
thread,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
time::{SystemTime, UNIX_EPOCH},
},
};
@ -43,21 +41,6 @@ enum Output {
Dashboard,
}
/// Creates a new process bar for processing that will take an unknown amount of time
fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar.set_draw_target(ProgressDrawTarget::stdout());
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
/// Pretty print a "name value"
fn println_name_value(name: &str, value: &str) {
println!("{} {}", style(name).bold(), value);
}
fn main() {
let default_rpc_port = rpc_port::DEFAULT_RPC_PORT.to_string();
@ -289,7 +272,7 @@ fn main() {
let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap());
let _ledger_lock = ledger_fd_lock.try_lock().unwrap_or_else(|_| {
println!(
"Error: Unable to lock {} directory. Check if another solana-test-validator is running",
"Error: Unable to lock {} directory. Check if another validator is running",
ledger_path.display()
);
exit(1);
@ -342,6 +325,7 @@ fn main() {
},
);
}
let faucet_keypair =
read_keypair_file(faucet_keypair_file.to_str().unwrap()).unwrap_or_else(|err| {
println!(
@ -351,26 +335,38 @@ fn main() {
);
exit(1);
});
let faucet_pubkey = faucet_keypair.pubkey();
let validator_start = Instant::now();
if let Some(faucet_addr) = &faucet_addr {
let (sender, receiver) = channel();
run_local_faucet_with_port(faucet_keypair, sender, None, faucet_addr.port());
let _ = receiver.recv().expect("run faucet").unwrap_or_else(|err| {
println!("Error: failed to start faucet: {}", err);
exit(1);
});
}
let test_validator = {
let _progress_bar = if output == Output::Dashboard {
println_name_value("Mint address:", &mint_address.to_string());
println_name_value("Ledger location:", &format!("{}", ledger_path.display()));
println_name_value("Log:", &format!("{}", validator_log_symlink.display()));
let progress_bar = new_spinner_progress_bar();
progress_bar.set_message("Initializing...");
Some(progress_bar)
record_start(
&ledger_path,
Some(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
rpc_port,
)),
)
.unwrap_or_else(|err| println!("Error: failed to record validator start: {}", err));
let dashboard = if output == Output::Dashboard {
Some(Dashboard::new(&ledger_path, Some(&validator_log_symlink)).unwrap())
} else {
None
};
let test_validator = {
let mut genesis = TestValidatorGenesis::default();
genesis
.ledger_path(&ledger_path)
.add_account(
faucet_keypair.pubkey(),
faucet_pubkey,
Account::new(faucet_lamports, 0, &system_program::id()),
)
.rpc_config(JsonRpcConfig {
@ -396,143 +392,19 @@ fn main() {
genesis.warp_slot(warp_slot);
}
genesis.start_with_mint_address(mint_address)
}
.unwrap_or_else(|err| {
println!("Error: failed to start validator: {}", err);
exit(1);
});
if let Some(faucet_addr) = &faucet_addr {
let (sender, receiver) = channel();
run_local_faucet_with_port(faucet_keypair, sender, None, faucet_addr.port());
let _ = receiver.recv().expect("run faucet").unwrap_or_else(|err| {
println!("Error: failed to start faucet: {}", err);
exit(1);
});
}
if output == Output::Dashboard {
let rpc_client = test_validator.rpc_client().0;
let identity = &rpc_client.get_identity().expect("get_identity");
println_name_value("Identity:", &identity.to_string());
println_name_value(
"Version:",
&rpc_client.get_version().expect("get_version").solana_core,
);
println_name_value("JSON RPC URL:", &test_validator.rpc_url());
println_name_value(
"JSON RPC PubSub Websocket:",
&test_validator.rpc_pubsub_url(),
);
println_name_value("Gossip Address:", &test_validator.gossip().to_string());
println_name_value("TPU Address:", &test_validator.tpu().to_string());
if let Some(faucet_addr) = &faucet_addr {
println_name_value(
"Faucet Address:",
&format!("{}:{}", &test_validator.gossip().ip(), faucet_addr.port()),
);
}
let progress_bar = new_spinner_progress_bar();
fn get_validator_stats(
rpc_client: &RpcClient,
identity: &Pubkey,
) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> {
let processed_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
let confirmed_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?;
let finalized_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?;
let transaction_count =
rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?;
let identity_balance = rpc_client
.get_balance_with_commitment(identity, CommitmentConfig::confirmed())?
.value;
let health = match rpc_client.get_health() {
Ok(()) => "ok".to_string(),
Err(err) => {
if let client_error::ClientErrorKind::RpcError(
rpc_request::RpcError::RpcResponseError {
code: _,
message: _,
data:
rpc_request::RpcResponseErrorData::NodeUnhealthy {
num_slots_behind: Some(num_slots_behind),
},
},
) = &err.kind
{
format!("{} slots behind", num_slots_behind)
} else {
"unhealthy".to_string()
}
}
};
Ok((
processed_slot,
confirmed_slot,
finalized_slot,
transaction_count,
Sol(identity_balance),
health,
))
}
loop {
let snapshot_slot = rpc_client.get_snapshot_slot().ok();
for _i in 0..10 {
match get_validator_stats(&rpc_client, &identity) {
Ok((
processed_slot,
confirmed_slot,
finalized_slot,
transaction_count,
identity_balance,
health,
)) => {
let uptime = chrono::Duration::from_std(validator_start.elapsed()).unwrap();
progress_bar.set_message(&format!(
"{:02}:{:02}:{:02} \
{}| \
Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \
Snapshot Slot: {} | \
Transactions: {} | {}",
uptime.num_hours(),
uptime.num_minutes() % 60,
uptime.num_seconds() % 60,
if health == "ok" {
"".to_string()
} else {
format!("| {} ", style(health).bold().red())
match test_validator {
Ok(_test_validator) => match dashboard {
Some(dashboard) => dashboard.run(),
None => std::thread::park(),
},
processed_slot,
confirmed_slot,
finalized_slot,
snapshot_slot
.map(|s| s.to_string())
.unwrap_or_else(|| "-".to_string()),
transaction_count,
identity_balance
));
}
Err(err) => {
progress_bar.set_message(&format!("{}", err));
drop(dashboard);
println!("Error: failed to start validator: {}", err);
exit(1);
}
}
thread::sleep(Duration::from_millis(
MS_PER_TICK * DEFAULT_TICKS_PER_SLOT / 2,
));
}
}
}
std::thread::park();
}
fn remove_directory_contents(ledger_path: &Path) -> Result<(), io::Error> {

228
validator/src/dashboard.rs Normal file
View File

@ -0,0 +1,228 @@
use {
crate::{get_validator_rpc_addr, get_validator_start_time},
console::style,
indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle},
solana_client::{
client_error, rpc_client::RpcClient, rpc_request, rpc_response::RpcContactInfo,
},
solana_sdk::{
clock::{Slot, DEFAULT_TICKS_PER_SLOT, MS_PER_TICK},
commitment_config::CommitmentConfig,
native_token::Sol,
pubkey::Pubkey,
},
std::{
io,
path::{Path, PathBuf},
thread,
time::Duration,
},
};
/// Creates a new process bar for processing that will take an unknown amount of time
fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar.set_draw_target(ProgressDrawTarget::stdout());
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
/// Pretty print a "name value"
fn println_name_value(name: &str, value: &str) {
println!("{} {}", style(name).bold(), value);
}
pub struct Dashboard {
progress_bar: ProgressBar,
ledger_path: PathBuf,
}
impl Dashboard {
pub fn new(ledger_path: &Path, log_path: Option<&Path>) -> Result<Self, io::Error> {
println_name_value("Ledger location:", &format!("{}", ledger_path.display()));
if let Some(log_path) = log_path {
println_name_value("Log:", &format!("{}", log_path.display()));
}
let rpc_addr = get_validator_rpc_addr(&ledger_path)?;
if rpc_addr.is_none() {
return Err(io::Error::new(io::ErrorKind::Other, "RPC not available"));
}
let progress_bar = new_spinner_progress_bar();
progress_bar.set_message("Initializing...");
Ok(Self {
progress_bar,
//ledger_path: ledger_path.clone().to_path_buf(),
ledger_path: ledger_path.to_path_buf(),
})
}
pub fn run(self) -> ! {
let Self {
progress_bar,
ledger_path,
} = self;
progress_bar.set_message("Connecting...");
let rpc_addr = get_validator_rpc_addr(&ledger_path).unwrap().unwrap();
let rpc_client = RpcClient::new_socket(rpc_addr);
// Wait until RPC starts responding...
loop {
match rpc_client.get_identity() {
Ok(_) => break,
Err(err) => {
progress_bar.set_message(&format!("{}", err));
thread::sleep(Duration::from_millis(500));
}
}
}
drop(progress_bar);
let identity = &rpc_client.get_identity().expect("get_identity");
println_name_value("Identity:", &identity.to_string());
fn get_contact_info(rpc_client: &RpcClient, identity: &Pubkey) -> Option<RpcContactInfo> {
rpc_client
.get_cluster_nodes()
.ok()
.unwrap_or_default()
.into_iter()
.find(|node| node.pubkey == identity.to_string())
}
if let Some(contact_info) = get_contact_info(&rpc_client, &identity) {
println_name_value(
"Version:",
&contact_info.version.unwrap_or_else(|| "?".to_string()),
);
if let Some(gossip) = contact_info.gossip {
println_name_value("Gossip Address:", &gossip.to_string());
}
if let Some(tpu) = contact_info.tpu {
println_name_value("TPU Address:", &tpu.to_string());
}
if let Some(rpc) = contact_info.rpc {
println_name_value("JSON RPC URL:", &format!("http://{}", rpc.to_string()));
}
}
let progress_bar = new_spinner_progress_bar();
fn get_validator_stats(
rpc_client: &RpcClient,
identity: &Pubkey,
) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> {
let processed_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
let confirmed_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?;
let finalized_slot =
rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?;
let transaction_count =
rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?;
let identity_balance = rpc_client
.get_balance_with_commitment(identity, CommitmentConfig::confirmed())?
.value;
let health = match rpc_client.get_health() {
Ok(()) => "ok".to_string(),
Err(err) => {
if let client_error::ClientErrorKind::RpcError(
rpc_request::RpcError::RpcResponseError {
code: _,
message: _,
data:
rpc_request::RpcResponseErrorData::NodeUnhealthy {
num_slots_behind: Some(num_slots_behind),
},
},
) = &err.kind
{
format!("{} slots behind", num_slots_behind)
} else {
"unhealthy".to_string()
}
}
};
Ok((
processed_slot,
confirmed_slot,
finalized_slot,
transaction_count,
Sol(identity_balance),
health,
))
}
let mut start_time = get_validator_start_time(&ledger_path).ok();
loop {
let snapshot_slot = rpc_client.get_snapshot_slot().ok();
for _i in 0..10 {
match get_validator_stats(&rpc_client, &identity) {
Ok((
processed_slot,
confirmed_slot,
finalized_slot,
transaction_count,
identity_balance,
health,
)) => {
let uptime = match start_time {
Some(start_time) => {
let uptime =
chrono::Duration::from_std(start_time.elapsed().unwrap())
.unwrap();
format!(
"{:02}:{:02}:{:02} ",
uptime.num_hours(),
uptime.num_minutes() % 60,
uptime.num_seconds() % 60
)
}
None => " ".to_string(),
};
progress_bar.set_message(&format!(
"{}{}| \
Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \
Snapshot Slot: {} | \
Transactions: {} | {}",
uptime,
if health == "ok" {
"".to_string()
} else {
format!("| {} ", style(health).bold().red())
},
processed_slot,
confirmed_slot,
finalized_slot,
snapshot_slot
.map(|s| s.to_string())
.unwrap_or_else(|| "-".to_string()),
transaction_count,
identity_balance
));
}
Err(err) => {
start_time = get_validator_start_time(&ledger_path).ok();
progress_bar.set_message(&format!("{}", err));
}
}
thread::sleep(Duration::from_millis(
MS_PER_TICK * DEFAULT_TICKS_PER_SLOT / 2,
));
}
}
}
}

View File

@ -1,9 +1,21 @@
pub use solana_core::test_validator;
use {
log::*,
std::{env, process::exit, thread::JoinHandle},
serde_derive::{Deserialize, Serialize},
std::{
env,
fs::{self, File},
io::{self, Write},
net::SocketAddr,
path::Path,
process::exit,
thread::JoinHandle,
time::{Duration, SystemTime},
},
};
pub mod dashboard;
#[cfg(unix)]
fn redirect_stderr(filename: &str) {
use std::{fs::OpenOptions, os::unix::io::AsRawFd};
@ -75,3 +87,49 @@ pub fn port_validator(port: String) -> Result<(), String> {
.map(|_| ())
.map_err(|e| format!("{:?}", e))
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct ProcessInfo {
rpc_addr: Option<SocketAddr>, // RPC port to contact the validator at
start_time: u64, // Seconds since the UNIX_EPOCH for when the validator was started
}
pub fn record_start(ledger_path: &Path, rpc_addr: Option<&SocketAddr>) -> Result<(), io::Error> {
if !ledger_path.exists() {
fs::create_dir_all(&ledger_path)?;
}
let start_info = ProcessInfo {
rpc_addr: rpc_addr.cloned(),
start_time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
let serialized = serde_yaml::to_string(&start_info)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
let mut file = File::create(ledger_path.join("process-info.yml"))?;
file.write_all(&serialized.into_bytes())?;
Ok(())
}
fn get_validator_process_info(
ledger_path: &Path,
) -> Result<(Option<SocketAddr>, SystemTime), io::Error> {
let file = File::open(ledger_path.join("process-info.yml"))?;
let config: ProcessInfo = serde_yaml::from_reader(file)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
let start_time = SystemTime::UNIX_EPOCH + Duration::from_secs(config.start_time);
Ok((config.rpc_addr, start_time))
}
pub fn get_validator_rpc_addr(ledger_path: &Path) -> Result<Option<SocketAddr>, io::Error> {
get_validator_process_info(ledger_path).map(|process_info| process_info.0)
}
pub fn get_validator_start_time(ledger_path: &Path) -> Result<SystemTime, io::Error> {
get_validator_process_info(ledger_path).map(|process_info| process_info.1)
}

View File

@ -2,6 +2,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, App,
AppSettings, Arg, ArgMatches, SubCommand,
};
use fd_lock::FdLock;
use log::*;
use rand::{seq::SliceRandom, thread_rng, Rng};
use solana_clap_utils::{
@ -41,7 +42,7 @@ use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use solana_validator::redirect_stderr_to_file;
use solana_validator::{dashboard::Dashboard, record_start, redirect_stderr_to_file};
use std::{
collections::HashSet,
env,
@ -61,6 +62,7 @@ use std::{
#[derive(Debug, PartialEq)]
enum Operation {
Initialize,
Monitor,
Run,
}
@ -793,6 +795,7 @@ pub fn main() {
let matches = App::new(crate_name!()).about(crate_description!())
.version(solana_version::version!())
.setting(AppSettings::VersionlessSubcommands)
.setting(AppSettings::InferSubcommands)
.arg(
Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name)
.long(SKIP_SEED_PHRASE_VALIDATION_ARG.long)
@ -1446,11 +1449,16 @@ pub fn main() {
SubCommand::with_name("run")
.about("Run the validator")
)
.subcommand(
SubCommand::with_name("monitor")
.about("Monitor the validator")
)
.get_matches();
let operation = match matches.subcommand().0 {
"" | "run" => Operation::Run,
"init" => Operation::Initialize,
"monitor" => Operation::Monitor,
_ => unreachable!(),
};
@ -1795,6 +1803,36 @@ pub fn main() {
})
});
if operation == Operation::Monitor {
let dashboard = Dashboard::new(&ledger_path, None).unwrap_or_else(|err| {
println!(
"Error: Unable to connect to validator at {}: {:?}",
ledger_path.display(),
err,
);
exit(1);
});
dashboard.run();
}
let mut ledger_fd_lock = FdLock::new(fs::File::open(&ledger_path).unwrap());
let _ledger_lock = ledger_fd_lock.try_lock().unwrap_or_else(|_| {
println!(
"Error: Unable to lock {} directory. Check if another validator is running",
ledger_path.display()
);
exit(1);
});
record_start(
&ledger_path,
validator_config
.rpc_addrs
.as_ref()
.map(|(rpc_addr, _)| rpc_addr),
)
.unwrap_or_else(|err| println!("Error: failed to record validator start: {}", err));
let logfile = {
let logfile = matches
.value_of("logfile")