From 0eb78e461dfee211e86cc8677fefe0c523b25fc9 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 18 Nov 2019 21:43:14 -0700 Subject: [PATCH] Relax requirement that the entrypoint node runs the RPC service (#7019) --- validator/src/main.rs | 251 +++++++++++++++++++++++------------------- 1 file changed, 137 insertions(+), 114 deletions(-) diff --git a/validator/src/main.rs b/validator/src/main.rs index dc78b71b3..688b2a84e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -8,26 +8,36 @@ use solana_clap_utils::{ input_validators::{is_keypair, is_pubkey_or_keypair}, }; use solana_client::rpc_client::RpcClient; -use solana_core::cluster_info::{Node, VALIDATOR_PORT_RANGE}; -use solana_core::contact_info::ContactInfo; -use solana_core::gossip_service::discover; -use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; -use solana_core::socketaddr; -use solana_core::validator::{Validator, ValidatorConfig}; +use solana_core::{ + cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, + contact_info::ContactInfo, + gossip_service::GossipService, + ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS, + socketaddr, + validator::{Validator, ValidatorConfig}, +}; use solana_ledger::bank_forks::SnapshotConfig; use solana_perf::recycler::enable_recycler_warming; -use solana_sdk::clock::Slot; -use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{read_keypair_file, Keypair, KeypairUtil}; -use std::fs::{self, File}; -use std::io::{self, Read}; -use std::net::{SocketAddr, TcpListener}; -use std::path::{Path, PathBuf}; -use std::process::exit; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Instant; +use solana_sdk::{ + clock::Slot, + hash::Hash, + pubkey::Pubkey, + signature::{read_keypair_file, Keypair, KeypairUtil}, +}; +use std::{ + fs::{self, File}, + io::{self, Read}, + net::{SocketAddr, TcpListener}, + path::{Path, PathBuf}, + process::exit, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, 
+ Arc, RwLock, + }, + thread::sleep, + time::{Duration, Instant}, +}; fn port_validator(port: String) -> Result<(), String> { port.parse::<u16>() @@ -71,6 +81,8 @@ fn download_tar_bz2( if archive_path.is_file() { return Ok(()); } + fs::create_dir_all(download_path).map_err(|err| err.to_string())?; + let temp_archive_path = { let mut p = archive_path.clone(); p.set_extension(".tmp"); @@ -168,37 +180,58 @@ fn download_tar_bz2( Ok(()) } -fn create_rpc_client( - entrypoint: &ContactInfo, -) -> Result<(std::net::SocketAddr, RpcClient), String> { - let (nodes, _archivers) = discover( - Some(&entrypoint.gossip), - Some(1), - Some(60), - None, - Some(&entrypoint.gossip), - None, - ) - .map_err(|err| err.to_string())?; +fn get_rpc_addr( + node: &Node, + identity_keypair: &Arc<Keypair>, + entrypoint_gossip: &SocketAddr, +) -> (Pubkey, SocketAddr) { + let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone()); + cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip)); + let cluster_info = Arc::new(RwLock::new(cluster_info)); - let rpc_addr = nodes.iter().find_map(|contact_info| { - if contact_info.gossip == entrypoint.gossip - && ContactInfo::is_valid_address(&contact_info.rpc) - { - Some(contact_info.rpc) - } else { - None + let exit = Arc::new(AtomicBool::new(false)); + let gossip_service = GossipService::new( + &cluster_info.clone(), + None, + None, + node.sockets.gossip.try_clone().unwrap(), + &exit, + ); + + let (id, rpc_addr) = loop { + info!( + "Searching for RPC service...\n{}", + cluster_info.read().unwrap().contact_info_trace() + ); + + let (gossip_peers, rpc_peers) = { + let cluster_info = cluster_info.read().unwrap(); + (cluster_info.gossip_peers(), cluster_info.rpc_peers()) + }; + + let found_entrypoint = gossip_peers + .iter() + .any(|contact_info| contact_info.gossip == *entrypoint_gossip); + + if found_entrypoint & !rpc_peers.is_empty() { + // Prefer the entrypoint's RPC service if it has one, otherwise pick the first 
RPC + // service found + if let Some(contact_info) = rpc_peers + .iter() + .find(|contact_info| contact_info.gossip == *entrypoint_gossip) + { + break (contact_info.id, contact_info.rpc); + } + break (rpc_peers[0].id, rpc_peers[0].rpc); } - }); - if let Some(rpc_addr) = rpc_addr { - Ok((rpc_addr, RpcClient::new_socket(rpc_addr))) - } else { - Err(format!( - "Entrypoint ({:?}) is not running the RPC service", - entrypoint.gossip - )) - } + sleep(Duration::from_secs(1)); + }; + + exit.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); + + (id, rpc_addr) } fn check_vote_account( @@ -251,22 +284,12 @@ fn check_vote_account( Ok(()) } -fn initialize_ledger_path( - rpc_addr: &std::net::SocketAddr, - rpc_client: &RpcClient, +fn download_ledger( + rpc_addr: &SocketAddr, ledger_path: &Path, - no_genesis_fetch: bool, no_snapshot_fetch: bool, -) -> Result<Hash, String> { - let genesis_hash = rpc_client - .get_genesis_hash() - .map_err(|err| err.to_string())?; - - fs::create_dir_all(ledger_path).map_err(|err| err.to_string())?; - - if !no_genesis_fetch { - download_tar_bz2(&rpc_addr, "genesis.tar.bz2", ledger_path, true)?; - } +) -> Result<(), String> { + download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, true)?; if !no_snapshot_fetch { let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_tar_path(ledger_path); @@ -275,7 +298,7 @@ fn initialize_ledger_path( .unwrap_or_else(|err| warn!("error removing {:?}: {}", snapshot_package, err)); } download_tar_bz2( - &rpc_addr, + rpc_addr, snapshot_package.file_name().unwrap().to_str().unwrap(), snapshot_package.parent().unwrap(), false, @@ -283,12 +306,7 @@ fn initialize_ledger_path( .unwrap_or_else(|err| warn!("Unable to fetch snapshot: {:?}", err)); } - match rpc_client.get_slot() { - Ok(slot) => info!("Entrypoint currently at slot {}", slot), - Err(err) => warn!("Failed to get_slot from entrypoint: {}", err), - } - - Ok(genesis_hash) + Ok(()) } #[allow(clippy::cognitive_complexity)] @@ -361,21 +379,21 @@ pub fn 
main() { .value_name("HOST:PORT") .takes_value(true) .validator(solana_net_utils::is_host_port) - .help("Rendezvous with the cluster at this entry point"), + .help("Rendezvous with the cluster at this gossip entrypoint"), ) .arg( Arg::with_name("no_snapshot_fetch") .long("no-snapshot-fetch") .takes_value(false) .requires("entrypoint") - .help("Do not attempt to fetch a new snapshot from the cluster entrypoint, start from a local snapshot if present"), + .help("Do not attempt to fetch a snapshot from the cluster, start from a local snapshot if present"), ) .arg( Arg::with_name("no_genesis_fetch") .long("no-genesis-fetch") .takes_value(false) .requires("entrypoint") - .help("Do not attempt to fetch a new genesis from the cluster entrypoint, start from a local genesis if present"), + .help("Do not fetch genesis from the cluster"), ) .arg( Arg::with_name("no_voting") @@ -502,6 +520,7 @@ pub fn main() { } else { Keypair::new() }; + let identity_keypair = Arc::new(identity_keypair); let mut ephemeral_voting_keypair = false; let voting_keypair = if let Some(identity) = matches.value_of("voting_keypair") { @@ -720,60 +739,64 @@ pub fn main() { &udp_sockets, ); - let (rpc_addr, rpc_client) = create_rpc_client(cluster_entrypoint).unwrap_or_else(|err| { - error!("unable to create rpc client: {}", err); - std::process::exit(1); - }); - - if !validator_config.voting_disabled { - check_vote_account( - &rpc_client, - &vote_account, - &voting_keypair.pubkey(), - &identity_keypair.pubkey(), - ) - .unwrap_or_else(|err| { - error!("Failed to check vote account: {}", err); + if !no_genesis_fetch { + let (rpc_node_id, rpc_addr) = + get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip); + info!("Using RPC from node {}: {:?}", rpc_node_id, rpc_addr); + let rpc_client = RpcClient::new_socket(rpc_addr); + let rpc_version = rpc_client.get_version().unwrap_or_else(|err| { + error!("Failed to get version: {}", err); exit(1); }); - } + info!("RPC node version: {}", 
rpc_version.solana_core); - let genesis_hash = initialize_ledger_path( - &rpc_addr, - &rpc_client, - &ledger_path, - no_genesis_fetch, - no_snapshot_fetch, - ) - .unwrap_or_else(|err| { - error!("Failed to download ledger: {}", err); - exit(1); - }); - - if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { - if expected_genesis_hash != genesis_hash { - error!( - "Genesis hash mismatch: expected {} but local genesis hash is {}", - expected_genesis_hash, genesis_hash, - ); + download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| { + error!("Failed to initialize ledger: {}", err); exit(1); + }); + + let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| { + error!("Failed to get genesis hash: {}", err); + exit(1); + }); + + if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { + if expected_genesis_hash != genesis_hash { + error!( + "Genesis hash mismatch: expected {} but local genesis hash is {}", + expected_genesis_hash, genesis_hash, + ); + exit(1); + } + } + validator_config.expected_genesis_hash = Some(genesis_hash); + + if !validator_config.voting_disabled { + check_vote_account( + &rpc_client, + &vote_account, + &voting_keypair.pubkey(), + &identity_keypair.pubkey(), + ) + .unwrap_or_else(|err| { + error!("Failed to check vote account: {}", err); + exit(1); + }); } } - validator_config.expected_genesis_hash = Some(genesis_hash); - } else { - // Without a cluster entrypoint, ledger_path must already be present - if !ledger_path.is_dir() { - error!( - "ledger directory does not exist or is not accessible: {:?}", - ledger_path - ); - exit(1); - } + } + + if !ledger_path.is_dir() { + error!( + "ledger directory does not exist or is not accessible: {:?}", + ledger_path + ); + exit(1); } let validator = Validator::new( node, - &Arc::new(identity_keypair), + &identity_keypair, &ledger_path, &vote_account, &Arc::new(voting_keypair),