Refactor gossip code from one huge function (#10701)

This commit is contained in:
sakridge 2020-06-18 22:20:52 -07:00 committed by GitHub
parent c0389ef82f
commit 0c72f62e96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 203 additions and 169 deletions

View File

@ -12,9 +12,7 @@ use std::error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit;
fn main() -> Result<(), Box<dyn error::Error>> {
solana_logger::setup_with_default("solana=info");
fn parse_matches() -> ArgMatches<'static> {
let shred_version_arg = Arg::with_name("shred_version")
.long("shred-version")
.value_name("VERSION")
@ -22,7 +20,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.default_value("0")
.help("Filter gossip nodes by this shred version");
let matches = App::new(crate_name!())
App::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.setting(AppSettings::SubcommandRequiredElseHelp)
@ -150,187 +148,223 @@ fn main() -> Result<(), Box<dyn error::Error>> {
.help("Public key of a specific node to stop"),
),
)
.get_matches();
.get_matches()
}
fn parse_entrypoint(matches: &ArgMatches) -> Option<SocketAddr> {
matches.value_of("entrypoint").map(|entrypoint| {
solana_net_utils::parse_host_port(entrypoint).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {}", e);
fn parse_gossip_host(matches: &ArgMatches, entrypoint_addr: Option<SocketAddr>) -> IpAddr {
matches
.value_of("gossip_host")
.map(|gossip_host| {
solana_net_utils::parse_host(gossip_host).unwrap_or_else(|e| {
eprintln!("failed to parse gossip-host: {}", e);
exit(1);
})
})
}
match matches.subcommand() {
("spy", Some(matches)) => {
let num_nodes_exactly = matches
.value_of("num_nodes_exactly")
.map(|num| num.to_string().parse().unwrap());
let num_nodes = matches
.value_of("num_nodes")
.map(|num| num.to_string().parse().unwrap())
.or(num_nodes_exactly);
let timeout = matches
.value_of("timeout")
.map(|secs| secs.to_string().parse().unwrap());
let pubkey = matches
.value_of("node_pubkey")
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let entrypoint_addr = parse_entrypoint(&matches);
let gossip_host = matches
.value_of("gossip_host")
.map(|gossip_host| {
solana_net_utils::parse_host(gossip_host).unwrap_or_else(|e| {
eprintln!("failed to parse gossip-host: {}", e);
exit(1);
})
})
.unwrap_or_else(|| {
if let Some(entrypoint_addr) = entrypoint_addr {
solana_net_utils::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(
|err| {
eprintln!(
"Failed to contact cluster entrypoint {}: {}",
entrypoint_addr, err
);
exit(1);
},
)
} else {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
}
});
let gossip_addr = SocketAddr::new(
gossip_host,
value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| {
solana_net_utils::find_available_port_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
(0, 1),
)
.expect("unable to find an available gossip port")
}),
);
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
num_nodes,
timeout,
pubkey,
None,
Some(&gossip_addr),
shred_version,
)?;
if timeout.is_some() {
if let Some(num) = num_nodes {
if validators.len() < num {
let add = if num_nodes_exactly.is_some() {
""
} else {
" or more"
};
eprintln!(
"Error: Insufficient validators discovered. Expecting {}{}",
num, add,
);
exit(1);
}
}
if let Some(node) = pubkey {
if validators.iter().find(|x| x.id == node).is_none() {
eprintln!("Error: Could not find node {:?}", node);
exit(1);
}
}
}
if let Some(num_nodes_exactly) = num_nodes_exactly {
if validators.len() > num_nodes_exactly {
.unwrap_or_else(|| {
if let Some(entrypoint_addr) = entrypoint_addr {
solana_net_utils::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| {
eprintln!(
"Error: Extra nodes discovered. Expecting exactly {}",
num_nodes_exactly
"Failed to contact cluster entrypoint {}: {}",
entrypoint_addr, err
);
exit(1);
}
}
}
("rpc-url", Some(matches)) => {
let any = matches.is_present("any");
let all = matches.is_present("all");
let entrypoint_addr = parse_entrypoint(&matches);
let timeout = value_t_or_exit!(matches, "timeout", u64);
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
Some(1),
Some(timeout),
None,
entrypoint_addr.as_ref(),
None,
shred_version,
)?;
let rpc_addrs: Vec<_> = validators
.iter()
.filter_map(|contact_info| {
if (any || all || Some(contact_info.gossip) == entrypoint_addr)
&& ContactInfo::is_valid_address(&contact_info.rpc)
{
return Some(contact_info.rpc);
}
None
})
.collect();
if rpc_addrs.is_empty() {
eprintln!("No RPC URL found");
exit(1);
} else {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
}
})
}
for rpc_addr in rpc_addrs {
println!("http://{}", rpc_addr);
if any {
break;
}
}
}
("stop", Some(matches)) => {
let entrypoint_addr = parse_entrypoint(&matches);
let pubkey = matches
.value_of("node_pubkey")
.unwrap()
.parse::<Pubkey>()
.unwrap();
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
None,
None,
Some(pubkey),
None,
None,
0,
)?;
let validator = validators.iter().find(|x| x.id == pubkey).unwrap();
if !ContactInfo::is_valid_address(&validator.rpc) {
fn process_spy_results(
timeout: Option<u64>,
validators: Vec<ContactInfo>,
num_nodes: Option<usize>,
num_nodes_exactly: Option<usize>,
pubkey: Option<Pubkey>,
) {
if timeout.is_some() {
if let Some(num) = num_nodes {
if validators.len() < num {
let add = if num_nodes_exactly.is_some() {
""
} else {
" or more"
};
eprintln!(
"Error: RPC service is not enabled on validator {:?}",
pubkey
"Error: Insufficient validators discovered. Expecting {}{}",
num, add,
);
exit(1);
}
println!("\nSending stop request to validator {:?}", pubkey);
let result = RpcClient::new_socket(validator.rpc).validator_exit()?;
if result {
println!("Stop signal accepted");
} else {
eprintln!("Error: Stop signal ignored");
}
if let Some(node) = pubkey {
if validators.iter().find(|x| x.id == node).is_none() {
eprintln!("Error: Could not find node {:?}", node);
exit(1);
}
}
}
if let Some(num_nodes_exactly) = num_nodes_exactly {
if validators.len() > num_nodes_exactly {
eprintln!(
"Error: Extra nodes discovered. Expecting exactly {}",
num_nodes_exactly
);
exit(1);
}
}
}
fn process_spy(matches: &ArgMatches) -> std::io::Result<()> {
let num_nodes_exactly = matches
.value_of("num_nodes_exactly")
.map(|num| num.to_string().parse().unwrap());
let num_nodes = matches
.value_of("num_nodes")
.map(|num| num.to_string().parse().unwrap())
.or(num_nodes_exactly);
let timeout = matches
.value_of("timeout")
.map(|secs| secs.to_string().parse().unwrap());
let pubkey = matches
.value_of("node_pubkey")
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let entrypoint_addr = parse_entrypoint(matches);
let gossip_host = parse_gossip_host(matches, entrypoint_addr);
let gossip_addr = SocketAddr::new(
gossip_host,
value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| {
solana_net_utils::find_available_port_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
(0, 1),
)
.expect("unable to find an available gossip port")
}),
);
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
num_nodes,
timeout,
pubkey,
None,
Some(&gossip_addr),
shred_version,
)?;
process_spy_results(timeout, validators, num_nodes, num_nodes_exactly, pubkey);
Ok(())
}
fn parse_entrypoint(matches: &ArgMatches) -> Option<SocketAddr> {
matches.value_of("entrypoint").map(|entrypoint| {
solana_net_utils::parse_host_port(entrypoint).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {}", e);
exit(1);
})
})
}
fn process_rpc_url(matches: &ArgMatches) -> std::io::Result<()> {
let any = matches.is_present("any");
let all = matches.is_present("all");
let entrypoint_addr = parse_entrypoint(&matches);
let timeout = value_t_or_exit!(matches, "timeout", u64);
let shred_version = value_t_or_exit!(matches, "shred_version", u16);
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
Some(1),
Some(timeout),
None,
entrypoint_addr.as_ref(),
None,
shred_version,
)?;
let rpc_addrs: Vec<_> = validators
.iter()
.filter_map(|contact_info| {
if (any || all || Some(contact_info.gossip) == entrypoint_addr)
&& ContactInfo::is_valid_address(&contact_info.rpc)
{
return Some(contact_info.rpc);
}
None
})
.collect();
if rpc_addrs.is_empty() {
eprintln!("No RPC URL found");
exit(1);
}
for rpc_addr in rpc_addrs {
println!("http://{}", rpc_addr);
if any {
break;
}
}
Ok(())
}
fn process_stop(matches: &ArgMatches) -> Result<(), Box<dyn error::Error>> {
let entrypoint_addr = parse_entrypoint(&matches);
let pubkey = matches
.value_of("node_pubkey")
.unwrap()
.parse::<Pubkey>()
.unwrap();
let (_all_peers, validators) = discover(
entrypoint_addr.as_ref(),
None,
None,
Some(pubkey),
None,
None,
0,
)?;
let validator = validators.iter().find(|x| x.id == pubkey).unwrap();
if !ContactInfo::is_valid_address(&validator.rpc) {
eprintln!(
"Error: RPC service is not enabled on validator {:?}",
pubkey
);
exit(1);
}
println!("\nSending stop request to validator {:?}", pubkey);
let result = RpcClient::new_socket(validator.rpc).validator_exit()?;
if result {
println!("Stop signal accepted");
} else {
eprintln!("Error: Stop signal ignored");
}
Ok(())
}
fn main() -> Result<(), Box<dyn error::Error>> {
solana_logger::setup_with_default("solana=info");
let matches = parse_matches();
match matches.subcommand() {
("spy", Some(matches)) => {
process_spy(matches)?;
}
("rpc-url", Some(matches)) => {
process_rpc_url(matches)?;
}
("stop", Some(matches)) => {
process_stop(matches)?;
}
_ => unreachable!(),
}