diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 3a91a80e5..d047cc2c5 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -51,90 +51,6 @@ pub struct RpcBootstrapConfig { pub incremental_snapshot_fetch: bool, } -#[allow(clippy::too_many_arguments)] -pub fn rpc_bootstrap( - node: &Node, - identity_keypair: &Arc, - ledger_path: &Path, - full_snapshot_archives_dir: &Path, - incremental_snapshot_archives_dir: &Path, - vote_account: &Pubkey, - authorized_voter_keypairs: Arc>>>, - cluster_entrypoints: &[ContactInfo], - validator_config: &mut ValidatorConfig, - bootstrap_config: RpcBootstrapConfig, - do_port_check: bool, - use_progress_bar: bool, - maximum_local_snapshot_age: Slot, - should_check_duplicate_instance: bool, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - socket_addr_space: SocketAddrSpace, -) { - if do_port_check { - let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect(); - order.shuffle(&mut thread_rng()); - if order.into_iter().all(|i| { - !verify_reachable_ports( - node, - &cluster_entrypoints[i], - validator_config, - &socket_addr_space, - ) - }) { - exit(1); - } - } - - if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch { - return; - } - - if bootstrap_config.incremental_snapshot_fetch { - info!("rpc_bootstrap with incremental snapshot fetch"); - with_incremental_snapshots::rpc_bootstrap( - node, - identity_keypair, - ledger_path, - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - vote_account, - authorized_voter_keypairs, - cluster_entrypoints, - validator_config, - bootstrap_config, - use_progress_bar, - maximum_local_snapshot_age, - should_check_duplicate_instance, - start_progress, - minimal_snapshot_download_speed, - maximum_snapshot_download_abort, - socket_addr_space, - ) - } else { - info!("rpc_bootstrap without incremental snapshot fetch"); - without_incremental_snapshots::rpc_bootstrap( - node, - identity_keypair, - ledger_path, - full_snapshot_archives_dir, - vote_account, - authorized_voter_keypairs, - cluster_entrypoints, - validator_config, - bootstrap_config, - use_progress_bar, - maximum_local_snapshot_age, - should_check_duplicate_instance, - start_progress, - minimal_snapshot_download_speed, - maximum_snapshot_download_abort, - socket_addr_space, - ) - } -} - fn verify_reachable_ports( node: &Node, cluster_entrypoint: &ContactInfo, @@ -387,68 +303,112 @@ fn check_vote_account( Ok(()) } -mod without_incremental_snapshots { - use super::*; +/// Struct to wrap the return value from get_rpc_node(). The `rpc_contact_info` is the peer to +/// download from, and `snapshot_hash` is the (optional) full and (optional) incremental +/// snapshots to download. 
+#[derive(Debug)] +struct GetRpcNodeResult { + rpc_contact_info: ContactInfo, + snapshot_hash: Option, +} - #[allow(clippy::too_many_arguments)] - pub fn rpc_bootstrap( - node: &Node, - identity_keypair: &Arc, - ledger_path: &Path, - full_snapshot_archives_dir: &Path, - vote_account: &Pubkey, - authorized_voter_keypairs: Arc>>>, - cluster_entrypoints: &[ContactInfo], - validator_config: &mut ValidatorConfig, - bootstrap_config: RpcBootstrapConfig, - use_progress_bar: bool, - maximum_local_snapshot_age: Slot, - should_check_duplicate_instance: bool, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - socket_addr_space: SocketAddrSpace, - ) { - let mut blacklisted_rpc_nodes = HashSet::new(); - let mut gossip = None; - let mut download_abort_count = 0; - loop { - if gossip.is_none() { - *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; +/// Struct to wrap the peers & snapshot hashes together. +#[derive(Debug, PartialEq, Eq, Clone)] +struct PeerSnapshotHash { + rpc_contact_info: ContactInfo, + snapshot_hash: SnapshotHash, +} - gossip = Some(start_gossip_node( - identity_keypair.clone(), - cluster_entrypoints, - ledger_path, - &node.info.gossip, - node.sockets.gossip.try_clone().unwrap(), - validator_config.expected_shred_version, - validator_config.gossip_validators.clone(), - should_check_duplicate_instance, - socket_addr_space, - )); - } +/// A snapshot hash. In this context (bootstrap *with* incremental snapshots), a snapshot hash +/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +struct SnapshotHash { + full: (Slot, Hash), + incr: Option<(Slot, Hash)>, +} - let rpc_node_details = get_rpc_node( - &gossip.as_ref().unwrap().0, - cluster_entrypoints, +#[allow(clippy::too_many_arguments)] +pub fn rpc_bootstrap( + node: &Node, + identity_keypair: &Arc, + ledger_path: &Path, + full_snapshot_archives_dir: &Path, + incremental_snapshot_archives_dir: &Path, + vote_account: &Pubkey, + authorized_voter_keypairs: Arc>>>, + cluster_entrypoints: &[ContactInfo], + validator_config: &mut ValidatorConfig, + bootstrap_config: RpcBootstrapConfig, + do_port_check: bool, + use_progress_bar: bool, + maximum_local_snapshot_age: Slot, + should_check_duplicate_instance: bool, + start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, + socket_addr_space: SocketAddrSpace, +) { + if do_port_check { + let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect(); + order.shuffle(&mut thread_rng()); + if order.into_iter().all(|i| { + !verify_reachable_ports( + node, + &cluster_entrypoints[i], validator_config, - &mut blacklisted_rpc_nodes, - &bootstrap_config, - full_snapshot_archives_dir, - ); - if rpc_node_details.is_none() { - return; - } - let (rpc_contact_info, snapshot_hash) = rpc_node_details.unwrap(); + &socket_addr_space, + ) + }) { + exit(1); + } + } - info!( - "Using RPC service from node {}: {:?}", - rpc_contact_info.id, rpc_contact_info.rpc - ); - let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); + if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch { + return; + } - let result = match rpc_client.get_version() { + let mut blacklisted_rpc_nodes = HashSet::new(); + let mut gossip = None; + let mut download_abort_count = 0; + loop { + if gossip.is_none() { + *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; + + 
gossip = Some(start_gossip_node( + identity_keypair.clone(), + cluster_entrypoints, + ledger_path, + &node.info.gossip, + node.sockets.gossip.try_clone().unwrap(), + validator_config.expected_shred_version, + validator_config.gossip_validators.clone(), + should_check_duplicate_instance, + socket_addr_space, + )); + } + + let rpc_node_details = get_rpc_node( + &gossip.as_ref().unwrap().0, + cluster_entrypoints, + validator_config, + &mut blacklisted_rpc_nodes, + &bootstrap_config, + ); + if rpc_node_details.is_none() { + return; + } + let GetRpcNodeResult { + rpc_contact_info, + snapshot_hash, + } = rpc_node_details.unwrap(); + + info!( + "Using RPC service from node {}: {:?}", + rpc_contact_info.id, rpc_contact_info.rpc + ); + let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); + + let result = match rpc_client.get_version() { Ok(rpc_version) => { info!("RPC node version: {}", rpc_version.solana_core); Ok(()) @@ -488,106 +448,30 @@ mod without_incremental_snapshots { } } - if let Some(snapshot_hash) = snapshot_hash { - let use_local_snapshot = match get_highest_local_snapshot_hash(full_snapshot_archives_dir) { - None => { - info!("Downloading snapshot for slot {} since there is not a local snapshot", snapshot_hash.0); - false - } - Some((highest_local_snapshot_slot, _hash)) => { - if highest_local_snapshot_slot - > snapshot_hash.0.saturating_sub(maximum_local_snapshot_age) - { - info!( - "Reusing local snapshot at slot {} instead \ - of downloading a snapshot for slot {}", - highest_local_snapshot_slot, snapshot_hash.0 - ); - true - } else { - info!( - "Local snapshot from slot {} is too old. \ - Downloading a newer snapshot for slot {}", - highest_local_snapshot_slot, snapshot_hash.0 - ); - false - } - } - }; + let (cluster_info, gossip_exit_flag, gossip_service) = gossip.take().unwrap(); + cluster_info.save_contact_info(); + gossip_exit_flag.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); - if use_local_snapshot { - Ok(()) - } else { - rpc_client - .get_slot_with_commitment(CommitmentConfig::finalized()) - .map_err(|err| format!("Failed to get RPC node slot: {}", err)) - .and_then(|slot| { - *start_progress.write().unwrap() = - ValidatorStartProgress::DownloadingSnapshot { - slot: snapshot_hash.0, - rpc_addr: rpc_contact_info.rpc, - }; - info!("RPC node root slot: {}", slot); - let (cluster_info, gossip_exit_flag, gossip_service) = - gossip.take().unwrap(); - cluster_info.save_contact_info(); - gossip_exit_flag.store(true, Ordering::Relaxed); - let (maximum_full_snapshot_archives_to_retain, maximum_incremental_snapshot_archives_to_retain, incremental_snapshot_archives_dir) = if let Some(snapshot_config) = - validator_config.snapshot_config.as_ref() - { - (snapshot_config.maximum_full_snapshot_archives_to_retain, snapshot_config.maximum_incremental_snapshot_archives_to_retain, snapshot_config.incremental_snapshot_archives_dir.as_path()) - } else { - (DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN, full_snapshot_archives_dir) - }; - let ret = download_snapshot_archive( - &rpc_contact_info.rpc, - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - snapshot_hash, - SnapshotType::FullSnapshot, - maximum_full_snapshot_archives_to_retain, - maximum_incremental_snapshot_archives_to_retain, - use_progress_bar, - &mut Some(Box::new(|download_progress: &DownloadProgressRecord| { - debug!("Download progress: {:?}", download_progress); + let rpc_client_slot = rpc_client + 
.get_slot_with_commitment(CommitmentConfig::finalized()) + .map_err(|err| format!("Failed to get RPC node slot: {}", err))?; + info!("RPC node root slot: {}", rpc_client_slot); - if download_progress.last_throughput < minimal_snapshot_download_speed - && download_progress.notification_count <= 1 - && download_progress.percentage_done <= 2_f32 - && download_progress.estimated_remaining_time > 60_f32 - && download_abort_count < maximum_snapshot_download_abort { - if let Some(ref known_validators) = validator_config.known_validators { - if known_validators.contains(&rpc_contact_info.id) - && known_validators.len() == 1 - && bootstrap_config.only_known_rpc { - warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, but will NOT abort \ - and try a different node as it is the only known validator and the --only-known-rpc flag \ - is set. \ - Abort count: {}, Progress detail: {:?}", - download_progress.last_throughput, minimal_snapshot_download_speed, - download_abort_count, download_progress); - return true; // Do not abort download from the one-and-only known validator - } - } - warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, will abort \ - and try a different node. Abort count: {}, Progress detail: {:?}", - download_progress.last_throughput, minimal_snapshot_download_speed, - download_abort_count, download_progress); - download_abort_count += 1; - false - } else { - true - } - })), - ); - - gossip_service.join().unwrap(); - ret - }) - } - } else { - Ok(()) - } + download_snapshots( + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + validator_config, + &bootstrap_config, + use_progress_bar, + maximum_local_snapshot_age, + start_progress, + minimal_snapshot_download_speed, + maximum_snapshot_download_abort, + &mut download_abort_count, + snapshot_hash, + &rpc_contact_info, + ) }) .map(|_| { if let Some(url) = bootstrap_config.check_vote_account.as_ref() { @@ -616,629 +500,291 @@ mod without_incremental_snapshots { } }); - if result.is_ok() { - break; - } - warn!("{}", result.unwrap_err()); - - if let Some(ref known_validators) = validator_config.known_validators { - if known_validators.contains(&rpc_contact_info.id) { - continue; // Never blacklist a known node - } - } - - info!( - "Excluding {} as a future RPC candidate", - rpc_contact_info.id - ); - blacklisted_rpc_nodes.insert(rpc_contact_info.id); + if result.is_ok() { + break; } - if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { - cluster_info.save_contact_info(); - gossip_exit_flag.store(true, Ordering::Relaxed); - gossip_service.join().unwrap(); + warn!("{}", result.unwrap_err()); + + if let Some(ref known_validators) = validator_config.known_validators { + if known_validators.contains(&rpc_contact_info.id) { + continue; // Never blacklist a known node + } } + + info!( + "Excluding {} as a future RPC candidate", + rpc_contact_info.id + ); + blacklisted_rpc_nodes.insert(rpc_contact_info.id); } - - fn get_rpc_node( - cluster_info: &ClusterInfo, - cluster_entrypoints: &[ContactInfo], - validator_config: &ValidatorConfig, - blacklisted_rpc_nodes: &mut HashSet, - bootstrap_config: &RpcBootstrapConfig, - full_snapshot_archives_dir: &Path, - ) -> Option<(ContactInfo, Option<(Slot, Hash)>)> { - let mut blacklist_timeout = Instant::now(); - let mut newer_cluster_snapshot_timeout = None; - let mut retry_reason = None; - loop { - sleep(Duration::from_secs(1)); - info!("\n{}", cluster_info.rpc_info_trace()); - - let rpc_peers = 
get_rpc_peers( - cluster_info, - cluster_entrypoints, - validator_config, - blacklisted_rpc_nodes, - &blacklist_timeout, - &mut retry_reason, - bootstrap_config, - ); - if rpc_peers.is_none() { - continue; - } - let rpc_peers = rpc_peers.unwrap(); - blacklist_timeout = Instant::now(); - - let mut highest_snapshot_hash = - get_highest_local_snapshot_hash(full_snapshot_archives_dir); - let eligible_rpc_peers = if bootstrap_config.no_snapshot_fetch { - rpc_peers - } else { - let known_snapshot_hashes = - get_known_snapshot_hashes(cluster_info, &validator_config.known_validators); - - let mut eligible_rpc_peers = vec![]; - - for rpc_peer in rpc_peers.iter() { - cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { - for snapshot_hash in snapshot_hashes { - if let Some(ref known_snapshot_hashes) = known_snapshot_hashes { - if !known_snapshot_hashes.contains(snapshot_hash) { - // Ignore all unknown snapshot hashes - continue; - } - } - - if highest_snapshot_hash.is_none() - || snapshot_hash.0 > highest_snapshot_hash.unwrap().0 - { - // Found a higher snapshot, remove all nodes with a lower snapshot - eligible_rpc_peers.clear(); - highest_snapshot_hash = Some(*snapshot_hash) - } - - if Some(*snapshot_hash) == highest_snapshot_hash { - eligible_rpc_peers.push(rpc_peer.clone()); - } - } - }); - } - - match highest_snapshot_hash { - None => { - assert!(eligible_rpc_peers.is_empty()); - } - Some(highest_snapshot_hash) => { - if eligible_rpc_peers.is_empty() { - match newer_cluster_snapshot_timeout { - None => newer_cluster_snapshot_timeout = Some(Instant::now()), - Some(newer_cluster_snapshot_timeout) => { - if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { - warn!("giving up newer snapshot from the cluster"); - return None; - } - } - } - retry_reason = Some(format!( - "Wait for newer snapshot than local: {:?}", - highest_snapshot_hash - )); - continue; - } - - info!( - "Highest available snapshot slot is {}, available from {} node{}: {:?}", - highest_snapshot_hash.0, - eligible_rpc_peers.len(), - if eligible_rpc_peers.len() > 1 { - "s" - } else { - "" - }, - eligible_rpc_peers - .iter() - .map(|contact_info| contact_info.id) - .collect::>() - ); - } - } - eligible_rpc_peers - }; - - if !eligible_rpc_peers.is_empty() { - let contact_info = - &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; - return Some((contact_info.clone(), highest_snapshot_hash)); - } else { - retry_reason = Some("No snapshots available".to_owned()); - } - } - } - - fn get_known_snapshot_hashes( - cluster_info: &ClusterInfo, - known_validators: &Option>, - ) -> Option> { - if let Some(known_validators) = known_validators { - let mut known_snapshot_hashes = HashSet::new(); - for known_validator in known_validators { - cluster_info.get_snapshot_hash_for_node(known_validator, |snapshot_hashes| { - for snapshot_hash in snapshot_hashes { - known_snapshot_hashes.insert(*snapshot_hash); - } - }); - } - Some(known_snapshot_hashes) - } else { - None - } - } - - /// Get the Slot and Hash of the local snapshot with the highest slot. 
- fn get_highest_local_snapshot_hash( - full_snapshot_archives_dir: impl AsRef, - ) -> Option<(Slot, Hash)> { - snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir) - .map(|full_snapshot_info| (full_snapshot_info.slot(), *full_snapshot_info.hash())) + if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { + cluster_info.save_contact_info(); + gossip_exit_flag.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); } } -mod with_incremental_snapshots { - use super::*; +/// Get an RPC peer node to download from. +/// +/// This function finds the highest compatible snapshots from the cluster, then picks one peer +/// at random to use (return). +fn get_rpc_node( + cluster_info: &ClusterInfo, + cluster_entrypoints: &[ContactInfo], + validator_config: &ValidatorConfig, + blacklisted_rpc_nodes: &mut HashSet, + bootstrap_config: &RpcBootstrapConfig, +) -> Option { + let mut blacklist_timeout = Instant::now(); + let mut newer_cluster_snapshot_timeout = None; + let mut retry_reason = None; + loop { + sleep(Duration::from_secs(1)); + info!("\n{}", cluster_info.rpc_info_trace()); - /// Struct to wrap the return value from get_rpc_node(). The `rpc_contact_info` is the peer to - /// download from, and `snapshot_hash` is the (optional) full and (optional) incremental - /// snapshots to download. - #[derive(Debug)] - struct GetRpcNodeResult { - rpc_contact_info: ContactInfo, - snapshot_hash: Option, - } - - /// Struct to wrap the peers & snapshot hashes together. - #[derive(Debug, PartialEq, Eq, Clone)] - struct PeerSnapshotHash { - rpc_contact_info: ContactInfo, - snapshot_hash: SnapshotHash, - } - - /// A snapshot hash. In this context (bootstrap *with* incremental snapshots), a snapshot hash - /// is _both_ a full snapshot hash and an (optional) incremental snapshot hash. 
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] - struct SnapshotHash { - full: (Slot, Hash), - incr: Option<(Slot, Hash)>, - } - - #[allow(clippy::too_many_arguments)] - pub fn rpc_bootstrap( - node: &Node, - identity_keypair: &Arc, - ledger_path: &Path, - full_snapshot_archives_dir: &Path, - incremental_snapshot_archives_dir: &Path, - vote_account: &Pubkey, - authorized_voter_keypairs: Arc>>>, - cluster_entrypoints: &[ContactInfo], - validator_config: &mut ValidatorConfig, - bootstrap_config: RpcBootstrapConfig, - use_progress_bar: bool, - maximum_local_snapshot_age: Slot, - should_check_duplicate_instance: bool, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - socket_addr_space: SocketAddrSpace, - ) { - let mut blacklisted_rpc_nodes = HashSet::new(); - let mut gossip = None; - let mut download_abort_count = 0; - loop { - if gossip.is_none() { - *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; - - gossip = Some(start_gossip_node( - identity_keypair.clone(), - cluster_entrypoints, - ledger_path, - &node.info.gossip, - node.sockets.gossip.try_clone().unwrap(), - validator_config.expected_shred_version, - validator_config.gossip_validators.clone(), - should_check_duplicate_instance, - socket_addr_space, - )); - } - - let rpc_node_details = get_rpc_node( - &gossip.as_ref().unwrap().0, - cluster_entrypoints, - validator_config, - &mut blacklisted_rpc_nodes, - &bootstrap_config, - ); - if rpc_node_details.is_none() { - return; - } - let GetRpcNodeResult { - rpc_contact_info, - snapshot_hash, - } = rpc_node_details.unwrap(); - - info!( - "Using RPC service from node {}: {:?}", - rpc_contact_info.id, rpc_contact_info.rpc - ); - let rpc_client = RpcClient::new_socket(rpc_contact_info.rpc); - - let result = match rpc_client.get_version() { - Ok(rpc_version) => { - info!("RPC node version: {}", rpc_version.solana_core); - Ok(()) - } - Err(err) => Err(format!("Failed to get RPC node version: {}", err)), - } - .and_then(|_| { - let genesis_config = download_then_check_genesis_hash( - &rpc_contact_info.rpc, - ledger_path, - validator_config.expected_genesis_hash, - bootstrap_config.max_genesis_archive_unpacked_size, - bootstrap_config.no_genesis_fetch, - use_progress_bar, - ); - - if let Ok(genesis_config) = genesis_config { - let genesis_hash = genesis_config.hash(); - if validator_config.expected_genesis_hash.is_none() { - info!("Expected genesis hash set to {}", genesis_hash); - validator_config.expected_genesis_hash = Some(genesis_hash); - } - } - - if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { - // Sanity check that the RPC node is using the expected genesis hash before - // downloading a snapshot from it - let rpc_genesis_hash = rpc_client - .get_genesis_hash() - .map_err(|err| format!("Failed to get genesis hash: {}", err))?; - - if expected_genesis_hash != rpc_genesis_hash { - return Err(format!( - "Genesis hash mismatch: expected {} but RPC node genesis hash is {}", - expected_genesis_hash, rpc_genesis_hash - )); - } - } - - let (cluster_info, gossip_exit_flag, gossip_service) = gossip.take().unwrap(); - cluster_info.save_contact_info(); - gossip_exit_flag.store(true, Ordering::Relaxed); - gossip_service.join().unwrap(); - - let rpc_client_slot = rpc_client - .get_slot_with_commitment(CommitmentConfig::finalized()) - .map_err(|err| format!("Failed to get RPC node slot: {}", err))?; - info!("RPC node root slot: {}", rpc_client_slot); - - 
download_snapshots( - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - validator_config, - &bootstrap_config, - use_progress_bar, - maximum_local_snapshot_age, - start_progress, - minimal_snapshot_download_speed, - maximum_snapshot_download_abort, - &mut download_abort_count, - snapshot_hash, - &rpc_contact_info, - ) - }) - .map(|_| { - if let Some(url) = bootstrap_config.check_vote_account.as_ref() { - let rpc_client = RpcClient::new(url); - check_vote_account( - &rpc_client, - &identity_keypair.pubkey(), - vote_account, - &authorized_voter_keypairs - .read() - .unwrap() - .iter() - .map(|k| k.pubkey()) - .collect::>(), - ) - .unwrap_or_else(|err| { - // Consider failures here to be more likely due to user error (eg, - // incorrect `solana-validator` command-line arguments) rather than the - // RPC node failing. - // - // Power users can always use the `--no-check-vote-account` option to - // bypass this check entirely - error!("{}", err); - exit(1); - }); - } - }); - - if result.is_ok() { - break; - } - warn!("{}", result.unwrap_err()); - - if let Some(ref known_validators) = validator_config.known_validators { - if known_validators.contains(&rpc_contact_info.id) { - continue; // Never blacklist a known node - } - } - - info!( - "Excluding {} as a future RPC candidate", - rpc_contact_info.id - ); - blacklisted_rpc_nodes.insert(rpc_contact_info.id); + let rpc_peers = get_rpc_peers( + cluster_info, + cluster_entrypoints, + validator_config, + blacklisted_rpc_nodes, + &blacklist_timeout, + &mut retry_reason, + bootstrap_config, + ); + if rpc_peers.is_none() { + continue; } - if let Some((cluster_info, gossip_exit_flag, gossip_service)) = gossip.take() { - cluster_info.save_contact_info(); - gossip_exit_flag.store(true, Ordering::Relaxed); - gossip_service.join().unwrap(); - } - } + let rpc_peers = rpc_peers.unwrap(); + blacklist_timeout = Instant::now(); - /// Get an RPC peer node to download from. - /// - /// This function finds the highest compatible snapshots from the cluster, then picks one peer - /// at random to use (return). 
- fn get_rpc_node( - cluster_info: &ClusterInfo, - cluster_entrypoints: &[ContactInfo], - validator_config: &ValidatorConfig, - blacklisted_rpc_nodes: &mut HashSet, - bootstrap_config: &RpcBootstrapConfig, - ) -> Option { - let mut blacklist_timeout = Instant::now(); - let mut newer_cluster_snapshot_timeout = None; - let mut retry_reason = None; - loop { - sleep(Duration::from_secs(1)); - info!("\n{}", cluster_info.rpc_info_trace()); - - let rpc_peers = get_rpc_peers( - cluster_info, - cluster_entrypoints, - validator_config, - blacklisted_rpc_nodes, - &blacklist_timeout, - &mut retry_reason, - bootstrap_config, - ); - if rpc_peers.is_none() { - continue; - } - let rpc_peers = rpc_peers.unwrap(); - blacklist_timeout = Instant::now(); - - if bootstrap_config.no_snapshot_fetch { - if rpc_peers.is_empty() { - retry_reason = Some("No RPC peers available.".to_owned()); - continue; - } else { - let random_peer = &rpc_peers[thread_rng().gen_range(0, rpc_peers.len())]; - return Some(GetRpcNodeResult { - rpc_contact_info: random_peer.clone(), - snapshot_hash: None, - }); - } - } - - let peer_snapshot_hashes = - get_peer_snapshot_hashes(cluster_info, validator_config, &rpc_peers); - - if peer_snapshot_hashes.is_empty() { - match newer_cluster_snapshot_timeout { - None => newer_cluster_snapshot_timeout = Some(Instant::now()), - Some(newer_cluster_snapshot_timeout) => { - if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { - warn!("Giving up, did not get newer snapshots from the cluster."); - return None; - } - } - } - retry_reason = Some("No snapshots available".to_owned()); + if bootstrap_config.no_snapshot_fetch { + if rpc_peers.is_empty() { + retry_reason = Some("No RPC peers available.".to_owned()); continue; } else { - let rpc_peers = peer_snapshot_hashes - .iter() - .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.id) - .collect::>(); - let PeerSnapshotHash { - rpc_contact_info: final_rpc_contact_info, - snapshot_hash: final_snapshot_hash, - } = get_final_peer_snapshot_hash(&peer_snapshot_hashes); - info!( - "Highest available snapshot slot is {}, available from {} node{}: {:?}", - final_snapshot_hash - .incr - .map(|(slot, _hash)| slot) - .unwrap_or(final_snapshot_hash.full.0), - rpc_peers.len(), - if rpc_peers.len() > 1 { "s" } else { "" }, - rpc_peers, - ); - + let random_peer = &rpc_peers[thread_rng().gen_range(0, rpc_peers.len())]; return Some(GetRpcNodeResult { - rpc_contact_info: final_rpc_contact_info, - snapshot_hash: Some(final_snapshot_hash), + rpc_contact_info: random_peer.clone(), + snapshot_hash: None, }); } } - } - /// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full - /// snapshot or an incremental snapshot. 
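// [Editorial sketch — not part of this diff.] The reworked get_highest_local_snapshot_hash
// below prefers the highest incremental snapshot built on the highest full snapshot, and falls
// back to the full snapshot itself when incremental fetch is disabled or no matching
// incremental archive exists. The same Option combinators, shown on bare slot numbers
// (hypothetical values) so it runs standalone:
fn highest_local_snapshot_fallback_sketch(
    highest_full_slot: Option<u64>,
    highest_incremental_slot: Option<u64>,
    incremental_snapshot_fetch: bool,
) -> Option<u64> {
    highest_full_slot.and_then(|full_slot| {
        let incremental = if incremental_snapshot_fetch {
            highest_incremental_slot
        } else {
            None
        };
        // Prefer the incremental snapshot's slot; otherwise report the full snapshot's slot.
        incremental.or(Some(full_slot))
    })
}
// e.g. (Some(100), Some(150), true) -> Some(150); (Some(100), Some(150), false) -> Some(100);
// (None, _, _) -> None.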
- fn get_highest_local_snapshot_hash( - full_snapshot_archives_dir: impl AsRef, - incremental_snapshot_archives_dir: impl AsRef, - ) -> Option<(Slot, Hash)> { - if let Some(full_snapshot_info) = - snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir) - { - if let Some(incremental_snapshot_info) = + let peer_snapshot_hashes = get_peer_snapshot_hashes( + cluster_info, + &rpc_peers, + validator_config.known_validators.as_ref(), + bootstrap_config.incremental_snapshot_fetch, + ); + + if peer_snapshot_hashes.is_empty() { + match newer_cluster_snapshot_timeout { + None => newer_cluster_snapshot_timeout = Some(Instant::now()), + Some(newer_cluster_snapshot_timeout) => { + if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { + warn!("Giving up, did not get newer snapshots from the cluster."); + return None; + } + } + } + retry_reason = Some("No snapshots available".to_owned()); + continue; + } else { + let rpc_peers = peer_snapshot_hashes + .iter() + .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.id) + .collect::>(); + let PeerSnapshotHash { + rpc_contact_info: final_rpc_contact_info, + snapshot_hash: final_snapshot_hash, + } = get_final_peer_snapshot_hash(&peer_snapshot_hashes); + info!( + "Highest available snapshot slot is {}, available from {} node{}: {:?}", + final_snapshot_hash + .incr + .map(|(slot, _hash)| slot) + .unwrap_or(final_snapshot_hash.full.0), + rpc_peers.len(), + if rpc_peers.len() > 1 { "s" } else { "" }, + rpc_peers, + ); + + return Some(GetRpcNodeResult { + rpc_contact_info: final_rpc_contact_info, + snapshot_hash: Some(final_snapshot_hash), + }); + } + } +} + +/// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full +/// snapshot or an incremental snapshot. +fn get_highest_local_snapshot_hash( + full_snapshot_archives_dir: impl AsRef, + incremental_snapshot_archives_dir: impl AsRef, + incremental_snapshot_fetch: bool, +) -> Option<(Slot, Hash)> { + snapshot_utils::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir).and_then( + |full_snapshot_info| { + if incremental_snapshot_fetch { snapshot_utils::get_highest_incremental_snapshot_archive_info( incremental_snapshot_archives_dir, full_snapshot_info.slot(), ) - { - Some(( - incremental_snapshot_info.slot(), - *incremental_snapshot_info.hash(), - )) + .map(|incremental_snapshot_info| { + ( + incremental_snapshot_info.slot(), + *incremental_snapshot_info.hash(), + ) + }) } else { - Some((full_snapshot_info.slot(), *full_snapshot_info.hash())) + None } - } else { - None - } - } + .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash()))) + }, + ) +} - /// Get peer snapshot hashes - /// - /// The result is a vector of peers with snapshot hashes that: - /// 1. match a snapshot hash from the known validators - /// 2. have the highest incremental snapshot slot - /// 3. 
have the highest full snapshot slot of (2) - fn get_peer_snapshot_hashes( - cluster_info: &ClusterInfo, - validator_config: &ValidatorConfig, - rpc_peers: &[ContactInfo], - ) -> Vec { - let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers); - if validator_config.known_validators.is_some() { - let known_snapshot_hashes = - get_snapshot_hashes_from_known_validators(cluster_info, validator_config); - retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( - &known_snapshot_hashes, - &mut peer_snapshot_hashes, - ); - } - retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot( +/// Get peer snapshot hashes +/// +/// The result is a vector of peers with snapshot hashes that: +/// 1. match a snapshot hash from the known validators +/// 2. have the highest incremental snapshot slot +/// 3. have the highest full snapshot slot of (2) +fn get_peer_snapshot_hashes( + cluster_info: &ClusterInfo, + rpc_peers: &[ContactInfo], + known_validators: Option<&HashSet>, + incremental_snapshot_fetch: bool, +) -> Vec { + let mut peer_snapshot_hashes = + get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers, incremental_snapshot_fetch); + if known_validators.is_some() { + let known_snapshot_hashes = get_snapshot_hashes_from_known_validators( + cluster_info, + known_validators, + incremental_snapshot_fetch, + ); + retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( + &known_snapshot_hashes, &mut peer_snapshot_hashes, ); - retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes); + } + retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut peer_snapshot_hashes); + retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes); - peer_snapshot_hashes + peer_snapshot_hashes +} + +/// Map full snapshot hashes to a set of incremental snapshot hashes. Each full snapshot hash +/// is treated as the base for its set of incremental snapshot hashes. +type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>; + +/// Get the snapshot hashes from known validators. +/// +/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental +/// snapshot hashes. This map will be used as the "known snapshot hashes"; when peers are +/// queried for their individual snapshot hashes, their results will be checked against this +/// map to verify correctness. +/// +/// NOTE: Only a single snashot hash is allowed per slot. If somehow two known validators have +/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped. +/// This applies to both full and incremental snapshot hashes. 
+fn get_snapshot_hashes_from_known_validators( + cluster_info: &ClusterInfo, + known_validators: Option<&HashSet>, + incremental_snapshot_fetch: bool, +) -> KnownSnapshotHashes { + // Get the full snapshot hashes for a node from CRDS + let get_full_snapshot_hashes_for_node = |node| { + let mut full_snapshot_hashes = Vec::new(); + cluster_info.get_snapshot_hash_for_node(node, |snapshot_hashes| { + full_snapshot_hashes = snapshot_hashes.clone(); + }); + full_snapshot_hashes + }; + + // Get the incremental snapshot hashes for a node from CRDS + let get_incremental_snapshot_hashes_for_node = |node| { + cluster_info + .get_incremental_snapshot_hashes_for_node(node) + .map(|hashes| (hashes.base, hashes.hashes)) + }; + + known_validators + .map(|known_validators| { + build_known_snapshot_hashes( + known_validators, + get_full_snapshot_hashes_for_node, + get_incremental_snapshot_hashes_for_node, + incremental_snapshot_fetch, + ) + }) + .unwrap_or_else(|| { + trace!("No known validators, so no known snapshot hashes"); + KnownSnapshotHashes::new() + }) +} + +/// Build the known snapshot hashes from a set of nodes. +/// +/// The `get_full_snapshot_hashes_for_node` and `get_incremental_snapshot_hashes_for_node` +/// parameters are Fns that map a pubkey to its respective full and incremental snapshot +/// hashes. These parameters exist to provide a way to test the inner algorithm without +/// needing runtime information such as the ClusterInfo or ValidatorConfig. +fn build_known_snapshot_hashes<'a, F1, F2>( + nodes: impl IntoIterator, + get_full_snapshot_hashes_for_node: F1, + get_incremental_snapshot_hashes_for_node: F2, + incremental_snapshot_fetch: bool, +) -> KnownSnapshotHashes +where + F1: Fn(&'a Pubkey) -> Vec<(Slot, Hash)>, + F2: Fn(&'a Pubkey) -> Option<((Slot, Hash), Vec<(Slot, Hash)>)>, +{ + let mut known_snapshot_hashes = KnownSnapshotHashes::new(); + + /// Check to see if there exists another snapshot hash in the haystack with the *same* slot + /// but *different* hash as the needle. + fn is_any_same_slot_and_different_hash<'a>( + needle: &(Slot, Hash), + haystack: impl IntoIterator, + ) -> bool { + haystack + .into_iter() + .any(|hay| needle.0 == hay.0 && needle.1 != hay.1) } - /// Map full snapshot hashes to a set of incremental snapshot hashes. Each full snapshot hash - /// is treated as the base for its set of incremental snapshot hashes. - type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>; - - /// Get the snapshot hashes from known validators. - /// - /// The snapshot hashes are put into a map from full snapshot hash to a set of incremental - /// snapshot hashes. This map will be used as the "known snapshot hashes"; when peers are - /// queried for their individual snapshot hashes, their results will be checked against this - /// map to verify correctness. - /// - /// NOTE: Only a single snashot hash is allowed per slot. If somehow two known validators have - /// a snapshot hash with the same slot and _different_ hashes, the second will be skipped. - /// This applies to both full and incremental snapshot hashes. 
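// [Editorial sketch — not part of this diff.] The closure parameters of
// build_known_snapshot_hashes (introduced above) exist so the algorithm can be exercised
// without a ClusterInfo. A minimal call with hand-rolled data, assuming that function from
// this diff is in scope; the slots and hashes are hypothetical:
fn build_known_snapshot_hashes_sketch() {
    use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};

    let known_validator = Pubkey::new_unique();
    let full: (Slot, Hash) = (100, Hash::default());
    let incr: (Slot, Hash) = (150, Hash::new(&[1u8; 32]));

    let known = build_known_snapshot_hashes(
        [&known_validator],
        |_node| vec![full],               // full snapshot hashes advertised by the node
        |_node| Some((full, vec![incr])), // (base, incremental snapshot hashes) for the node
        true,                             // incremental_snapshot_fetch
    );

    // The full snapshot hash becomes a key; the incremental hash lands in its set.
    assert!(known[&full].contains(&incr));
}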
- fn get_snapshot_hashes_from_known_validators( - cluster_info: &ClusterInfo, - validator_config: &ValidatorConfig, - ) -> KnownSnapshotHashes { - // Get the full snapshot hashes for a node from CRDS - let get_full_snapshot_hashes_for_node = |node| { - let mut full_snapshot_hashes = Vec::new(); - cluster_info.get_snapshot_hash_for_node(node, |snapshot_hashes| { - full_snapshot_hashes = snapshot_hashes.clone(); - }); - full_snapshot_hashes - }; - - // Get the incremental snapshot hashes for a node from CRDS - let get_incremental_snapshot_hashes_for_node = |node| { - cluster_info - .get_incremental_snapshot_hashes_for_node(node) - .map(|hashes| (hashes.base, hashes.hashes)) - }; - - validator_config - .known_validators - .as_ref() - .map(|known_validators| { - build_known_snapshot_hashes( - known_validators, - get_full_snapshot_hashes_for_node, - get_incremental_snapshot_hashes_for_node, - ) - }) - .unwrap_or_else(|| { - trace!("No known validators, so no known snapshot hashes"); - KnownSnapshotHashes::new() - }) - } - - /// Build the known snapshot hashes from a set of nodes. - /// - /// The `get_full_snapshot_hashes_for_node` and `get_incremental_snapshot_hashes_for_node` - /// parameters are Fns that map a pubkey to its respective full and incremental snapshot - /// hashes. These parameters exist to provide a way to test the inner algorithm without - /// needing runtime information such as the ClusterInfo or ValidatorConfig. - fn build_known_snapshot_hashes<'a, F1, F2>( - nodes: impl IntoIterator, - get_full_snapshot_hashes_for_node: F1, - get_incremental_snapshot_hashes_for_node: F2, - ) -> KnownSnapshotHashes - where - F1: Fn(&'a Pubkey) -> Vec<(Slot, Hash)>, - F2: Fn(&'a Pubkey) -> Option<((Slot, Hash), Vec<(Slot, Hash)>)>, - { - let mut known_snapshot_hashes = KnownSnapshotHashes::new(); - - /// Check to see if there exists another snapshot hash in the haystack with the *same* slot - /// but *different* hash as the needle. - fn is_any_same_slot_and_different_hash<'a>( - needle: &(Slot, Hash), - haystack: impl IntoIterator, - ) -> bool { - haystack - .into_iter() - .any(|hay| needle.0 == hay.0 && needle.1 != hay.1) + 'outer: for node in nodes { + // First get the full snapshot hashes for each node and add them as the keys in the + // known snapshot hashes map. + let full_snapshot_hashes = get_full_snapshot_hashes_for_node(node); + for full_snapshot_hash in &full_snapshot_hashes { + // Do not add this snapshot hash if there's already a full snapshot hash with the + // same slot but with a _different_ hash. + // NOTE: Nodes should not produce snapshots at the same slot with _different_ + // hashes. So if it happens, keep the first and ignore the rest. + if !is_any_same_slot_and_different_hash( + full_snapshot_hash, + known_snapshot_hashes.keys(), + ) { + // Insert a new full snapshot hash into the known snapshot hashes IFF an entry + // doesn't already exist. This is to ensure we don't overwrite existing + // incremental snapshot hashes that may be present for this full snapshot hash. + known_snapshot_hashes + .entry(*full_snapshot_hash) + .or_default(); + } else { + warn!( + "Ignoring all snapshot hashes from node {} since we've seen a different full snapshot hash with this slot. full snapshot hash: {:?}", + node, + full_snapshot_hash, + ); + continue 'outer; + } } - 'outer: for node in nodes { - // First get the full snapshot hashes for each node and add them as the keys in the - // known snapshot hashes map. 
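// [Editorial sketch — not part of this diff.] The "one snapshot hash per slot" rule noted in
// the doc comment above boils down to the is_any_same_slot_and_different_hash helper: a
// candidate is rejected when an already-known hash shares its slot but not its hash.
// Standalone, with placeholder hashes:
fn same_slot_different_hash_sketch() {
    use solana_sdk::hash::Hash;

    let already_known = vec![(100u64, Hash::default())];
    let agreeing = (100u64, Hash::default());
    let conflicting = (100u64, Hash::new(&[1u8; 32]));

    let conflicts = |needle: &(u64, Hash)| {
        already_known
            .iter()
            .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
    };

    assert!(!conflicts(&agreeing)); // same slot, same hash: fine
    assert!(conflicts(&conflicting)); // same slot, different hash: skip this node's hashes
}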
- let full_snapshot_hashes = get_full_snapshot_hashes_for_node(node); - for full_snapshot_hash in &full_snapshot_hashes { - // Do not add this snapshot hash if there's already a full snapshot hash with the - // same slot but with a _different_ hash. - // NOTE: Nodes should not produce snapshots at the same slot with _different_ - // hashes. So if it happens, keep the first and ignore the rest. - if !is_any_same_slot_and_different_hash( - full_snapshot_hash, - known_snapshot_hashes.keys(), - ) { - // Insert a new full snapshot hash into the known snapshot hashes IFF an entry - // doesn't already exist. This is to ensure we don't overwrite existing - // incremental snapshot hashes that may be present for this full snapshot hash. - known_snapshot_hashes - .entry(*full_snapshot_hash) - .or_default(); - } else { - warn!( - "Ignoring all snapshot hashes from node {} since we've seen a different full snapshot hash with this slot. full snapshot hash: {:?}", - node, - full_snapshot_hash, - ); - continue 'outer; - } - } - + if incremental_snapshot_fetch { // Then get the incremental snapshot hashes for each node and add them as the values in the // known snapshot hashes map. if let Some((base_snapshot_hash, incremental_snapshot_hashes)) = @@ -1252,7 +798,7 @@ mod with_incremental_snapshots { node, base_snapshot_hash, full_snapshot_hashes - ); + ); continue 'outer; } @@ -1295,192 +841,226 @@ mod with_incremental_snapshots { } } } + } - trace!("known snapshot hashes: {:?}", &known_snapshot_hashes); + trace!("known snapshot hashes: {:?}", &known_snapshot_hashes); + known_snapshot_hashes +} + +/// Get snapshot hashes from all the eligible peers. This fn will get only one +/// snapshot hash per peer (the one with the highest slot). This may be just a full snapshot +/// hash, or a combo full (i.e. base) snapshot hash and incremental snapshot hash. +fn get_eligible_peer_snapshot_hashes( + cluster_info: &ClusterInfo, + rpc_peers: &[ContactInfo], + incremental_snapshot_fetch: bool, +) -> Vec { + let mut peer_snapshot_hashes = Vec::new(); + for rpc_peer in rpc_peers { + // Get this peer's highest (full) snapshot hash. We need to get these snapshot hashes + // (instead of just the IncrementalSnapshotHashes) in case the peer is either (1) not + // taking incremental snapshots, or (2) if the last snapshot taken was a full snapshot, + // which would get pushed to CRDS here (i.e. `crds_value::SnapshotHashes`) first. + let highest_snapshot_hash = + get_highest_full_snapshot_hash_for_peer(cluster_info, &rpc_peer.id).max( + if incremental_snapshot_fetch { + get_highest_incremental_snapshot_hash_for_peer(cluster_info, &rpc_peer.id) + } else { + None + }, + ); + + if let Some(snapshot_hash) = highest_snapshot_hash { + peer_snapshot_hashes.push(PeerSnapshotHash { + rpc_contact_info: rpc_peer.clone(), + snapshot_hash, + }); + }; + } + + trace!("peer snapshot hashes: {:?}", &peer_snapshot_hashes); + peer_snapshot_hashes +} + +/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes +fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( + known_snapshot_hashes: &KnownSnapshotHashes, + peer_snapshot_hashes: &mut Vec, +) { + peer_snapshot_hashes.retain(|peer_snapshot_hash| { known_snapshot_hashes - } - - /// Get snapshot hashes from all the eligible peers. This fn will get only one - /// snapshot hash per peer (the one with the highest slot). This may be just a full snapshot - /// hash, or a combo full (i.e. base) snapshot hash and incremental snapshot hash. 
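// [Editorial sketch — not part of this diff.] The `.max()` call in the new
// get_eligible_peer_snapshot_hashes above leans on SnapshotHash's derived Ord: fields compare
// lexicographically, full snapshot first, then the optional incremental (with None < Some).
// The struct is repeated locally so the sketch stands alone; slots and hashes are hypothetical:
fn snapshot_hash_ordering_sketch() {
    use solana_sdk::{clock::Slot, hash::Hash};

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
    struct SnapshotHash {
        full: (Slot, Hash),
        incr: Option<(Slot, Hash)>,
    }

    let full_only = SnapshotHash { full: (100, Hash::default()), incr: None };
    let with_incr = SnapshotHash { full: (100, Hash::default()), incr: Some((150, Hash::default())) };
    let newer_full = SnapshotHash { full: (200, Hash::default()), incr: None };

    assert!(with_incr > full_only); // same full snapshot: having an incremental ranks higher
    assert!(newer_full > with_incr); // a higher full snapshot slot dominates
    assert_eq!(Some(full_only).max(Some(with_incr)), Some(with_incr)); // as used on Options
}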
- fn get_eligible_peer_snapshot_hashes( - cluster_info: &ClusterInfo, - rpc_peers: &[ContactInfo], - ) -> Vec { - let mut peer_snapshot_hashes = Vec::new(); - for rpc_peer in rpc_peers { - let mut highest_snapshot_hash = - get_highest_incremental_snapshot_hash_for_peer(cluster_info, &rpc_peer.id); - - // Get this peer's highest (full) snapshot hash. We need to get these snapshot hashes - // (instead of just the IncrementalSnapshotHashes) in case the peer is either (1) not - // taking incremental snapshots, or (2) if the last snapshot taken was a full snapshot, - // which would get pushed to CRDS here (i.e. `crds_value::SnapshotHashes`) first. - let snapshot_hash = get_highest_full_snapshot_hash_for_peer(cluster_info, &rpc_peer.id); - if snapshot_hash > highest_snapshot_hash { - highest_snapshot_hash = snapshot_hash; - } - - if let Some(snapshot_hash) = highest_snapshot_hash { - peer_snapshot_hashes.push(PeerSnapshotHash { - rpc_contact_info: rpc_peer.clone(), - snapshot_hash, - }); - }; - } - - trace!("peer snapshot hashes: {:?}", &peer_snapshot_hashes); - peer_snapshot_hashes - } - - /// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes - fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( - known_snapshot_hashes: &KnownSnapshotHashes, - peer_snapshot_hashes: &mut Vec, - ) { - peer_snapshot_hashes.retain(|peer_snapshot_hash| { - known_snapshot_hashes - .get(&peer_snapshot_hash.snapshot_hash.full) - .map(|known_incremental_hashes| { - if peer_snapshot_hash.snapshot_hash.incr.is_none() { - // If the peer's full snapshot hashes match, but doesn't have any - // incremental snapshots, that's fine; keep 'em! - true - } else { - known_incremental_hashes - .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap()) - } - }) - .unwrap_or(false) - }); - - trace!( - "retain peer snapshot hashes that match known snapshot hashes: {:?}", - &peer_snapshot_hashes - ); - } - - /// Retain the peer snapshot hashes with the highest full snapshot slot - fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot( - peer_snapshot_hashes: &mut Vec, - ) { - // retain the hashes with the highest full snapshot slot - // do a two-pass algorithm - // 1. find the full snapshot hash with the highest full snapshot slot - // 2. 
retain elems with that full snapshot hash - let mut highest_full_snapshot_hash = (Slot::MIN, Hash::default()); - peer_snapshot_hashes.iter().for_each(|peer_snapshot_hash| { - if peer_snapshot_hash.snapshot_hash.full.0 > highest_full_snapshot_hash.0 { - highest_full_snapshot_hash = peer_snapshot_hash.snapshot_hash.full; - } - }); - - peer_snapshot_hashes.retain(|peer_snapshot_hash| { - peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash - }); - - trace!( - "retain peer snapshot hashes with highest full snapshot slot: {:?}", - &peer_snapshot_hashes - ); - } - - /// Retain the peer snapshot hashes with the highest incremental snapshot slot - fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot( - peer_snapshot_hashes: &mut Vec, - ) { - let mut highest_incremental_snapshot_hash: Option<(Slot, Hash)> = None; - peer_snapshot_hashes.iter().for_each(|peer_snapshot_hash| { - if let Some(incremental_snapshot_hash) = peer_snapshot_hash.snapshot_hash.incr.as_ref() - { - if highest_incremental_snapshot_hash.is_none() - || incremental_snapshot_hash.0 > highest_incremental_snapshot_hash.unwrap().0 - { - highest_incremental_snapshot_hash = Some(*incremental_snapshot_hash); + .get(&peer_snapshot_hash.snapshot_hash.full) + .map(|known_incremental_hashes| { + if peer_snapshot_hash.snapshot_hash.incr.is_none() { + // If the peer's full snapshot hashes match, but doesn't have any + // incremental snapshots, that's fine; keep 'em! + true + } else { + known_incremental_hashes + .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap()) } - }; - }); - - peer_snapshot_hashes.retain(|peer_snapshot_hash| { - peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash - }); - - trace!( - "retain peer snapshot hashes with highest incremental snapshot slot: {:?}", - &peer_snapshot_hashes - ); - } - - /// Get a final peer from the remaining peer snapshot hashes. At this point all the snapshot - /// hashes should (must) be the same, and only the peers are different. Pick an element from - /// the slice at random and return it. - fn get_final_peer_snapshot_hash(peer_snapshot_hashes: &[PeerSnapshotHash]) -> PeerSnapshotHash { - assert!(!peer_snapshot_hashes.is_empty()); - - // pick a final rpc peer at random - let final_peer_snapshot_hash = - &peer_snapshot_hashes[thread_rng().gen_range(0, peer_snapshot_hashes.len())]; - - // It is a programmer bug if the assert fires! By the time this function is called, the - // only remaining `incremental_snapshot_hashes` should all be the same. - assert!( - peer_snapshot_hashes.iter().all(|peer_snapshot_hash| { - peer_snapshot_hash.snapshot_hash == final_peer_snapshot_hash.snapshot_hash - }), - "To safely pick a peer at random, all the snapshot hashes must be the same" - ); - - trace!("final peer snapshot hash: {:?}", final_peer_snapshot_hash); - final_peer_snapshot_hash.clone() - } - - /// Check to see if we can use our local snapshots, otherwise download newer ones. 
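// [Editorial sketch — not part of this diff.] The retain_peer_snapshot_hashes_with_highest_*
// helpers above use the two-pass pattern described in their comments: first find the maximum,
// then retain only the entries that carry it. The same idea on bare slot numbers
// (hypothetical values):
fn two_pass_retain_sketch() {
    let mut full_snapshot_slots: Vec<u64> = vec![100, 300, 300, 200];

    // Pass 1: find the highest slot.
    let highest_slot = full_snapshot_slots.iter().copied().max().unwrap_or(u64::MIN);

    // Pass 2: keep only the entries at that slot.
    full_snapshot_slots.retain(|slot| *slot == highest_slot);

    assert_eq!(full_snapshot_slots, vec![300, 300]);
}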
- #[allow(clippy::too_many_arguments)] - fn download_snapshots( - full_snapshot_archives_dir: &Path, - incremental_snapshot_archives_dir: &Path, - validator_config: &ValidatorConfig, - bootstrap_config: &RpcBootstrapConfig, - use_progress_bar: bool, - maximum_local_snapshot_age: Slot, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - download_abort_count: &mut u64, - snapshot_hash: Option, - rpc_contact_info: &ContactInfo, - ) -> Result<(), String> { - if snapshot_hash.is_none() { - return Ok(()); - } - let SnapshotHash { - full: full_snapshot_hash, - incr: incremental_snapshot_hash, - } = snapshot_hash.unwrap(); - - // If the local snapshots are new enough, then use 'em; no need to download new snapshots - if should_use_local_snapshot( - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - maximum_local_snapshot_age, - full_snapshot_hash, - incremental_snapshot_hash, - ) { - return Ok(()); - } - - // Check and see if we've already got the full snapshot; if not, download it - if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir) - .into_iter() - .any(|snapshot_archive| { - snapshot_archive.slot() == full_snapshot_hash.0 - && snapshot_archive.hash() == &full_snapshot_hash.1 }) - { - info!( + .unwrap_or(false) + }); + + trace!( + "retain peer snapshot hashes that match known snapshot hashes: {:?}", + &peer_snapshot_hashes + ); +} + +/// Retain the peer snapshot hashes with the highest full snapshot slot +fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot( + peer_snapshot_hashes: &mut Vec, +) { + // retain the hashes with the highest full snapshot slot + // do a two-pass algorithm + // 1. find the full snapshot hash with the highest full snapshot slot + // 2. retain elems with that full snapshot hash + let mut highest_full_snapshot_hash = (Slot::MIN, Hash::default()); + peer_snapshot_hashes.iter().for_each(|peer_snapshot_hash| { + if peer_snapshot_hash.snapshot_hash.full.0 > highest_full_snapshot_hash.0 { + highest_full_snapshot_hash = peer_snapshot_hash.snapshot_hash.full; + } + }); + + peer_snapshot_hashes.retain(|peer_snapshot_hash| { + peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash + }); + + trace!( + "retain peer snapshot hashes with highest full snapshot slot: {:?}", + &peer_snapshot_hashes + ); +} + +/// Retain the peer snapshot hashes with the highest incremental snapshot slot +fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot( + peer_snapshot_hashes: &mut Vec, +) { + let mut highest_incremental_snapshot_hash: Option<(Slot, Hash)> = None; + peer_snapshot_hashes.iter().for_each(|peer_snapshot_hash| { + if let Some(incremental_snapshot_hash) = peer_snapshot_hash.snapshot_hash.incr.as_ref() { + if highest_incremental_snapshot_hash.is_none() + || incremental_snapshot_hash.0 > highest_incremental_snapshot_hash.unwrap().0 + { + highest_incremental_snapshot_hash = Some(*incremental_snapshot_hash); + } + }; + }); + + peer_snapshot_hashes.retain(|peer_snapshot_hash| { + peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash + }); + + trace!( + "retain peer snapshot hashes with highest incremental snapshot slot: {:?}", + &peer_snapshot_hashes + ); +} + +/// Get a final peer from the remaining peer snapshot hashes. At this point all the snapshot +/// hashes should (must) be the same, and only the peers are different. Pick an element from +/// the slice at random and return it. 
+fn get_final_peer_snapshot_hash(peer_snapshot_hashes: &[PeerSnapshotHash]) -> PeerSnapshotHash { + assert!(!peer_snapshot_hashes.is_empty()); + + // pick a final rpc peer at random + let final_peer_snapshot_hash = + &peer_snapshot_hashes[thread_rng().gen_range(0, peer_snapshot_hashes.len())]; + + // It is a programmer bug if the assert fires! By the time this function is called, the + // only remaining `incremental_snapshot_hashes` should all be the same. + assert!( + peer_snapshot_hashes.iter().all(|peer_snapshot_hash| { + peer_snapshot_hash.snapshot_hash == final_peer_snapshot_hash.snapshot_hash + }), + "To safely pick a peer at random, all the snapshot hashes must be the same" + ); + + trace!("final peer snapshot hash: {:?}", final_peer_snapshot_hash); + final_peer_snapshot_hash.clone() +} + +/// Check to see if we can use our local snapshots, otherwise download newer ones. +#[allow(clippy::too_many_arguments)] +fn download_snapshots( + full_snapshot_archives_dir: &Path, + incremental_snapshot_archives_dir: &Path, + validator_config: &ValidatorConfig, + bootstrap_config: &RpcBootstrapConfig, + use_progress_bar: bool, + maximum_local_snapshot_age: Slot, + start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, + download_abort_count: &mut u64, + snapshot_hash: Option, + rpc_contact_info: &ContactInfo, +) -> Result<(), String> { + if snapshot_hash.is_none() { + return Ok(()); + } + let SnapshotHash { + full: full_snapshot_hash, + incr: incremental_snapshot_hash, + } = snapshot_hash.unwrap(); + + // If the local snapshots are new enough, then use 'em; no need to download new snapshots + if should_use_local_snapshot( + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + maximum_local_snapshot_age, + full_snapshot_hash, + incremental_snapshot_hash, + bootstrap_config.incremental_snapshot_fetch, + ) { + return Ok(()); + } + + // Check and see if we've already got the full snapshot; if not, download it + if snapshot_utils::get_full_snapshot_archives(full_snapshot_archives_dir) + .into_iter() + .any(|snapshot_archive| { + snapshot_archive.slot() == full_snapshot_hash.0 + && snapshot_archive.hash() == &full_snapshot_hash.1 + }) + { + info!( "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}", full_snapshot_hash.0, full_snapshot_hash.1 ); + } else { + download_snapshot( + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + validator_config, + bootstrap_config, + use_progress_bar, + start_progress, + minimal_snapshot_download_speed, + maximum_snapshot_download_abort, + download_abort_count, + rpc_contact_info, + full_snapshot_hash, + SnapshotType::FullSnapshot, + )?; + } + + // Check and see if we've already got the incremental snapshot; if not, download it + if let Some(incremental_snapshot_hash) = incremental_snapshot_hash { + if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir) + .into_iter() + .any(|snapshot_archive| { + snapshot_archive.slot() == incremental_snapshot_hash.0 + && snapshot_archive.hash() == &incremental_snapshot_hash.1 + && snapshot_archive.base_slot() == full_snapshot_hash.0 + }) + { + info!( + "Incremental snapshot archive already exists locally. Skipping download. 
slot: {}, hash: {}", + incremental_snapshot_hash.0, incremental_snapshot_hash.1 + ); } else { download_snapshot( full_snapshot_archives_dir, @@ -1493,66 +1073,33 @@ mod with_incremental_snapshots { maximum_snapshot_download_abort, download_abort_count, rpc_contact_info, - full_snapshot_hash, - SnapshotType::FullSnapshot, + incremental_snapshot_hash, + SnapshotType::IncrementalSnapshot(full_snapshot_hash.0), )?; } - - // Check and see if we've already got the incremental snapshot; if not, download it - if let Some(incremental_snapshot_hash) = incremental_snapshot_hash { - if snapshot_utils::get_incremental_snapshot_archives(incremental_snapshot_archives_dir) - .into_iter() - .any(|snapshot_archive| { - snapshot_archive.slot() == incremental_snapshot_hash.0 - && snapshot_archive.hash() == &incremental_snapshot_hash.1 - && snapshot_archive.base_slot() == full_snapshot_hash.0 - }) - { - info!( - "Incremental snapshot archive already exists locally. Skipping download. slot: {}, hash: {}", - incremental_snapshot_hash.0, incremental_snapshot_hash.1 - ); - } else { - download_snapshot( - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - validator_config, - bootstrap_config, - use_progress_bar, - start_progress, - minimal_snapshot_download_speed, - maximum_snapshot_download_abort, - download_abort_count, - rpc_contact_info, - incremental_snapshot_hash, - SnapshotType::IncrementalSnapshot(full_snapshot_hash.0), - )?; - } - } - - Ok(()) } - /// Download a snapshot - #[allow(clippy::too_many_arguments)] - fn download_snapshot( - full_snapshot_archives_dir: &Path, - incremental_snapshot_archives_dir: &Path, - validator_config: &ValidatorConfig, - bootstrap_config: &RpcBootstrapConfig, - use_progress_bar: bool, - start_progress: &Arc>, - minimal_snapshot_download_speed: f32, - maximum_snapshot_download_abort: u64, - download_abort_count: &mut u64, - rpc_contact_info: &ContactInfo, - desired_snapshot_hash: (Slot, Hash), - snapshot_type: SnapshotType, - ) -> Result<(), String> { - let ( - maximum_full_snapshot_archives_to_retain, - maximum_incremental_snapshot_archives_to_retain, - ) = if let Some(snapshot_config) = validator_config.snapshot_config.as_ref() { + Ok(()) +} + +/// Download a snapshot +#[allow(clippy::too_many_arguments)] +fn download_snapshot( + full_snapshot_archives_dir: &Path, + incremental_snapshot_archives_dir: &Path, + validator_config: &ValidatorConfig, + bootstrap_config: &RpcBootstrapConfig, + use_progress_bar: bool, + start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, + download_abort_count: &mut u64, + rpc_contact_info: &ContactInfo, + desired_snapshot_hash: (Slot, Hash), + snapshot_type: SnapshotType, +) -> Result<(), String> { + let (maximum_full_snapshot_archives_to_retain, maximum_incremental_snapshot_archives_to_retain) = + if let Some(snapshot_config) = validator_config.snapshot_config.as_ref() { ( snapshot_config.maximum_full_snapshot_archives_to_retain, snapshot_config.maximum_incremental_snapshot_archives_to_retain, @@ -1563,506 +1110,542 @@ mod with_incremental_snapshots { DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) }; - *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot { - slot: desired_snapshot_hash.0, - rpc_addr: rpc_contact_info.rpc, - }; - download_snapshot_archive( - &rpc_contact_info.rpc, - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - desired_snapshot_hash, - snapshot_type, - maximum_full_snapshot_archives_to_retain, - 
maximum_incremental_snapshot_archives_to_retain, - use_progress_bar, - &mut Some(Box::new(|download_progress: &DownloadProgressRecord| { - debug!("Download progress: {:?}", download_progress); - if download_progress.last_throughput < minimal_snapshot_download_speed - && download_progress.notification_count <= 1 - && download_progress.percentage_done <= 2_f32 - && download_progress.estimated_remaining_time > 60_f32 - && *download_abort_count < maximum_snapshot_download_abort - { - if let Some(ref known_validators) = validator_config.known_validators { - if known_validators.contains(&rpc_contact_info.id) - && known_validators.len() == 1 - && bootstrap_config.only_known_rpc - { - warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, but will NOT abort \ - and try a different node as it is the only known validator and the --only-known-rpc flag \ - is set. \ - Abort count: {}, Progress detail: {:?}", - download_progress.last_throughput, minimal_snapshot_download_speed, - download_abort_count, download_progress); - return true; // Do not abort download from the one-and-only known validator - } + *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot { + slot: desired_snapshot_hash.0, + rpc_addr: rpc_contact_info.rpc, + }; + download_snapshot_archive( + &rpc_contact_info.rpc, + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + desired_snapshot_hash, + snapshot_type, + maximum_full_snapshot_archives_to_retain, + maximum_incremental_snapshot_archives_to_retain, + use_progress_bar, + &mut Some(Box::new(|download_progress: &DownloadProgressRecord| { + debug!("Download progress: {:?}", download_progress); + if download_progress.last_throughput < minimal_snapshot_download_speed + && download_progress.notification_count <= 1 + && download_progress.percentage_done <= 2_f32 + && download_progress.estimated_remaining_time > 60_f32 + && *download_abort_count < maximum_snapshot_download_abort + { + if let Some(ref known_validators) = validator_config.known_validators { + if known_validators.contains(&rpc_contact_info.id) + && known_validators.len() == 1 + && bootstrap_config.only_known_rpc + { + warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, but will NOT abort \ + and try a different node as it is the only known validator and the --only-known-rpc flag \ + is set. \ + Abort count: {}, Progress detail: {:?}", + download_progress.last_throughput, minimal_snapshot_download_speed, + download_abort_count, download_progress); + return true; // Do not abort download from the one-and-only known validator } - warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, will abort \ - and try a different node. Abort count: {}, Progress detail: {:?}", - download_progress.last_throughput, minimal_snapshot_download_speed, - download_abort_count, download_progress); - *download_abort_count += 1; - false - } else { - true } - })), - ) - } + warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, will abort \ + and try a different node. Abort count: {}, Progress detail: {:?}", + download_progress.last_throughput, minimal_snapshot_download_speed, + download_abort_count, download_progress); + *download_abort_count += 1; + false + } else { + true + } + })), + ) +} - /// Check to see if bootstrap should load from its local snapshots or not. If not, then snapshots - /// will be downloaded. 
- fn should_use_local_snapshot( - full_snapshot_archives_dir: &Path, - incremental_snapshot_archives_dir: &Path, - maximum_local_snapshot_age: Slot, - full_snapshot_hash: (Slot, Hash), - incremental_snapshot_hash: Option<(Slot, Hash)>, - ) -> bool { - let cluster_snapshot_slot = incremental_snapshot_hash - .map(|(slot, _)| slot) - .unwrap_or(full_snapshot_hash.0); +/// Check to see if bootstrap should load from its local snapshots or not. If not, then snapshots +/// will be downloaded. +fn should_use_local_snapshot( + full_snapshot_archives_dir: &Path, + incremental_snapshot_archives_dir: &Path, + maximum_local_snapshot_age: Slot, + full_snapshot_hash: (Slot, Hash), + incremental_snapshot_hash: Option<(Slot, Hash)>, + incremental_snapshot_fetch: bool, +) -> bool { + let cluster_snapshot_slot = incremental_snapshot_hash + .map(|(slot, _)| slot) + .unwrap_or(full_snapshot_hash.0); - match get_highest_local_snapshot_hash( - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - ) { - None => { + match get_highest_local_snapshot_hash( + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + incremental_snapshot_fetch, + ) { + None => { + info!( + "Downloading a snapshot for slot {} since there is not a local snapshot.", + cluster_snapshot_slot, + ); + false + } + Some((local_snapshot_slot, _)) => { + if local_snapshot_slot + >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age) + { info!( - "Downloading a snapshot for slot {} since there is not a local snapshot.", + "Reusing local snapshot at slot {} instead of downloading a snapshot for slot {}.", + local_snapshot_slot, + cluster_snapshot_slot, + ); + true + } else { + info!( + "Local snapshot from slot {} is too old. Downloading a newer snapshot for slot {}.", + local_snapshot_slot, cluster_snapshot_slot, ); false } - Some((local_snapshot_slot, _)) => { - if local_snapshot_slot - >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age) - { - info!( - "Reusing local snapshot at slot {} instead of downloading a snapshot for slot {}.", - local_snapshot_slot, - cluster_snapshot_slot, - ); - true - } else { - info!( - "Local snapshot from slot {} is too old. Downloading a newer snapshot for slot {}.", - local_snapshot_slot, - cluster_snapshot_slot, - ); - false - } - } - } - } - - /// Get the highest full snapshot hash for a peer from CRDS - fn get_highest_full_snapshot_hash_for_peer( - cluster_info: &ClusterInfo, - peer: &Pubkey, - ) -> Option { - let mut full_snapshot_hashes = Vec::new(); - cluster_info.get_snapshot_hash_for_node(peer, |snapshot_hashes| { - full_snapshot_hashes = snapshot_hashes.clone() - }); - full_snapshot_hashes - .into_iter() - .max() - .map(|full_snapshot_hash| SnapshotHash { - full: full_snapshot_hash, - incr: None, - }) - } - - /// Get the highest incremental snapshot hash for a peer from CRDS - fn get_highest_incremental_snapshot_hash_for_peer( - cluster_info: &ClusterInfo, - peer: &Pubkey, - ) -> Option { - cluster_info - .get_incremental_snapshot_hashes_for_node(peer) - .map( - |crds_value::IncrementalSnapshotHashes { base, hashes, .. 
}| { - let highest_incremental_snapshot_hash = hashes.into_iter().max(); - SnapshotHash { - full: base, - incr: highest_incremental_snapshot_hash, - } - }, - ) - } - - #[cfg(test)] - mod tests { - use super::*; - - impl PeerSnapshotHash { - fn new( - rpc_contact_info: ContactInfo, - full_snapshot_hash: (Slot, Hash), - incremental_snapshot_hash: Option<(Slot, Hash)>, - ) -> Self { - Self { - rpc_contact_info, - snapshot_hash: SnapshotHash { - full: full_snapshot_hash, - incr: incremental_snapshot_hash, - }, - } - } - } - - fn default_contact_info_for_tests() -> ContactInfo { - let sock_addr = SocketAddr::from(([1, 1, 1, 1], 11_111)); - ContactInfo { - id: Pubkey::default(), - gossip: sock_addr, - tvu: sock_addr, - tvu_forwards: sock_addr, - repair: sock_addr, - tpu: sock_addr, - tpu_forwards: sock_addr, - tpu_vote: sock_addr, - rpc: sock_addr, - rpc_pubsub: sock_addr, - serve_repair: sock_addr, - wallclock: 123456789, - shred_version: 1, - } - } - - #[test] - fn test_build_known_snapshot_hashes() { - let full_snapshot_hashes1 = vec![ - (100_000, Hash::new_unique()), - (200_000, Hash::new_unique()), - (300_000, Hash::new_unique()), - (400_000, Hash::new_unique()), - ]; - let full_snapshot_hashes2 = vec![ - (100_000, Hash::new_unique()), - (200_000, Hash::new_unique()), - (300_000, Hash::new_unique()), - (400_000, Hash::new_unique()), - ]; - - let base_snapshot_hash1 = full_snapshot_hashes1.last().unwrap(); - let base_snapshot_hash2 = full_snapshot_hashes2.last().unwrap(); - - let incremental_snapshot_hashes1 = vec![ - (400_500, Hash::new_unique()), - (400_600, Hash::new_unique()), - (400_700, Hash::new_unique()), - (400_800, Hash::new_unique()), - ]; - let incremental_snapshot_hashes2 = vec![ - (400_500, Hash::new_unique()), - (400_600, Hash::new_unique()), - (400_700, Hash::new_unique()), - (400_800, Hash::new_unique()), - ]; - - #[allow(clippy::type_complexity)] - let mut oracle: HashMap< - Pubkey, - (Vec<(Slot, Hash)>, Option<((Slot, Hash), Vec<(Slot, Hash)>)>), - > = HashMap::new(); - - // no snapshots at all - oracle.insert(Pubkey::new_unique(), (vec![], None)); - - // just full snapshots - oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes1.clone(), None)); - - // just full snapshots, with different hashes - oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes2.clone(), None)); - - // full and incremental snapshots - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes1.clone(), - Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())), - ), - ); - - // full and incremental snapshots, but base hash is wrong - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes1.clone(), - Some((*base_snapshot_hash2, incremental_snapshot_hashes1.clone())), - ), - ); - - // full and incremental snapshots, with different hashes - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes2.clone(), - Some((*base_snapshot_hash2, incremental_snapshot_hashes2.clone())), - ), - ); - - // full and incremental snapshots, but base hash is wrong - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes2.clone(), - Some((*base_snapshot_hash1, incremental_snapshot_hashes2.clone())), - ), - ); - - // handle duplicates as well - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes1.clone(), - Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())), - ), - ); - - // handle duplicates, with different hashes - oracle.insert( - Pubkey::new_unique(), - ( - full_snapshot_hashes2.clone(), - Some((*base_snapshot_hash2, 
incremental_snapshot_hashes2.clone())), - ), - ); - - let node_to_full_snapshot_hashes = |node| oracle.get(node).unwrap().clone().0; - let node_to_incremental_snapshot_hashes = |node| oracle.get(node).unwrap().clone().1; - - let known_snapshot_hashes = build_known_snapshot_hashes( - oracle.keys(), - node_to_full_snapshot_hashes, - node_to_incremental_snapshot_hashes, - ); - - let mut known_full_snapshot_hashes: Vec<_> = - known_snapshot_hashes.keys().copied().collect(); - known_full_snapshot_hashes.sort_unstable(); - - let known_base_snapshot_hash = known_full_snapshot_hashes.last().unwrap(); - - let mut known_incremental_snapshot_hashes: Vec<_> = known_snapshot_hashes - .get(known_base_snapshot_hash) - .unwrap() - .iter() - .copied() - .collect(); - known_incremental_snapshot_hashes.sort_unstable(); - - assert!( - known_full_snapshot_hashes == full_snapshot_hashes1 - || known_full_snapshot_hashes == full_snapshot_hashes2 - ); - - if known_full_snapshot_hashes == full_snapshot_hashes1 { - assert_eq!(known_base_snapshot_hash, base_snapshot_hash1); - assert_eq!( - known_incremental_snapshot_hashes, - incremental_snapshot_hashes1 - ); - } else { - assert_eq!(known_base_snapshot_hash, base_snapshot_hash2); - assert_eq!( - known_incremental_snapshot_hashes, - incremental_snapshot_hashes2 - ); - } - } - - #[test] - fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() { - let known_snapshot_hashes: KnownSnapshotHashes = [ - ( - (200_000, Hash::new_unique()), - [ - (200_200, Hash::new_unique()), - (200_400, Hash::new_unique()), - (200_600, Hash::new_unique()), - (200_800, Hash::new_unique()), - ] - .iter() - .cloned() - .collect(), - ), - ( - (300_000, Hash::new_unique()), - [ - (300_200, Hash::new_unique()), - (300_400, Hash::new_unique()), - (300_600, Hash::new_unique()), - ] - .iter() - .cloned() - .collect(), - ), - ] - .iter() - .cloned() - .collect(); - - let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap(); - let known_full_snapshot_hash = known_snapshot_hash.0; - let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap(); - - let contact_info = default_contact_info_for_tests(); - let peer_snapshot_hashes = vec![ - // bad full snapshot hash, no incremental snapshot hash - PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None), - // bad everything - PeerSnapshotHash::new( - contact_info.clone(), - (111_000, Hash::default()), - Some((111_111, Hash::default())), - ), - // good full snapshot hash, no incremental snapshot hash - PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None), - // bad full snapshot hash, good (not possible) incremental snapshot hash - PeerSnapshotHash::new( - contact_info.clone(), - (111_000, Hash::default()), - Some(*known_incremental_snapshot_hash), - ), - // good full snapshot hash, bad incremental snapshot hash - PeerSnapshotHash::new( - contact_info.clone(), - *known_full_snapshot_hash, - Some((111_111, Hash::default())), - ), - // good everything - PeerSnapshotHash::new( - contact_info.clone(), - *known_full_snapshot_hash, - Some(*known_incremental_snapshot_hash), - ), - ]; - - let expected = vec![ - PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None), - PeerSnapshotHash::new( - contact_info, - *known_full_snapshot_hash, - Some(*known_incremental_snapshot_hash), - ), - ]; - let mut actual = peer_snapshot_hashes; - retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( - &known_snapshot_hashes, - &mut actual, - ); - 
assert_eq!(expected, actual); - } - - #[test] - fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() { - let contact_info = default_contact_info_for_tests(); - let peer_snapshot_hashes = vec![ - // old - PeerSnapshotHash::new( - contact_info.clone(), - (100_000, Hash::default()), - Some((100_100, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (100_000, Hash::default()), - Some((100_200, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (100_000, Hash::default()), - Some((100_300, Hash::default())), - ), - // new - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_100, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_200, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_300, Hash::default())), - ), - ]; - - let expected = vec![ - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_100, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_200, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info, - (200_000, Hash::default()), - Some((200_300, Hash::default())), - ), - ]; - let mut actual = peer_snapshot_hashes; - retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual); - assert_eq!(expected, actual); - } - - #[test] - fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot() { - let contact_info = default_contact_info_for_tests(); - let peer_snapshot_hashes = vec![ - PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_100, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_200, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_300, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_010, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_020, Hash::default())), - ), - PeerSnapshotHash::new( - contact_info.clone(), - (200_000, Hash::default()), - Some((200_030, Hash::default())), - ), - ]; - - let expected = vec![PeerSnapshotHash::new( - contact_info, - (200_000, Hash::default()), - Some((200_300, Hash::default())), - )]; - let mut actual = peer_snapshot_hashes; - retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual); - assert_eq!(expected, actual); } } } + +/// Get the highest full snapshot hash for a peer from CRDS +fn get_highest_full_snapshot_hash_for_peer( + cluster_info: &ClusterInfo, + peer: &Pubkey, +) -> Option { + let mut full_snapshot_hashes = Vec::new(); + cluster_info.get_snapshot_hash_for_node(peer, |snapshot_hashes| { + full_snapshot_hashes = snapshot_hashes.clone() + }); + full_snapshot_hashes + .into_iter() + .max() + .map(|full_snapshot_hash| SnapshotHash { + full: full_snapshot_hash, + incr: None, + }) +} + +/// Get the highest incremental snapshot hash for a peer from CRDS +fn get_highest_incremental_snapshot_hash_for_peer( + cluster_info: &ClusterInfo, + peer: &Pubkey, +) -> Option { + cluster_info + .get_incremental_snapshot_hashes_for_node(peer) + .map( + |crds_value::IncrementalSnapshotHashes { 
base, hashes, .. }| { + let highest_incremental_snapshot_hash = hashes.into_iter().max(); + SnapshotHash { + full: base, + incr: highest_incremental_snapshot_hash, + } + }, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + impl PeerSnapshotHash { + fn new( + rpc_contact_info: ContactInfo, + full_snapshot_hash: (Slot, Hash), + incremental_snapshot_hash: Option<(Slot, Hash)>, + ) -> Self { + Self { + rpc_contact_info, + snapshot_hash: SnapshotHash { + full: full_snapshot_hash, + incr: incremental_snapshot_hash, + }, + } + } + } + + fn default_contact_info_for_tests() -> ContactInfo { + let sock_addr = SocketAddr::from(([1, 1, 1, 1], 11_111)); + ContactInfo { + id: Pubkey::default(), + gossip: sock_addr, + tvu: sock_addr, + tvu_forwards: sock_addr, + repair: sock_addr, + tpu: sock_addr, + tpu_forwards: sock_addr, + tpu_vote: sock_addr, + rpc: sock_addr, + rpc_pubsub: sock_addr, + serve_repair: sock_addr, + wallclock: 123456789, + shred_version: 1, + } + } + + #[test] + fn test_build_known_snapshot_hashes() { + let full_snapshot_hashes1 = vec![ + (100_000, Hash::new_unique()), + (200_000, Hash::new_unique()), + (300_000, Hash::new_unique()), + (400_000, Hash::new_unique()), + ]; + let full_snapshot_hashes2 = vec![ + (100_000, Hash::new_unique()), + (200_000, Hash::new_unique()), + (300_000, Hash::new_unique()), + (400_000, Hash::new_unique()), + ]; + + let base_snapshot_hash1 = full_snapshot_hashes1.last().unwrap(); + let base_snapshot_hash2 = full_snapshot_hashes2.last().unwrap(); + + let incremental_snapshot_hashes1 = vec![ + (400_500, Hash::new_unique()), + (400_600, Hash::new_unique()), + (400_700, Hash::new_unique()), + (400_800, Hash::new_unique()), + ]; + let incremental_snapshot_hashes2 = vec![ + (400_500, Hash::new_unique()), + (400_600, Hash::new_unique()), + (400_700, Hash::new_unique()), + (400_800, Hash::new_unique()), + ]; + + #[allow(clippy::type_complexity)] + let mut oracle: HashMap< + Pubkey, + (Vec<(Slot, Hash)>, Option<((Slot, Hash), Vec<(Slot, Hash)>)>), + > = HashMap::new(); + + // no snapshots at all + oracle.insert(Pubkey::new_unique(), (vec![], None)); + + // just full snapshots + oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes1.clone(), None)); + + // just full snapshots, with different hashes + oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes2.clone(), None)); + + // full and incremental snapshots + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes1.clone(), + Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())), + ), + ); + + // full and incremental snapshots, but base hash is wrong + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes1.clone(), + Some((*base_snapshot_hash2, incremental_snapshot_hashes1.clone())), + ), + ); + + // full and incremental snapshots, with different hashes + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes2.clone(), + Some((*base_snapshot_hash2, incremental_snapshot_hashes2.clone())), + ), + ); + + // full and incremental snapshots, but base hash is wrong + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes2.clone(), + Some((*base_snapshot_hash1, incremental_snapshot_hashes2.clone())), + ), + ); + + // handle duplicates as well + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes1.clone(), + Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())), + ), + ); + + // handle duplicates, with different hashes + oracle.insert( + Pubkey::new_unique(), + ( + full_snapshot_hashes2.clone(), + Some((*base_snapshot_hash2, 
incremental_snapshot_hashes2.clone())), + ), + ); + + let node_to_full_snapshot_hashes = |node| oracle.get(node).unwrap().clone().0; + let node_to_incremental_snapshot_hashes = |node| oracle.get(node).unwrap().clone().1; + + let known_snapshot_hashes_with_incremental = build_known_snapshot_hashes( + oracle.keys(), + node_to_full_snapshot_hashes, + node_to_incremental_snapshot_hashes, + true, + ); + + let mut known_full_snapshot_hashes: Vec<_> = known_snapshot_hashes_with_incremental + .keys() + .copied() + .collect(); + known_full_snapshot_hashes.sort_unstable(); + + let known_base_snapshot_hash = known_full_snapshot_hashes.last().unwrap(); + + let mut known_incremental_snapshot_hashes: Vec<_> = known_snapshot_hashes_with_incremental + .get(known_base_snapshot_hash) + .unwrap() + .iter() + .copied() + .collect(); + known_incremental_snapshot_hashes.sort_unstable(); + + assert!( + known_full_snapshot_hashes == full_snapshot_hashes1 + || known_full_snapshot_hashes == full_snapshot_hashes2 + ); + + if known_full_snapshot_hashes == full_snapshot_hashes1 { + assert_eq!(known_base_snapshot_hash, base_snapshot_hash1); + assert_eq!( + known_incremental_snapshot_hashes, + incremental_snapshot_hashes1 + ); + } else { + assert_eq!(known_base_snapshot_hash, base_snapshot_hash2); + assert_eq!( + known_incremental_snapshot_hashes, + incremental_snapshot_hashes2 + ); + } + + let known_snapshot_hashes_without_incremental = build_known_snapshot_hashes( + oracle.keys(), + node_to_full_snapshot_hashes, + node_to_incremental_snapshot_hashes, + false, + ); + + let mut known_full_snapshot_hashes: Vec<_> = known_snapshot_hashes_without_incremental + .keys() + .copied() + .collect(); + known_full_snapshot_hashes.sort_unstable(); + + let known_base_snapshot_hash = known_full_snapshot_hashes.last().unwrap(); + + let known_incremental_snapshot_hashes: Vec<_> = known_snapshot_hashes_without_incremental + .get(known_base_snapshot_hash) + .unwrap() + .iter() + .copied() + .collect(); + + assert!( + known_full_snapshot_hashes == full_snapshot_hashes1 + || known_full_snapshot_hashes == full_snapshot_hashes2 + ); + + assert_eq!( + known_incremental_snapshot_hashes, + Vec::<(Slot, Hash)>::new() + ); + } + + #[test] + fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() { + let known_snapshot_hashes: KnownSnapshotHashes = [ + ( + (200_000, Hash::new_unique()), + [ + (200_200, Hash::new_unique()), + (200_400, Hash::new_unique()), + (200_600, Hash::new_unique()), + (200_800, Hash::new_unique()), + ] + .iter() + .cloned() + .collect(), + ), + ( + (300_000, Hash::new_unique()), + [ + (300_200, Hash::new_unique()), + (300_400, Hash::new_unique()), + (300_600, Hash::new_unique()), + ] + .iter() + .cloned() + .collect(), + ), + ] + .iter() + .cloned() + .collect(); + + let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap(); + let known_full_snapshot_hash = known_snapshot_hash.0; + let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap(); + + let contact_info = default_contact_info_for_tests(); + let peer_snapshot_hashes = vec![ + // bad full snapshot hash, no incremental snapshot hash + PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None), + // bad everything + PeerSnapshotHash::new( + contact_info.clone(), + (111_000, Hash::default()), + Some((111_111, Hash::default())), + ), + // good full snapshot hash, no incremental snapshot hash + PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None), + // bad full snapshot hash, good 
(not possible) incremental snapshot hash + PeerSnapshotHash::new( + contact_info.clone(), + (111_000, Hash::default()), + Some(*known_incremental_snapshot_hash), + ), + // good full snapshot hash, bad incremental snapshot hash + PeerSnapshotHash::new( + contact_info.clone(), + *known_full_snapshot_hash, + Some((111_111, Hash::default())), + ), + // good everything + PeerSnapshotHash::new( + contact_info.clone(), + *known_full_snapshot_hash, + Some(*known_incremental_snapshot_hash), + ), + ]; + + let expected = vec![ + PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None), + PeerSnapshotHash::new( + contact_info, + *known_full_snapshot_hash, + Some(*known_incremental_snapshot_hash), + ), + ]; + let mut actual = peer_snapshot_hashes; + retain_peer_snapshot_hashes_that_match_known_snapshot_hashes( + &known_snapshot_hashes, + &mut actual, + ); + assert_eq!(expected, actual); + } + + #[test] + fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() { + let contact_info = default_contact_info_for_tests(); + let peer_snapshot_hashes = vec![ + // old + PeerSnapshotHash::new( + contact_info.clone(), + (100_000, Hash::default()), + Some((100_100, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (100_000, Hash::default()), + Some((100_200, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (100_000, Hash::default()), + Some((100_300, Hash::default())), + ), + // new + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_100, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_200, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_300, Hash::default())), + ), + ]; + + let expected = vec![ + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_100, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_200, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info, + (200_000, Hash::default()), + Some((200_300, Hash::default())), + ), + ]; + let mut actual = peer_snapshot_hashes; + retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual); + assert_eq!(expected, actual); + } + + #[test] + fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot() { + let contact_info = default_contact_info_for_tests(); + let peer_snapshot_hashes = vec![ + PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_100, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_200, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_300, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_010, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_020, Hash::default())), + ), + PeerSnapshotHash::new( + contact_info.clone(), + (200_000, Hash::default()), + Some((200_030, Hash::default())), + ), + ]; + + let expected = vec![PeerSnapshotHash::new( + contact_info, + (200_000, Hash::default()), + Some((200_300, Hash::default())), + )]; + let mut actual = peer_snapshot_hashes; + 
retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual); + assert_eq!(expected, actual); + } +}
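
The following standalone sketches restate, in hedged form, some of the decision logic this change hoists to module level; every type and function name in them is a simplified stand-in unless it also appears in the diff above. First, the derived `Ord` on `SnapshotHash` is what lets the CRDS helpers pick a peer's newest snapshot with a plain `.max()`: derived ordering compares fields in declaration order, so the full snapshot `(slot, hash)` dominates and the optional incremental hash breaks ties, with `None` ordering below any `Some`. A minimal, runnable sketch (with `Slot` and `Hash` replaced by integer stand-ins, not the crate's real types):

// Stand-ins so the ordering behaviour can be exercised in isolation.
type Slot = u64;
type Hash = u64; // stand-in for solana_sdk::hash::Hash, which is also Ord

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
struct SnapshotHash {
    full: (Slot, Hash),
    incr: Option<(Slot, Hash)>,
}

fn main() {
    let hashes = [
        SnapshotHash { full: (100, 1), incr: Some((150, 7)) },
        SnapshotHash { full: (200, 2), incr: None },
        SnapshotHash { full: (200, 2), incr: Some((250, 9)) },
    ];
    // Derived `Ord`: highest full (slot, hash) wins; equal full snapshots are
    // broken by the incremental hash, and `None` sorts below any `Some`.
    let newest = hashes.iter().copied().max().unwrap();
    assert_eq!(newest, SnapshotHash { full: (200, 2), incr: Some((250, 9)) });
    println!("newest snapshot hash: {:?}", newest);
}

Keeping `full` as the first field is therefore load-bearing: reordering the struct's fields would silently change which peer snapshot `.max()` selects.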
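
`should_use_local_snapshot` boils down to one freshness comparison against the slot the cluster is advertising (the incremental slot when one exists, otherwise the full slot). A minimal sketch of that rule; the helper name below is illustrative, not part of the crate:

type Slot = u64;

fn local_snapshot_is_fresh(
    local_snapshot_slot: Slot,
    cluster_snapshot_slot: Slot,
    maximum_local_snapshot_age: Slot,
) -> bool {
    // `saturating_sub` keeps the threshold at 0 early in a cluster's life
    // instead of underflowing.
    local_snapshot_slot >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
}

fn main() {
    // Cluster advertises slot 500_000; we tolerate snapshots up to 10_000 slots old.
    assert!(local_snapshot_is_fresh(495_000, 500_000, 10_000)); // reuse local snapshot
    assert!(!local_snapshot_is_fresh(450_000, 500_000, 10_000)); // download a newer one
}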
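
`download_snapshots` also skips archives it already has on disk. For the incremental case the match is three-way: the archive's slot, its hash, and the full snapshot it was built on must all agree with what the cluster advertised. A self-contained sketch with hypothetical stand-in types (the crate uses its own snapshot archive info types via `snapshot_utils`):

type Slot = u64;
type Hash = u64; // stand-in

struct LocalIncrementalArchive {
    slot: Slot,
    base_slot: Slot,
    hash: Hash,
}

fn have_incremental_archive_locally(
    local_archives: &[LocalIncrementalArchive],
    incremental_snapshot_hash: (Slot, Hash),
    full_snapshot_hash: (Slot, Hash),
) -> bool {
    // The local archive only counts if slot and hash match what the cluster
    // advertised *and* it was built on top of the same full snapshot.
    local_archives.iter().any(|archive| {
        archive.slot == incremental_snapshot_hash.0
            && archive.hash == incremental_snapshot_hash.1
            && archive.base_slot == full_snapshot_hash.0
    })
}

fn main() {
    let local = [LocalIncrementalArchive { slot: 400_800, base_slot: 400_000, hash: 7 }];
    assert!(have_incremental_archive_locally(&local, (400_800, 7), (400_000, 3)));
    // Same slot and hash, but built on a different full snapshot: must re-download.
    assert!(!have_incremental_archive_locally(&local, (400_800, 7), (300_000, 3)));
}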
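
Finally, the progress callback passed to `download_snapshot_archive` decides whether to give up on a slow peer early, while switching is still cheap. Restated here as a plain "should abort" predicate (the callback itself returns `true` to keep downloading, i.e. the inverse sense); the struct and function names are illustrative stand-ins, not the crate's API:

struct DownloadProgress {
    last_throughput: f32,          // bytes/sec over the last interval
    notification_count: u64,       // how many progress callbacks have fired
    percentage_done: f32,
    estimated_remaining_time: f32, // seconds
}

fn should_abort_slow_download(
    progress: &DownloadProgress,
    minimal_snapshot_download_speed: f32,
    download_abort_count: u64,
    maximum_snapshot_download_abort: u64,
    is_only_known_rpc: bool,
) -> bool {
    // Only consider aborting very early in the download, and only while there
    // is abort budget left.
    let too_slow_and_early = progress.last_throughput < minimal_snapshot_download_speed
        && progress.notification_count <= 1
        && progress.percentage_done <= 2.0
        && progress.estimated_remaining_time > 60.0
        && download_abort_count < maximum_snapshot_download_abort;

    // Never abandon the one-and-only known validator, even if it is slow.
    too_slow_and_early && !is_only_known_rpc
}

fn main() {
    let progress = DownloadProgress {
        last_throughput: 1_000.0,
        notification_count: 1,
        percentage_done: 1.0,
        estimated_remaining_time: 600.0,
    };
    // Slow, early, budget available, other peers exist: abort and retry elsewhere.
    assert!(should_abort_slow_download(&progress, 10_000_000.0, 0, 5, false));
    // Same conditions, but the peer is the only known RPC node: keep going.
    assert!(!should_abort_slow_download(&progress, 10_000_000.0, 0, 5, true));
}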