diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 8b9a4667f3..4f06863da8 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -11,7 +11,6 @@ use { gossip_service::GossipService, }, solana_runtime::{ - hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotType, snapshot_utils::{ @@ -41,6 +40,118 @@ use { }, }; +#[derive(Debug)] +pub struct RpcBootstrapConfig { + pub no_genesis_fetch: bool, + pub no_snapshot_fetch: bool, + pub no_untrusted_rpc: bool, + pub max_genesis_archive_unpacked_size: u64, + pub no_check_vote_account: bool, +} + +#[allow(clippy::too_many_arguments)] +pub fn rpc_bootstrap( + node: &Node, + identity_keypair: &Arc, + ledger_path: &Path, + snapshot_archives_dir: &Path, + vote_account: &Pubkey, + authorized_voter_keypairs: Arc>>>, + cluster_entrypoints: &[ContactInfo], + validator_config: &mut ValidatorConfig, + bootstrap_config: RpcBootstrapConfig, + no_port_check: bool, + use_progress_bar: bool, + maximum_local_snapshot_age: Slot, + should_check_duplicate_instance: bool, + start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, + socket_addr_space: SocketAddrSpace, +) { + without_incremental_snapshots::rpc_bootstrap( + node, + identity_keypair, + ledger_path, + snapshot_archives_dir, + vote_account, + authorized_voter_keypairs, + cluster_entrypoints, + validator_config, + bootstrap_config, + no_port_check, + use_progress_bar, + maximum_local_snapshot_age, + should_check_duplicate_instance, + start_progress, + minimal_snapshot_download_speed, + maximum_snapshot_download_abort, + socket_addr_space, + ) +} + +fn verify_reachable_ports( + node: &Node, + cluster_entrypoint: &ContactInfo, + validator_config: &ValidatorConfig, + socket_addr_space: &SocketAddrSpace, +) -> bool { + let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; + + if ContactInfo::is_valid_address(&node.info.serve_repair, socket_addr_space) { + udp_sockets.push(&node.sockets.serve_repair); + } + if ContactInfo::is_valid_address(&node.info.tpu, socket_addr_space) { + udp_sockets.extend(node.sockets.tpu.iter()); + } + if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) { + udp_sockets.extend(node.sockets.tpu_forwards.iter()); + } + if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) { + udp_sockets.extend(node.sockets.tpu_vote.iter()); + } + if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) { + udp_sockets.extend(node.sockets.tvu.iter()); + udp_sockets.extend(node.sockets.broadcast.iter()); + udp_sockets.extend(node.sockets.retransmit_sockets.iter()); + } + if ContactInfo::is_valid_address(&node.info.tvu_forwards, socket_addr_space) { + udp_sockets.extend(node.sockets.tvu_forwards.iter()); + } + + let mut tcp_listeners = vec![]; + if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs { + for (purpose, bind_addr, public_addr) in &[ + ("RPC", rpc_addr, &node.info.rpc), + ("RPC pubsub", rpc_pubsub_addr, &node.info.rpc_pubsub), + ] { + if ContactInfo::is_valid_address(public_addr, socket_addr_space) { + tcp_listeners.push(( + bind_addr.port(), + TcpListener::bind(bind_addr).unwrap_or_else(|err| { + error!( + "Unable to bind to tcp {:?} for {}: {}", + bind_addr, purpose, err + ); + exit(1); + }), + )); + } + } + } + + if let Some(ip_echo) = &node.sockets.ip_echo { + let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener"); + tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo)); + } + + solana_net_utils::verify_reachable_ports( + &cluster_entrypoint.gossip, + tcp_listeners, + &udp_sockets, + ) +} + fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option>) -> bool { if let Some(trusted_validators) = trusted_validators { trusted_validators.contains(id) @@ -49,25 +160,6 @@ fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option } } -fn get_trusted_snapshot_hashes( - cluster_info: &ClusterInfo, - trusted_validators: &Option>, -) -> Option> { - if let Some(trusted_validators) = trusted_validators { - let mut trusted_snapshot_hashes = HashSet::new(); - for trusted_validator in trusted_validators { - cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| { - for snapshot_hash in snapshot_hashes { - trusted_snapshot_hashes.insert(*snapshot_hash); - } - }); - } - Some(trusted_snapshot_hashes) - } else { - None - } -} - fn start_gossip_node( identity_keypair: Arc, cluster_entrypoints: &[ContactInfo], @@ -175,126 +267,6 @@ fn get_rpc_peers( Some(rpc_peers) } -fn get_rpc_node( - cluster_info: &ClusterInfo, - cluster_entrypoints: &[ContactInfo], - validator_config: &ValidatorConfig, - blacklisted_rpc_nodes: &mut HashSet, - snapshot_not_required: bool, - no_untrusted_rpc: bool, - snapshot_archives_dir: &Path, -) -> Option<(ContactInfo, Option<(Slot, Hash)>)> { - let mut blacklist_timeout = Instant::now(); - let mut newer_cluster_snapshot_timeout = None; - let mut retry_reason = None; - loop { - sleep(Duration::from_secs(1)); - info!("\n{}", cluster_info.rpc_info_trace()); - - let rpc_peers = get_rpc_peers( - cluster_info, - cluster_entrypoints, - validator_config, - blacklisted_rpc_nodes, - &blacklist_timeout, - &mut retry_reason, - ); - if rpc_peers.is_none() { - continue; - } - let rpc_peers = rpc_peers.unwrap(); - blacklist_timeout = Instant::now(); - - let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir); - let eligible_rpc_peers = if snapshot_not_required { - rpc_peers - } else { - let trusted_snapshot_hashes = - get_trusted_snapshot_hashes(cluster_info, &validator_config.trusted_validators); - - let mut eligible_rpc_peers = vec![]; - - for rpc_peer in rpc_peers.iter() { - if no_untrusted_rpc - && !is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators) - { - continue; - } - cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { - for snapshot_hash in snapshot_hashes { - if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes { - if !trusted_snapshot_hashes.contains(snapshot_hash) { - // Ignore all untrusted snapshot hashes - continue; - } - } - - if highest_snapshot_hash.is_none() - || snapshot_hash.0 > highest_snapshot_hash.unwrap().0 - { - // Found a higher snapshot, remove all nodes with a lower snapshot - eligible_rpc_peers.clear(); - highest_snapshot_hash = Some(*snapshot_hash) - } - - if Some(*snapshot_hash) == highest_snapshot_hash { - eligible_rpc_peers.push(rpc_peer.clone()); - } - } - }); - } - - match highest_snapshot_hash { - None => { - assert!(eligible_rpc_peers.is_empty()); - } - Some(highest_snapshot_hash) => { - if eligible_rpc_peers.is_empty() { - match newer_cluster_snapshot_timeout { - None => newer_cluster_snapshot_timeout = Some(Instant::now()), - Some(newer_cluster_snapshot_timeout) => { - if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { - warn!("giving up newer snapshot from the cluster"); - return None; - } - } - } - retry_reason = Some(format!( - "Wait for newer snapshot than local: {:?}", - highest_snapshot_hash - )); - continue; - } - - info!( - "Highest available snapshot slot is {}, available from {} node{}: {:?}", - highest_snapshot_hash.0, - eligible_rpc_peers.len(), - if eligible_rpc_peers.len() > 1 { - "s" - } else { - "" - }, - eligible_rpc_peers - .iter() - .map(|contact_info| contact_info.id) - .collect::>() - ); - } - } - eligible_rpc_peers - }; - - if !eligible_rpc_peers.is_empty() { - let contact_info = - &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; - return Some((contact_info.clone(), highest_snapshot_hash)); - } else { - retry_reason = Some("No snapshots available".to_owned()); - } - } -} - fn check_vote_account( rpc_client: &RpcClient, identity_pubkey: &Pubkey, @@ -359,168 +331,115 @@ fn check_vote_account( Ok(()) } -fn verify_reachable_ports( - node: &Node, - cluster_entrypoint: &ContactInfo, - validator_config: &ValidatorConfig, - socket_addr_space: &SocketAddrSpace, -) -> bool { - let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; +/// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full +/// snapshot or an incremental snapshot. +fn get_highest_local_snapshot_hash( + snapshot_archives_dir: impl AsRef, +) -> Option<(Slot, Hash)> { + if let Some(full_snapshot_info) = + snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_archives_dir) + { + if let Some(incremental_snapshot_info) = + snapshot_utils::get_highest_incremental_snapshot_archive_info( + &snapshot_archives_dir, + full_snapshot_info.slot(), + ) + { + Some(( + incremental_snapshot_info.slot(), + *incremental_snapshot_info.hash(), + )) + } else { + Some((full_snapshot_info.slot(), *full_snapshot_info.hash())) + } + } else { + None + } +} - if ContactInfo::is_valid_address(&node.info.serve_repair, socket_addr_space) { - udp_sockets.push(&node.sockets.serve_repair); - } - if ContactInfo::is_valid_address(&node.info.tpu, socket_addr_space) { - udp_sockets.extend(node.sockets.tpu.iter()); - } - if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) { - udp_sockets.extend(node.sockets.tpu_forwards.iter()); - } - if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) { - udp_sockets.extend(node.sockets.tpu_vote.iter()); - } - if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) { - udp_sockets.extend(node.sockets.tvu.iter()); - udp_sockets.extend(node.sockets.broadcast.iter()); - udp_sockets.extend(node.sockets.retransmit_sockets.iter()); - } - if ContactInfo::is_valid_address(&node.info.tvu_forwards, socket_addr_space) { - udp_sockets.extend(node.sockets.tvu_forwards.iter()); - } +mod without_incremental_snapshots { + use super::*; - let mut tcp_listeners = vec![]; - if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs { - for (purpose, bind_addr, public_addr) in &[ - ("RPC", rpc_addr, &node.info.rpc), - ("RPC pubsub", rpc_pubsub_addr, &node.info.rpc_pubsub), - ] { - if ContactInfo::is_valid_address(public_addr, socket_addr_space) { - tcp_listeners.push(( - bind_addr.port(), - TcpListener::bind(bind_addr).unwrap_or_else(|err| { - error!( - "Unable to bind to tcp {:?} for {}: {}", - bind_addr, purpose, err - ); - exit(1); - }), - )); + #[allow(clippy::too_many_arguments)] + pub fn rpc_bootstrap( + node: &Node, + identity_keypair: &Arc, + ledger_path: &Path, + snapshot_archives_dir: &Path, + vote_account: &Pubkey, + authorized_voter_keypairs: Arc>>>, + cluster_entrypoints: &[ContactInfo], + validator_config: &mut ValidatorConfig, + bootstrap_config: RpcBootstrapConfig, + no_port_check: bool, + use_progress_bar: bool, + maximum_local_snapshot_age: Slot, + should_check_duplicate_instance: bool, + start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, + socket_addr_space: SocketAddrSpace, + ) { + if !no_port_check { + let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect(); + order.shuffle(&mut thread_rng()); + if order.into_iter().all(|i| { + !verify_reachable_ports( + node, + &cluster_entrypoints[i], + validator_config, + &socket_addr_space, + ) + }) { + exit(1); } } - } - if let Some(ip_echo) = &node.sockets.ip_echo { - let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener"); - tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo)); - } - - solana_net_utils::verify_reachable_ports( - &cluster_entrypoint.gossip, - tcp_listeners, - &udp_sockets, - ) -} - -pub struct RpcBootstrapConfig { - pub no_genesis_fetch: bool, - pub no_snapshot_fetch: bool, - pub no_untrusted_rpc: bool, - pub max_genesis_archive_unpacked_size: u64, - pub no_check_vote_account: bool, -} - -impl Default for RpcBootstrapConfig { - fn default() -> Self { - Self { - no_genesis_fetch: true, - no_snapshot_fetch: true, - no_untrusted_rpc: true, - max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, - no_check_vote_account: true, - } - } -} - -#[allow(clippy::too_many_arguments)] -pub fn rpc_bootstrap( - node: &Node, - identity_keypair: &Arc, - ledger_path: &Path, - snapshot_archives_dir: &Path, - vote_account: &Pubkey, - authorized_voter_keypairs: Arc>>>, - cluster_entrypoints: &[ContactInfo], - validator_config: &mut ValidatorConfig, - bootstrap_config: RpcBootstrapConfig, - no_port_check: bool, - use_progress_bar: bool, - maximum_local_snapshot_age: Slot, - should_check_duplicate_instance: bool, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - socket_addr_space: SocketAddrSpace, -) { - if !no_port_check { - let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect(); - order.shuffle(&mut thread_rng()); - if order.into_iter().all(|i| { - !verify_reachable_ports( - node, - &cluster_entrypoints[i], - validator_config, - &socket_addr_space, - ) - }) { - exit(1); - } - } - - if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch { - return; - } - - let mut blacklisted_rpc_nodes = HashSet::new(); - let mut gossip = None; - let mut download_abort_count = 0; - loop { - if gossip.is_none() { - *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; - - gossip = Some(start_gossip_node( - identity_keypair.clone(), - cluster_entrypoints, - ledger_path, - &node.info.gossip, - node.sockets.gossip.try_clone().unwrap(), - validator_config.expected_shred_version, - validator_config.gossip_validators.clone(), - should_check_duplicate_instance, - socket_addr_space, - )); - } - - let rpc_node_details = get_rpc_node( - &gossip.as_ref().unwrap().0, - cluster_entrypoints, - validator_config, - &mut blacklisted_rpc_nodes, - bootstrap_config.no_snapshot_fetch, - bootstrap_config.no_untrusted_rpc, - snapshot_archives_dir, - ); - if rpc_node_details.is_none() { + if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch { return; } - let (rpc_contact_info, snapshot_hash) = rpc_node_details.unwrap(); - info!( - "Using RPC service from node {}: {:?}", - rpc_contact_info.id, rpc_contact_info.rpc - ); - let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); + let mut blacklisted_rpc_nodes = HashSet::new(); + let mut gossip = None; + let mut download_abort_count = 0; + loop { + if gossip.is_none() { + *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; - let result = match rpc_client.get_version() { + gossip = Some(start_gossip_node( + identity_keypair.clone(), + cluster_entrypoints, + ledger_path, + &node.info.gossip, + node.sockets.gossip.try_clone().unwrap(), + validator_config.expected_shred_version, + validator_config.gossip_validators.clone(), + should_check_duplicate_instance, + socket_addr_space, + )); + } + + let rpc_node_details = get_rpc_node( + &gossip.as_ref().unwrap().0, + cluster_entrypoints, + validator_config, + &mut blacklisted_rpc_nodes, + bootstrap_config.no_snapshot_fetch, + bootstrap_config.no_untrusted_rpc, + snapshot_archives_dir, + ); + if rpc_node_details.is_none() { + return; + } + let (rpc_contact_info, snapshot_hash) = rpc_node_details.unwrap(); + + info!( + "Using RPC service from node {}: {:?}", + rpc_contact_info.id, rpc_contact_info.rpc + ); + let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); + + let result = match rpc_client.get_version() { Ok(rpc_version) => { info!("RPC node version: {}", rpc_version.solana_core); Ok(()) @@ -686,52 +605,166 @@ pub fn rpc_bootstrap( } }); - if result.is_ok() { - break; - } - warn!("{}", result.unwrap_err()); + if result.is_ok() { + break; + } + warn!("{}", result.unwrap_err()); - if let Some(ref trusted_validators) = validator_config.trusted_validators { - if trusted_validators.contains(&rpc_contact_info.id) { - continue; // Never blacklist a trusted node + if let Some(ref trusted_validators) = validator_config.trusted_validators { + if trusted_validators.contains(&rpc_contact_info.id) { + continue; // Never blacklist a trusted node + } + } + + info!( + "Excluding {} as a future RPC candidate", + rpc_contact_info.id + ); + blacklisted_rpc_nodes.insert(rpc_contact_info.id); + } + if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { + cluster_info.save_contact_info(); + gossip_exit_flag.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); + } + } + + fn get_rpc_node( + cluster_info: &ClusterInfo, + cluster_entrypoints: &[ContactInfo], + validator_config: &ValidatorConfig, + blacklisted_rpc_nodes: &mut HashSet, + snapshot_not_required: bool, + no_untrusted_rpc: bool, + snapshot_archives_dir: &Path, + ) -> Option<(ContactInfo, Option<(Slot, Hash)>)> { + let mut blacklist_timeout = Instant::now(); + let mut newer_cluster_snapshot_timeout = None; + let mut retry_reason = None; + loop { + sleep(Duration::from_secs(1)); + info!("\n{}", cluster_info.rpc_info_trace()); + + let rpc_peers = get_rpc_peers( + cluster_info, + cluster_entrypoints, + validator_config, + blacklisted_rpc_nodes, + &blacklist_timeout, + &mut retry_reason, + ); + if rpc_peers.is_none() { + continue; + } + let rpc_peers = rpc_peers.unwrap(); + blacklist_timeout = Instant::now(); + + let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir); + let eligible_rpc_peers = if snapshot_not_required { + rpc_peers + } else { + let trusted_snapshot_hashes = + get_trusted_snapshot_hashes(cluster_info, &validator_config.trusted_validators); + + let mut eligible_rpc_peers = vec![]; + + for rpc_peer in rpc_peers.iter() { + if no_untrusted_rpc + && !is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators) + { + continue; + } + cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { + for snapshot_hash in snapshot_hashes { + if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes { + if !trusted_snapshot_hashes.contains(snapshot_hash) { + // Ignore all untrusted snapshot hashes + continue; + } + } + + if highest_snapshot_hash.is_none() + || snapshot_hash.0 > highest_snapshot_hash.unwrap().0 + { + // Found a higher snapshot, remove all nodes with a lower snapshot + eligible_rpc_peers.clear(); + highest_snapshot_hash = Some(*snapshot_hash) + } + + if Some(*snapshot_hash) == highest_snapshot_hash { + eligible_rpc_peers.push(rpc_peer.clone()); + } + } + }); + } + + match highest_snapshot_hash { + None => { + assert!(eligible_rpc_peers.is_empty()); + } + Some(highest_snapshot_hash) => { + if eligible_rpc_peers.is_empty() { + match newer_cluster_snapshot_timeout { + None => newer_cluster_snapshot_timeout = Some(Instant::now()), + Some(newer_cluster_snapshot_timeout) => { + if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { + warn!("giving up newer snapshot from the cluster"); + return None; + } + } + } + retry_reason = Some(format!( + "Wait for newer snapshot than local: {:?}", + highest_snapshot_hash + )); + continue; + } + + info!( + "Highest available snapshot slot is {}, available from {} node{}: {:?}", + highest_snapshot_hash.0, + eligible_rpc_peers.len(), + if eligible_rpc_peers.len() > 1 { + "s" + } else { + "" + }, + eligible_rpc_peers + .iter() + .map(|contact_info| contact_info.id) + .collect::>() + ); + } + } + eligible_rpc_peers + }; + + if !eligible_rpc_peers.is_empty() { + let contact_info = + &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; + return Some((contact_info.clone(), highest_snapshot_hash)); + } else { + retry_reason = Some("No snapshots available".to_owned()); } } - - info!( - "Excluding {} as a future RPC candidate", - rpc_contact_info.id - ); - blacklisted_rpc_nodes.insert(rpc_contact_info.id); } - if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { - cluster_info.save_contact_info(); - gossip_exit_flag.store(true, Ordering::Relaxed); - gossip_service.join().unwrap(); - } -} -/// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full -/// snapshot or an incremental snapshot. -fn get_highest_local_snapshot_hash( - snapshot_archives_dir: impl AsRef, -) -> Option<(Slot, Hash)> { - if let Some(full_snapshot_info) = - snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_archives_dir) - { - if let Some(incremental_snapshot_info) = - snapshot_utils::get_highest_incremental_snapshot_archive_info( - &snapshot_archives_dir, - full_snapshot_info.slot(), - ) - { - Some(( - incremental_snapshot_info.slot(), - *incremental_snapshot_info.hash(), - )) + fn get_trusted_snapshot_hashes( + cluster_info: &ClusterInfo, + trusted_validators: &Option>, + ) -> Option> { + if let Some(trusted_validators) = trusted_validators { + let mut trusted_snapshot_hashes = HashSet::new(); + for trusted_validator in trusted_validators { + cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| { + for snapshot_hash in snapshot_hashes { + trusted_snapshot_hashes.insert(*snapshot_hash); + } + }); + } + Some(trusted_snapshot_hashes) } else { - Some((full_snapshot_info.slot(), *full_snapshot_info.hash())) + None } - } else { - None } }