From 0c72f62e964bd45a4799d711178bed9fdb4082f4 Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 18 Jun 2020 22:20:52 -0700 Subject: [PATCH] Refactor gossip code from one huge function (#10701) --- gossip/src/main.rs | 372 +++++++++++++++++++++++++-------------------- 1 file changed, 203 insertions(+), 169 deletions(-) diff --git a/gossip/src/main.rs b/gossip/src/main.rs index 0409bf5b9..bcc2fe490 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -12,9 +12,7 @@ use std::error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; -fn main() -> Result<(), Box> { - solana_logger::setup_with_default("solana=info"); - +fn parse_matches() -> ArgMatches<'static> { let shred_version_arg = Arg::with_name("shred_version") .long("shred-version") .value_name("VERSION") @@ -22,7 +20,7 @@ fn main() -> Result<(), Box> { .default_value("0") .help("Filter gossip nodes by this shred version"); - let matches = App::new(crate_name!()) + App::new(crate_name!()) .about(crate_description!()) .version(solana_version::version!()) .setting(AppSettings::SubcommandRequiredElseHelp) @@ -150,187 +148,223 @@ fn main() -> Result<(), Box> { .help("Public key of a specific node to stop"), ), ) - .get_matches(); + .get_matches() +} - fn parse_entrypoint(matches: &ArgMatches) -> Option { - matches.value_of("entrypoint").map(|entrypoint| { - solana_net_utils::parse_host_port(entrypoint).unwrap_or_else(|e| { - eprintln!("failed to parse entrypoint address: {}", e); +fn parse_gossip_host(matches: &ArgMatches, entrypoint_addr: Option) -> IpAddr { + matches + .value_of("gossip_host") + .map(|gossip_host| { + solana_net_utils::parse_host(gossip_host).unwrap_or_else(|e| { + eprintln!("failed to parse gossip-host: {}", e); exit(1); }) }) - } - - match matches.subcommand() { - ("spy", Some(matches)) => { - let num_nodes_exactly = matches - .value_of("num_nodes_exactly") - .map(|num| num.to_string().parse().unwrap()); - let num_nodes = matches - .value_of("num_nodes") - .map(|num| num.to_string().parse().unwrap()) - .or(num_nodes_exactly); - let timeout = matches - .value_of("timeout") - .map(|secs| secs.to_string().parse().unwrap()); - let pubkey = matches - .value_of("node_pubkey") - .map(|pubkey_str| pubkey_str.parse::().unwrap()); - let shred_version = value_t_or_exit!(matches, "shred_version", u16); - - let entrypoint_addr = parse_entrypoint(&matches); - - let gossip_host = matches - .value_of("gossip_host") - .map(|gossip_host| { - solana_net_utils::parse_host(gossip_host).unwrap_or_else(|e| { - eprintln!("failed to parse gossip-host: {}", e); - exit(1); - }) - }) - .unwrap_or_else(|| { - if let Some(entrypoint_addr) = entrypoint_addr { - solana_net_utils::get_public_ip_addr(&entrypoint_addr).unwrap_or_else( - |err| { - eprintln!( - "Failed to contact cluster entrypoint {}: {}", - entrypoint_addr, err - ); - exit(1); - }, - ) - } else { - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) - } - }); - - let gossip_addr = SocketAddr::new( - gossip_host, - value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - (0, 1), - ) - .expect("unable to find an available gossip port") - }), - ); - - let (_all_peers, validators) = discover( - entrypoint_addr.as_ref(), - num_nodes, - timeout, - pubkey, - None, - Some(&gossip_addr), - shred_version, - )?; - - if timeout.is_some() { - if let Some(num) = num_nodes { - if validators.len() < num { - let add = if num_nodes_exactly.is_some() { - "" - } else { - " or more" - }; - eprintln!( - "Error: Insufficient validators discovered. Expecting {}{}", - num, add, - ); - exit(1); - } - } - if let Some(node) = pubkey { - if validators.iter().find(|x| x.id == node).is_none() { - eprintln!("Error: Could not find node {:?}", node); - exit(1); - } - } - } - if let Some(num_nodes_exactly) = num_nodes_exactly { - if validators.len() > num_nodes_exactly { + .unwrap_or_else(|| { + if let Some(entrypoint_addr) = entrypoint_addr { + solana_net_utils::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| { eprintln!( - "Error: Extra nodes discovered. Expecting exactly {}", - num_nodes_exactly + "Failed to contact cluster entrypoint {}: {}", + entrypoint_addr, err ); exit(1); - } - } - } - ("rpc-url", Some(matches)) => { - let any = matches.is_present("any"); - let all = matches.is_present("all"); - let entrypoint_addr = parse_entrypoint(&matches); - let timeout = value_t_or_exit!(matches, "timeout", u64); - let shred_version = value_t_or_exit!(matches, "shred_version", u16); - let (_all_peers, validators) = discover( - entrypoint_addr.as_ref(), - Some(1), - Some(timeout), - None, - entrypoint_addr.as_ref(), - None, - shred_version, - )?; - - let rpc_addrs: Vec<_> = validators - .iter() - .filter_map(|contact_info| { - if (any || all || Some(contact_info.gossip) == entrypoint_addr) - && ContactInfo::is_valid_address(&contact_info.rpc) - { - return Some(contact_info.rpc); - } - None }) - .collect(); - - if rpc_addrs.is_empty() { - eprintln!("No RPC URL found"); - exit(1); + } else { + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) } + }) +} - for rpc_addr in rpc_addrs { - println!("http://{}", rpc_addr); - if any { - break; - } - } - } - ("stop", Some(matches)) => { - let entrypoint_addr = parse_entrypoint(&matches); - let pubkey = matches - .value_of("node_pubkey") - .unwrap() - .parse::() - .unwrap(); - let (_all_peers, validators) = discover( - entrypoint_addr.as_ref(), - None, - None, - Some(pubkey), - None, - None, - 0, - )?; - let validator = validators.iter().find(|x| x.id == pubkey).unwrap(); - - if !ContactInfo::is_valid_address(&validator.rpc) { +fn process_spy_results( + timeout: Option, + validators: Vec, + num_nodes: Option, + num_nodes_exactly: Option, + pubkey: Option, +) { + if timeout.is_some() { + if let Some(num) = num_nodes { + if validators.len() < num { + let add = if num_nodes_exactly.is_some() { + "" + } else { + " or more" + }; eprintln!( - "Error: RPC service is not enabled on validator {:?}", - pubkey + "Error: Insufficient validators discovered. Expecting {}{}", + num, add, ); exit(1); } - println!("\nSending stop request to validator {:?}", pubkey); - - let result = RpcClient::new_socket(validator.rpc).validator_exit()?; - if result { - println!("Stop signal accepted"); - } else { - eprintln!("Error: Stop signal ignored"); + } + if let Some(node) = pubkey { + if validators.iter().find(|x| x.id == node).is_none() { + eprintln!("Error: Could not find node {:?}", node); + exit(1); } } + } + if let Some(num_nodes_exactly) = num_nodes_exactly { + if validators.len() > num_nodes_exactly { + eprintln!( + "Error: Extra nodes discovered. Expecting exactly {}", + num_nodes_exactly + ); + exit(1); + } + } +} + +fn process_spy(matches: &ArgMatches) -> std::io::Result<()> { + let num_nodes_exactly = matches + .value_of("num_nodes_exactly") + .map(|num| num.to_string().parse().unwrap()); + let num_nodes = matches + .value_of("num_nodes") + .map(|num| num.to_string().parse().unwrap()) + .or(num_nodes_exactly); + let timeout = matches + .value_of("timeout") + .map(|secs| secs.to_string().parse().unwrap()); + let pubkey = matches + .value_of("node_pubkey") + .map(|pubkey_str| pubkey_str.parse::().unwrap()); + let shred_version = value_t_or_exit!(matches, "shred_version", u16); + + let entrypoint_addr = parse_entrypoint(matches); + + let gossip_host = parse_gossip_host(matches, entrypoint_addr); + + let gossip_addr = SocketAddr::new( + gossip_host, + value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { + solana_net_utils::find_available_port_in_range( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + (0, 1), + ) + .expect("unable to find an available gossip port") + }), + ); + + let (_all_peers, validators) = discover( + entrypoint_addr.as_ref(), + num_nodes, + timeout, + pubkey, + None, + Some(&gossip_addr), + shred_version, + )?; + + process_spy_results(timeout, validators, num_nodes, num_nodes_exactly, pubkey); + + Ok(()) +} + +fn parse_entrypoint(matches: &ArgMatches) -> Option { + matches.value_of("entrypoint").map(|entrypoint| { + solana_net_utils::parse_host_port(entrypoint).unwrap_or_else(|e| { + eprintln!("failed to parse entrypoint address: {}", e); + exit(1); + }) + }) +} + +fn process_rpc_url(matches: &ArgMatches) -> std::io::Result<()> { + let any = matches.is_present("any"); + let all = matches.is_present("all"); + let entrypoint_addr = parse_entrypoint(&matches); + let timeout = value_t_or_exit!(matches, "timeout", u64); + let shred_version = value_t_or_exit!(matches, "shred_version", u16); + let (_all_peers, validators) = discover( + entrypoint_addr.as_ref(), + Some(1), + Some(timeout), + None, + entrypoint_addr.as_ref(), + None, + shred_version, + )?; + + let rpc_addrs: Vec<_> = validators + .iter() + .filter_map(|contact_info| { + if (any || all || Some(contact_info.gossip) == entrypoint_addr) + && ContactInfo::is_valid_address(&contact_info.rpc) + { + return Some(contact_info.rpc); + } + None + }) + .collect(); + + if rpc_addrs.is_empty() { + eprintln!("No RPC URL found"); + exit(1); + } + + for rpc_addr in rpc_addrs { + println!("http://{}", rpc_addr); + if any { + break; + } + } + + Ok(()) +} + +fn process_stop(matches: &ArgMatches) -> Result<(), Box> { + let entrypoint_addr = parse_entrypoint(&matches); + let pubkey = matches + .value_of("node_pubkey") + .unwrap() + .parse::() + .unwrap(); + let (_all_peers, validators) = discover( + entrypoint_addr.as_ref(), + None, + None, + Some(pubkey), + None, + None, + 0, + )?; + let validator = validators.iter().find(|x| x.id == pubkey).unwrap(); + + if !ContactInfo::is_valid_address(&validator.rpc) { + eprintln!( + "Error: RPC service is not enabled on validator {:?}", + pubkey + ); + exit(1); + } + println!("\nSending stop request to validator {:?}", pubkey); + + let result = RpcClient::new_socket(validator.rpc).validator_exit()?; + if result { + println!("Stop signal accepted"); + } else { + eprintln!("Error: Stop signal ignored"); + } + + Ok(()) +} + +fn main() -> Result<(), Box> { + solana_logger::setup_with_default("solana=info"); + + let matches = parse_matches(); + + match matches.subcommand() { + ("spy", Some(matches)) => { + process_spy(matches)?; + } + ("rpc-url", Some(matches)) => { + process_rpc_url(matches)?; + } + ("stop", Some(matches)) => { + process_stop(matches)?; + } _ => unreachable!(), }