Relax requirement that the entrypoint node runs the RPC service (#7019)

This commit is contained in:
Michael Vines 2019-11-18 21:43:14 -07:00 committed by GitHub
parent 3615209ce7
commit 0eb78e461d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 137 additions and 114 deletions

View File

@ -8,26 +8,36 @@ use solana_clap_utils::{
input_validators::{is_keypair, is_pubkey_or_keypair}, input_validators::{is_keypair, is_pubkey_or_keypair},
}; };
use solana_client::rpc_client::RpcClient; use solana_client::rpc_client::RpcClient;
use solana_core::cluster_info::{Node, VALIDATOR_PORT_RANGE}; use solana_core::{
use solana_core::contact_info::ContactInfo; cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
use solana_core::gossip_service::discover; contact_info::ContactInfo,
use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; gossip_service::GossipService,
use solana_core::socketaddr; ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS,
use solana_core::validator::{Validator, ValidatorConfig}; socketaddr,
validator::{Validator, ValidatorConfig},
};
use solana_ledger::bank_forks::SnapshotConfig; use solana_ledger::bank_forks::SnapshotConfig;
use solana_perf::recycler::enable_recycler_warming; use solana_perf::recycler::enable_recycler_warming;
use solana_sdk::clock::Slot; use solana_sdk::{
use solana_sdk::hash::Hash; clock::Slot,
use solana_sdk::pubkey::Pubkey; hash::Hash,
use solana_sdk::signature::{read_keypair_file, Keypair, KeypairUtil}; pubkey::Pubkey,
use std::fs::{self, File}; signature::{read_keypair_file, Keypair, KeypairUtil},
use std::io::{self, Read}; };
use std::net::{SocketAddr, TcpListener}; use std::{
use std::path::{Path, PathBuf}; fs::{self, File},
use std::process::exit; io::{self, Read},
use std::str::FromStr; net::{SocketAddr, TcpListener},
use std::sync::Arc; path::{Path, PathBuf},
use std::time::Instant; process::exit,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::sleep,
time::{Duration, Instant},
};
fn port_validator(port: String) -> Result<(), String> { fn port_validator(port: String) -> Result<(), String> {
port.parse::<u16>() port.parse::<u16>()
@ -71,6 +81,8 @@ fn download_tar_bz2(
if archive_path.is_file() { if archive_path.is_file() {
return Ok(()); return Ok(());
} }
fs::create_dir_all(download_path).map_err(|err| err.to_string())?;
let temp_archive_path = { let temp_archive_path = {
let mut p = archive_path.clone(); let mut p = archive_path.clone();
p.set_extension(".tmp"); p.set_extension(".tmp");
@ -168,37 +180,58 @@ fn download_tar_bz2(
Ok(()) Ok(())
} }
fn create_rpc_client( fn get_rpc_addr(
entrypoint: &ContactInfo, node: &Node,
) -> Result<(std::net::SocketAddr, RpcClient), String> { identity_keypair: &Arc<Keypair>,
let (nodes, _archivers) = discover( entrypoint_gossip: &SocketAddr,
Some(&entrypoint.gossip), ) -> (Pubkey, SocketAddr) {
Some(1), let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
Some(60), cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip));
None, let cluster_info = Arc::new(RwLock::new(cluster_info));
Some(&entrypoint.gossip),
None,
)
.map_err(|err| err.to_string())?;
let rpc_addr = nodes.iter().find_map(|contact_info| { let exit = Arc::new(AtomicBool::new(false));
if contact_info.gossip == entrypoint.gossip let gossip_service = GossipService::new(
&& ContactInfo::is_valid_address(&contact_info.rpc) &cluster_info.clone(),
{ None,
Some(contact_info.rpc) None,
} else { node.sockets.gossip.try_clone().unwrap(),
None &exit,
);
let (id, rpc_addr) = loop {
info!(
"Searching for RPC service...\n{}",
cluster_info.read().unwrap().contact_info_trace()
);
let (gossip_peers, rpc_peers) = {
let cluster_info = cluster_info.read().unwrap();
(cluster_info.gossip_peers(), cluster_info.rpc_peers())
};
let found_entrypoint = gossip_peers
.iter()
.any(|contact_info| contact_info.gossip == *entrypoint_gossip);
if found_entrypoint & !rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service it it has one, otherwise pick the first RPC
// service found
if let Some(contact_info) = rpc_peers
.iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
break (contact_info.id, contact_info.rpc);
}
break (rpc_peers[0].id, rpc_peers[0].rpc);
} }
});
if let Some(rpc_addr) = rpc_addr { sleep(Duration::from_secs(1));
Ok((rpc_addr, RpcClient::new_socket(rpc_addr))) };
} else {
Err(format!( exit.store(true, Ordering::Relaxed);
"Entrypoint ({:?}) is not running the RPC service", gossip_service.join().unwrap();
entrypoint.gossip
)) (id, rpc_addr)
}
} }
fn check_vote_account( fn check_vote_account(
@ -251,22 +284,12 @@ fn check_vote_account(
Ok(()) Ok(())
} }
fn initialize_ledger_path( fn download_ledger(
rpc_addr: &std::net::SocketAddr, rpc_addr: &SocketAddr,
rpc_client: &RpcClient,
ledger_path: &Path, ledger_path: &Path,
no_genesis_fetch: bool,
no_snapshot_fetch: bool, no_snapshot_fetch: bool,
) -> Result<Hash, String> { ) -> Result<(), String> {
let genesis_hash = rpc_client download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, true)?;
.get_genesis_hash()
.map_err(|err| err.to_string())?;
fs::create_dir_all(ledger_path).map_err(|err| err.to_string())?;
if !no_genesis_fetch {
download_tar_bz2(&rpc_addr, "genesis.tar.bz2", ledger_path, true)?;
}
if !no_snapshot_fetch { if !no_snapshot_fetch {
let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_tar_path(ledger_path); let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_tar_path(ledger_path);
@ -275,7 +298,7 @@ fn initialize_ledger_path(
.unwrap_or_else(|err| warn!("error removing {:?}: {}", snapshot_package, err)); .unwrap_or_else(|err| warn!("error removing {:?}: {}", snapshot_package, err));
} }
download_tar_bz2( download_tar_bz2(
&rpc_addr, rpc_addr,
snapshot_package.file_name().unwrap().to_str().unwrap(), snapshot_package.file_name().unwrap().to_str().unwrap(),
snapshot_package.parent().unwrap(), snapshot_package.parent().unwrap(),
false, false,
@ -283,12 +306,7 @@ fn initialize_ledger_path(
.unwrap_or_else(|err| warn!("Unable to fetch snapshot: {:?}", err)); .unwrap_or_else(|err| warn!("Unable to fetch snapshot: {:?}", err));
} }
match rpc_client.get_slot() { Ok(())
Ok(slot) => info!("Entrypoint currently at slot {}", slot),
Err(err) => warn!("Failed to get_slot from entrypoint: {}", err),
}
Ok(genesis_hash)
} }
#[allow(clippy::cognitive_complexity)] #[allow(clippy::cognitive_complexity)]
@ -361,21 +379,21 @@ pub fn main() {
.value_name("HOST:PORT") .value_name("HOST:PORT")
.takes_value(true) .takes_value(true)
.validator(solana_net_utils::is_host_port) .validator(solana_net_utils::is_host_port)
.help("Rendezvous with the cluster at this entry point"), .help("Rendezvous with the cluster at this gossip entrypoint"),
) )
.arg( .arg(
Arg::with_name("no_snapshot_fetch") Arg::with_name("no_snapshot_fetch")
.long("no-snapshot-fetch") .long("no-snapshot-fetch")
.takes_value(false) .takes_value(false)
.requires("entrypoint") .requires("entrypoint")
.help("Do not attempt to fetch a new snapshot from the cluster entrypoint, start from a local snapshot if present"), .help("Do not attempt to fetch a snapshot from the cluster, start from a local snapshot if present"),
) )
.arg( .arg(
Arg::with_name("no_genesis_fetch") Arg::with_name("no_genesis_fetch")
.long("no-genesis-fetch") .long("no-genesis-fetch")
.takes_value(false) .takes_value(false)
.requires("entrypoint") .requires("entrypoint")
.help("Do not attempt to fetch a new genesis from the cluster entrypoint, start from a local genesis if present"), .help("Do not fetch genesis from the cluster"),
) )
.arg( .arg(
Arg::with_name("no_voting") Arg::with_name("no_voting")
@ -502,6 +520,7 @@ pub fn main() {
} else { } else {
Keypair::new() Keypair::new()
}; };
let identity_keypair = Arc::new(identity_keypair);
let mut ephemeral_voting_keypair = false; let mut ephemeral_voting_keypair = false;
let voting_keypair = if let Some(identity) = matches.value_of("voting_keypair") { let voting_keypair = if let Some(identity) = matches.value_of("voting_keypair") {
@ -720,60 +739,64 @@ pub fn main() {
&udp_sockets, &udp_sockets,
); );
let (rpc_addr, rpc_client) = create_rpc_client(cluster_entrypoint).unwrap_or_else(|err| { if !no_genesis_fetch {
error!("unable to create rpc client: {}", err); let (rpc_node_id, rpc_addr) =
std::process::exit(1); get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip);
}); info!("Using RPC from node {}: {:?}", rpc_node_id, rpc_addr);
let rpc_client = RpcClient::new_socket(rpc_addr);
if !validator_config.voting_disabled { let rpc_version = rpc_client.get_version().unwrap_or_else(|err| {
check_vote_account( error!("Failed to get version: {}", err);
&rpc_client,
&vote_account,
&voting_keypair.pubkey(),
&identity_keypair.pubkey(),
)
.unwrap_or_else(|err| {
error!("Failed to check vote account: {}", err);
exit(1); exit(1);
}); });
} info!("RPC node version: {}", rpc_version.solana_core);
let genesis_hash = initialize_ledger_path( download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {
&rpc_addr, error!("Failed to initialize ledger: {}", err);
&rpc_client,
&ledger_path,
no_genesis_fetch,
no_snapshot_fetch,
)
.unwrap_or_else(|err| {
error!("Failed to download ledger: {}", err);
exit(1);
});
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {
if expected_genesis_hash != genesis_hash {
error!(
"Genesis hash mismatch: expected {} but local genesis hash is {}",
expected_genesis_hash, genesis_hash,
);
exit(1); exit(1);
});
let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| {
error!("Failed to get genesis hash: {}", err);
exit(1);
});
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {
if expected_genesis_hash != genesis_hash {
error!(
"Genesis hash mismatch: expected {} but local genesis hash is {}",
expected_genesis_hash, genesis_hash,
);
exit(1);
}
}
validator_config.expected_genesis_hash = Some(genesis_hash);
if !validator_config.voting_disabled {
check_vote_account(
&rpc_client,
&vote_account,
&voting_keypair.pubkey(),
&identity_keypair.pubkey(),
)
.unwrap_or_else(|err| {
error!("Failed to check vote account: {}", err);
exit(1);
});
} }
} }
validator_config.expected_genesis_hash = Some(genesis_hash); }
} else {
// Without a cluster entrypoint, ledger_path must already be present if !ledger_path.is_dir() {
if !ledger_path.is_dir() { error!(
error!( "ledger directory does not exist or is not accessible: {:?}",
"ledger directory does not exist or is not accessible: {:?}", ledger_path
ledger_path );
); exit(1);
exit(1);
}
} }
let validator = Validator::new( let validator = Validator::new(
node, node,
&Arc::new(identity_keypair), &identity_keypair,
&ledger_path, &ledger_path,
&vote_account, &vote_account,
&Arc::new(voting_keypair), &Arc::new(voting_keypair),