Connect to RPC nodes in parallel w/ reduced timeout (#26892)

* Connect to RPC nodes in parallel w/ reduced timeout
Brennan Watt 2022-08-11 14:32:22 -07:00 committed by GitHub
parent b0c6c31b71
commit da4028b24f
4 changed files with 232 additions and 173 deletions
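This change swaps the validator's serial, one-node-at-a-time RPC probing for a parallel sweep over many candidates, each given a reduced (5 second) socket timeout. Below is a minimal sketch of that pattern only, not the commit's code: the helper name vet_rpc_nodes, the hard-coded addresses, and the main() driver are invented for illustration; the real logic is in the bootstrap diff further down.

// Hypothetical sketch: probe candidate RPC nodes concurrently with a short timeout.
use {
    rayon::prelude::*,
    solana_client::rpc_client::RpcClient,
    std::{net::SocketAddr, time::Duration},
};

/// Return the subset of `candidates` whose RPC service answers getVersion within `timeout`.
fn vet_rpc_nodes(candidates: &[SocketAddr], timeout: Duration) -> Vec<SocketAddr> {
    candidates
        .par_iter() // probe all candidates in parallel on the rayon thread pool
        .filter(|&&addr| {
            let client = RpcClient::new_socket_with_timeout(addr, timeout);
            // Keep only nodes that respond before the timeout expires.
            client.get_version().is_ok()
        })
        .copied()
        .collect()
}

fn main() {
    // Illustrative addresses only.
    let candidates: Vec<SocketAddr> = vec![
        "127.0.0.1:8899".parse().unwrap(),
        "127.0.0.1:8898".parse().unwrap(),
    ];
    // Five seconds matches the reduced timeout used in this commit.
    let healthy = vet_rpc_nodes(&candidates, Duration::from_secs(5));
    println!("{} of {} candidates responded", healthy.len(), candidates.len());
}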

Cargo.lock (generated): 7 changed lines

@@ -2099,12 +2099,12 @@ dependencies = [
 [[package]]
 name = "iana-time-zone"
-version = "0.1.42"
+version = "0.1.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9512e544c25736b82aebbd2bf739a47c8a1c935dfcc3a6adcde10e35cd3cd468"
+checksum = "808cf7d67cf4a22adc5be66e75ebdf769b3f2ea032041437a7061f97a63dad4b"
 dependencies = [
  "android_system_properties",
- "core-foundation",
+ "core-foundation-sys",
  "js-sys",
  "wasm-bindgen",
  "winapi 0.3.9",
@@ -6512,6 +6512,7 @@ dependencies = [
  "log",
  "num_cpus",
  "rand 0.7.3",
+ "rayon",
  "serde",
  "serde_json",
  "signal-hook",

[second Cargo.lock]

@@ -5802,6 +5802,7 @@ dependencies = [
  "log",
  "num_cpus",
  "rand 0.7.3",
+ "rayon",
  "serde",
  "serde_json",
  "signal-hook",

[Cargo.toml]

@@ -26,6 +26,7 @@ jsonrpc-server-utils = "18.0.0"
 log = "0.4.17"
 num_cpus = "1.13.1"
 rand = "0.7.0"
+rayon = "1.5.3"
 serde = "1.0.143"
 serde_json = "1.0.83"
 solana-clap-utils = { path = "../clap-utils", version = "=1.12.0" }
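The rayon dependency added above is what the bootstrap code below uses to vet RPC candidates on a thread pool while recording failures in a shared blacklist behind a RwLock. Here is a minimal sketch of that concurrency pattern, assuming made-up u64 peer ids and an is_healthy() stand-in instead of the validator's real types and RPC checks:

// Hypothetical sketch: parallel filtering with a shared, RwLock-guarded blacklist.
use {
    rayon::prelude::*,
    std::{collections::HashSet, sync::RwLock},
};

fn is_healthy(peer: u64) -> bool {
    // Stand-in for the real check (a getVersion RPC call with a short timeout).
    peer % 2 == 0
}

fn main() {
    let blacklist: RwLock<HashSet<u64>> = RwLock::new(HashSet::new());
    let peers: Vec<u64> = (0..8).collect();

    let vetted: Vec<u64> = peers
        .into_par_iter()
        .filter(|&peer| {
            if is_healthy(peer) {
                true
            } else {
                // Record the failure so later iterations skip this peer.
                blacklist.write().unwrap().insert(peer);
                false
            }
        })
        .collect();

    println!("vetted: {:?}", vetted);
    println!("blacklisted: {:?}", blacklist.read().unwrap());
}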

[Rust source: validator bootstrap]

@@ -1,6 +1,7 @@
 use {
     log::*,
     rand::{seq::SliceRandom, thread_rng, Rng},
+    rayon::prelude::*,
     solana_client::rpc_client::RpcClient,
     solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
     solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
@@ -28,7 +29,7 @@ use {
     },
     solana_streamer::socket::SocketAddrSpace,
     std::{
-        collections::{HashMap, HashSet},
+        collections::{hash_map::RandomState, HashMap, HashSet},
         net::{SocketAddr, TcpListener, UdpSocket},
         path::Path,
         process::exit,
@@ -36,11 +37,12 @@ use {
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
         },
-        thread::sleep,
         time::{Duration, Instant},
     },
 };

+pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;
+
 #[derive(Debug)]
 pub struct RpcBootstrapConfig {
     pub no_genesis_fetch: bool,
@@ -303,7 +305,7 @@ fn check_vote_account(
     Ok(())
 }

-/// Struct to wrap the return value from get_rpc_node(). The `rpc_contact_info` is the peer to
+/// Struct to wrap the return value from get_rpc_nodes(). The `rpc_contact_info` is the peer to
 /// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
 /// snapshots to download.
 #[derive(Debug)]
@@ -322,100 +324,49 @@ struct PeerSnapshotHash {
 /// A snapshot hash. In this context (bootstrap *with* incremental snapshots), a snapshot hash
 /// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
-struct SnapshotHash {
+pub struct SnapshotHash {
     full: (Slot, Hash),
     incr: Option<(Slot, Hash)>,
 }

+pub fn fail_rpc_node(
+    err: String,
+    known_validators: &Option<HashSet<Pubkey, RandomState>>,
+    rpc_id: &Pubkey,
+    blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
+) {
+    warn!("{}", err);
+    if let Some(ref known_validators) = known_validators {
+        if known_validators.contains(rpc_id) {
+            return;
+        }
+    }
+    info!("Excluding {} as a future RPC candidate", rpc_id);
+    blacklisted_rpc_nodes.insert(*rpc_id);
+}
+
 #[allow(clippy::too_many_arguments)]
-pub fn rpc_bootstrap(
-    node: &Node,
-    identity_keypair: &Arc<Keypair>,
+pub fn attempt_download_genesis_and_snapshot(
+    rpc_contact_info: &ContactInfo,
     ledger_path: &Path,
+    validator_config: &mut ValidatorConfig,
+    bootstrap_config: &RpcBootstrapConfig,
+    use_progress_bar: bool,
+    gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
+    rpc_client: &RpcClient,
     full_snapshot_archives_dir: &Path,
     incremental_snapshot_archives_dir: &Path,
-    vote_account: &Pubkey,
-    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
-    cluster_entrypoints: &[ContactInfo],
-    validator_config: &mut ValidatorConfig,
-    bootstrap_config: RpcBootstrapConfig,
-    do_port_check: bool,
-    use_progress_bar: bool,
     maximum_local_snapshot_age: Slot,
-    should_check_duplicate_instance: bool,
     start_progress: &Arc<RwLock<ValidatorStartProgress>>,
     minimal_snapshot_download_speed: f32,
     maximum_snapshot_download_abort: u64,
-    socket_addr_space: SocketAddrSpace,
-) {
-    if do_port_check {
-        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
-        order.shuffle(&mut thread_rng());
-        if order.into_iter().all(|i| {
-            !verify_reachable_ports(
-                node,
-                &cluster_entrypoints[i],
-                validator_config,
-                &socket_addr_space,
-            )
-        }) {
-            exit(1);
-        }
-    }
-
-    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
-        return;
-    }
-
-    let mut blacklisted_rpc_nodes = HashSet::new();
-    let mut gossip = None;
-    let mut download_abort_count = 0;
-    loop {
-        if gossip.is_none() {
-            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
-            gossip = Some(start_gossip_node(
-                identity_keypair.clone(),
-                cluster_entrypoints,
-                ledger_path,
-                &node.info.gossip,
-                node.sockets.gossip.try_clone().unwrap(),
-                validator_config.expected_shred_version,
-                validator_config.gossip_validators.clone(),
-                should_check_duplicate_instance,
-                socket_addr_space,
-            ));
-        }
-
-        let rpc_node_details = get_rpc_node(
-            &gossip.as_ref().unwrap().0,
-            cluster_entrypoints,
-            validator_config,
-            &mut blacklisted_rpc_nodes,
-            &bootstrap_config,
-        );
-        if rpc_node_details.is_none() {
-            return;
-        }
-        let GetRpcNodeResult {
-            rpc_contact_info,
-            snapshot_hash,
-        } = rpc_node_details.unwrap();
-
-        info!(
-            "Using RPC service from node {}: {:?}",
-            rpc_contact_info.id, rpc_contact_info.rpc
-        );
-        let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc);
-        let result = match rpc_client.get_version() {
-            Ok(rpc_version) => {
-                info!("RPC node version: {}", rpc_version.solana_core);
-                Ok(())
-            }
-            Err(err) => Err(format!("Failed to get RPC node version: {}", err)),
-        }
-        .and_then(|_| {
+    download_abort_count: &mut u64,
+    snapshot_hash: Option<SnapshotHash>,
+    identity_keypair: &Arc<Keypair>,
+    vote_account: &Pubkey,
+    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
+) -> Result<(), String> {
     let genesis_config = download_then_check_genesis_hash(
         &rpc_contact_info.rpc,
         ledger_path,
@@ -458,22 +409,23 @@ pub fn rpc_bootstrap(
         .map_err(|err| format!("Failed to get RPC node slot: {}", err))?;
     info!("RPC node root slot: {}", rpc_client_slot);

-    download_snapshots(
+    if let Err(err) = download_snapshots(
         full_snapshot_archives_dir,
         incremental_snapshot_archives_dir,
         validator_config,
-        &bootstrap_config,
+        bootstrap_config,
         use_progress_bar,
         maximum_local_snapshot_age,
         start_progress,
         minimal_snapshot_download_speed,
         maximum_snapshot_download_abort,
-        &mut download_abort_count,
+        download_abort_count,
         snapshot_hash,
-        &rpc_contact_info,
-    )
-})
-.map(|_| {
+        rpc_contact_info,
+    ) {
+        return Err(err);
+    };
+
     if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
         let rpc_client = RpcClient::new(url);
         check_vote_account(
@@ -498,25 +450,155 @@ pub fn rpc_bootstrap(
             exit(1);
         });
     }
-        });
-
-        if result.is_ok() {
-            break;
-        }
-        warn!("{}", result.unwrap_err());
-
-        if let Some(ref known_validators) = validator_config.known_validators {
-            if known_validators.contains(&rpc_contact_info.id) {
-                continue; // Never blacklist a known node
-            }
-        }
-
-        info!(
-            "Excluding {} as a future RPC candidate",
-            rpc_contact_info.id
-        );
-        blacklisted_rpc_nodes.insert(rpc_contact_info.id);
-    }
+    Ok(())
+}
+
+#[allow(clippy::too_many_arguments)]
+pub fn rpc_bootstrap(
+    node: &Node,
+    identity_keypair: &Arc<Keypair>,
+    ledger_path: &Path,
+    full_snapshot_archives_dir: &Path,
+    incremental_snapshot_archives_dir: &Path,
+    vote_account: &Pubkey,
+    authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
+    cluster_entrypoints: &[ContactInfo],
+    validator_config: &mut ValidatorConfig,
+    bootstrap_config: RpcBootstrapConfig,
+    do_port_check: bool,
+    use_progress_bar: bool,
+    maximum_local_snapshot_age: Slot,
+    should_check_duplicate_instance: bool,
+    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
+    minimal_snapshot_download_speed: f32,
+    maximum_snapshot_download_abort: u64,
+    socket_addr_space: SocketAddrSpace,
+) {
+    if do_port_check {
+        let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
+        order.shuffle(&mut thread_rng());
+        if order.into_iter().all(|i| {
+            !verify_reachable_ports(
+                node,
+                &cluster_entrypoints[i],
+                validator_config,
+                &socket_addr_space,
+            )
+        }) {
+            exit(1);
+        }
+    }
+
+    if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
+        return;
+    }
+
+    let blacklisted_rpc_nodes = RwLock::new(HashSet::new());
+    let mut gossip = None;
+    let mut vetted_rpc_nodes: Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)> = vec![];
+    let mut download_abort_count = 0;
+    loop {
+        if gossip.is_none() {
+            *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
+            gossip = Some(start_gossip_node(
+                identity_keypair.clone(),
+                cluster_entrypoints,
+                ledger_path,
+                &node.info.gossip,
+                node.sockets.gossip.try_clone().unwrap(),
+                validator_config.expected_shred_version,
+                validator_config.gossip_validators.clone(),
+                should_check_duplicate_instance,
+                socket_addr_space,
+            ));
+        }
+
+        while vetted_rpc_nodes.is_empty() {
+            let rpc_node_details_vec = get_rpc_nodes(
+                &gossip.as_ref().unwrap().0,
+                cluster_entrypoints,
+                validator_config,
+                &mut blacklisted_rpc_nodes.write().unwrap(),
+                &bootstrap_config,
+            );
+            if rpc_node_details_vec.is_empty() {
+                return;
+            }
+
+            vetted_rpc_nodes = rpc_node_details_vec
+                .into_par_iter()
+                .map(|rpc_node_details| {
+                    let GetRpcNodeResult {
+                        rpc_contact_info,
+                        snapshot_hash,
+                    } = rpc_node_details;
+
+                    info!(
+                        "Using RPC service from node {}: {:?}",
+                        rpc_contact_info.id, rpc_contact_info.rpc
+                    );
+                    let rpc_client = RpcClient::new_socket_with_timeout(
+                        rpc_contact_info.rpc,
+                        Duration::from_secs(5),
+                    );
+                    (rpc_contact_info, snapshot_hash, rpc_client)
+                })
+                .filter(|(rpc_contact_info, _snapshot_hash, rpc_client)| {
+                    match rpc_client.get_version() {
+                        Ok(rpc_version) => {
+                            info!("RPC node version: {}", rpc_version.solana_core);
+                            true
+                        }
+                        Err(err) => {
+                            fail_rpc_node(
+                                format!("Failed to get RPC node version: {}", err),
+                                &validator_config.known_validators,
+                                &rpc_contact_info.id,
+                                &mut blacklisted_rpc_nodes.write().unwrap(),
+                            );
+                            false
+                        }
+                    }
+                })
+                .collect();
+        }
+
+        let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
+        match attempt_download_genesis_and_snapshot(
+            &rpc_contact_info,
+            ledger_path,
+            validator_config,
+            &bootstrap_config,
+            use_progress_bar,
+            &mut gossip,
+            &rpc_client,
+            full_snapshot_archives_dir,
+            incremental_snapshot_archives_dir,
+            maximum_local_snapshot_age,
+            start_progress,
+            minimal_snapshot_download_speed,
+            maximum_snapshot_download_abort,
+            &mut download_abort_count,
+            snapshot_hash,
+            identity_keypair,
+            vote_account,
+            authorized_voter_keypairs.clone(),
+        ) {
+            Ok(()) => break,
+            Err(err) => {
+                fail_rpc_node(
+                    err,
+                    &validator_config.known_validators,
+                    &rpc_contact_info.id,
+                    &mut blacklisted_rpc_nodes.write().unwrap(),
+                );
+            }
+        }
+    }
+
     if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() {
         cluster_info.save_contact_info();
         gossip_exit_flag.store(true, Ordering::Relaxed);
@@ -524,22 +606,20 @@ pub fn rpc_bootstrap(
     }
 }

-/// Get an RPC peer node to download from.
+/// Get RPC peer node candidates to download from.
 ///
-/// This function finds the highest compatible snapshots from the cluster, then picks one peer
-/// at random to use (return).
-fn get_rpc_node(
+/// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
+fn get_rpc_nodes(
     cluster_info: &ClusterInfo,
     cluster_entrypoints: &[ContactInfo],
     validator_config: &ValidatorConfig,
     blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
     bootstrap_config: &RpcBootstrapConfig,
-) -> Option<GetRpcNodeResult> {
+) -> Vec<GetRpcNodeResult> {
     let mut blacklist_timeout = Instant::now();
     let mut newer_cluster_snapshot_timeout = None;
     let mut retry_reason = None;
     loop {
-        sleep(Duration::from_secs(1));
         info!("\n{}", cluster_info.rpc_info_trace());

         let rpc_peers = get_rpc_peers(
@@ -556,17 +636,16 @@ fn get_rpc_node(
         }
         let rpc_peers = rpc_peers.unwrap();
         blacklist_timeout = Instant::now();

         if bootstrap_config.no_snapshot_fetch {
             if rpc_peers.is_empty() {
                 retry_reason = Some("No RPC peers available.".to_owned());
                 continue;
             } else {
                 let random_peer = &rpc_peers[thread_rng().gen_range(0, rpc_peers.len())];
-                return Some(GetRpcNodeResult {
+                return vec![GetRpcNodeResult {
                     rpc_contact_info: random_peer.clone(),
                     snapshot_hash: None,
-                });
+                }];
             }
         }
@@ -576,14 +655,13 @@
             validator_config.known_validators.as_ref(),
             bootstrap_config.incremental_snapshot_fetch,
         );

         if peer_snapshot_hashes.is_empty() {
             match newer_cluster_snapshot_timeout {
                 None => newer_cluster_snapshot_timeout = Some(Instant::now()),
                 Some(newer_cluster_snapshot_timeout) => {
                     if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 {
                         warn!("Giving up, did not get newer snapshots from the cluster.");
-                        return None;
+                        return vec![];
                     }
                 }
             }
@@ -594,10 +672,7 @@
                 .iter()
                 .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.id)
                 .collect::<Vec<_>>();
-            let PeerSnapshotHash {
-                rpc_contact_info: final_rpc_contact_info,
-                snapshot_hash: final_snapshot_hash,
-            } = get_final_peer_snapshot_hash(&peer_snapshot_hashes);
+            let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
             info!(
                 "Highest available snapshot slot is {}, available from {} node{}: {:?}",
                 final_snapshot_hash
@@ -608,11 +683,15 @@
                 if rpc_peers.len() > 1 { "s" } else { "" },
                 rpc_peers,
             );
-
-            return Some(GetRpcNodeResult {
-                rpc_contact_info: final_rpc_contact_info,
-                snapshot_hash: Some(final_snapshot_hash),
-            });
+            let rpc_node_results = peer_snapshot_hashes
+                .iter()
+                .map(|peer_snapshot_hash| GetRpcNodeResult {
+                    rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
+                    snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
+                })
+                .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
+                .collect();
+            return rpc_node_results;
         }
     }
 }
@@ -971,29 +1050,6 @@ fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
     );
 }

-/// Get a final peer from the remaining peer snapshot hashes. At this point all the snapshot
-/// hashes should (must) be the same, and only the peers are different. Pick an element from
-/// the slice at random and return it.
-fn get_final_peer_snapshot_hash(peer_snapshot_hashes: &[PeerSnapshotHash]) -> PeerSnapshotHash {
-    assert!(!peer_snapshot_hashes.is_empty());
-
-    // pick a final rpc peer at random
-    let final_peer_snapshot_hash =
-        &peer_snapshot_hashes[thread_rng().gen_range(0, peer_snapshot_hashes.len())];
-
-    // It is a programmer bug if the assert fires! By the time this function is called, the
-    // only remaining `incremental_snapshot_hashes` should all be the same.
-    assert!(
-        peer_snapshot_hashes.iter().all(|peer_snapshot_hash| {
-            peer_snapshot_hash.snapshot_hash == final_peer_snapshot_hash.snapshot_hash
-        }),
-        "To safely pick a peer at random, all the snapshot hashes must be the same"
-    );
-
-    trace!("final peer snapshot hash: {:?}", final_peer_snapshot_hash);
-    final_peer_snapshot_hash.clone()
-}
-
 /// Check to see if we can use our local snapshots, otherwise download newer ones.
 #[allow(clippy::too_many_arguments)]
 fn download_snapshots(