diff --git a/validator/src/main.rs b/validator/src/main.rs index e2adac505..fbc61301c 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -197,14 +197,65 @@ fn download_tar_bz2( Ok(()) } -fn get_rpc_addr( +fn get_shred_rpc_peers( + cluster_info: &Arc>, + expected_shred_version: Option, +) -> Vec { + let rpc_peers = cluster_info.read().unwrap().all_rpc_peers(); + match expected_shred_version { + Some(expected_shred_version) => { + // Filter out rpc peers that don't match the expected shred version + rpc_peers + .into_iter() + .filter(|contact_info| contact_info.shred_version == expected_shred_version) + .collect::>() + } + None => { + if !rpc_peers + .iter() + .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version) + { + eprintln!( + "Multiple shred versions observed in gossip. Restart with --expected-shred-version" + ); + exit(1); + } + rpc_peers + } + } +} + +fn get_trusted_snapshot_hashes( + cluster_info: &Arc>, + trusted_validators: Option<&HashSet>, +) -> Option> { + if let Some(trusted_validators) = trusted_validators { + let mut trusted_snapshot_hashes = HashSet::new(); + for trusted_validator in trusted_validators { + if let Some(snapshot_hashes) = cluster_info + .read() + .unwrap() + .get_snapshot_hash_for_node(trusted_validator) + { + for snapshot_hash in snapshot_hashes { + trusted_snapshot_hashes.insert(*snapshot_hash); + } + } + } + Some(trusted_snapshot_hashes) + } else { + None + } +} + +fn get_rpc_node( node: &Node, identity_keypair: &Arc, entrypoint_gossip: &SocketAddr, expected_shred_version: Option, trusted_validators: Option<&HashSet>, snapshot_not_required: bool, -) -> (RpcClient, SocketAddr) { +) -> (ContactInfo, RpcClient, Option<(Slot, Hash)>) { let mut cluster_info = ClusterInfo::new( ClusterInfo::spy_contact_info(&identity_keypair.pubkey()), identity_keypair.clone(), @@ -220,140 +271,109 @@ fn get_rpc_addr( &gossip_exit_flag, ); - let (rpc_client, rpc_addr) = loop { + let (rpc_contact_info, rpc_client, selected_snapshot_hash) = loop { info!( - "Searching for an RPC service, shred version={:?}...\n{}", - expected_shred_version, - cluster_info.read().unwrap().contact_info_trace() + "Searching for an RPC service, shred version={:?}...", + expected_shred_version ); + sleep(Duration::from_secs(1)); + info!("\n{}", cluster_info.read().unwrap().contact_info_trace()); - let mut rpc_peers = cluster_info.read().unwrap().all_rpc_peers(); - match expected_shred_version { - Some(expected_shred_version) => { - // Filter out rpc peers that don't match the expected shred version - rpc_peers = rpc_peers - .into_iter() - .filter(|contact_info| contact_info.shred_version == expected_shred_version) - .collect::>(); - } - None => { - if !rpc_peers - .iter() - .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version) - { - eprintln!( - "Multiple shred versions observed in gossip. Restart with --expected-shred-version" - ); - exit(1); - } - } + let rpc_peers = get_shred_rpc_peers(&cluster_info, expected_shred_version); + if rpc_peers.is_empty() { + info!("No RPC services found"); + continue; } - let trusted_slots = if let Some(trusted_validators) = trusted_validators { - let mut trusted_slots = HashSet::new(); - for trusted_validator in trusted_validators { + let mut highest_snapshot_hash: Option<(Slot, Hash)> = None; + let eligible_rpc_peers = if snapshot_not_required { + rpc_peers + } else { + let trusted_snapshot_hashes = + get_trusted_snapshot_hashes(&cluster_info, trusted_validators); + + let mut eligible_rpc_peers = vec![]; + + for rpc_peer in rpc_peers.iter() { if let Some(snapshot_hashes) = cluster_info .read() .unwrap() - .get_snapshot_hash_for_node(trusted_validator) + .get_snapshot_hash_for_node(&rpc_peer.id) { for snapshot_hash in snapshot_hashes { - trusted_slots.insert(*snapshot_hash); - } - } - } - Some(trusted_slots) - } else { - None - }; - - if rpc_peers.is_empty() { - info!("No RPC services found "); - } else { - let eligible_rpc_peers = if snapshot_not_required { - rpc_peers - } else { - let mut eligible_rpc_peers = vec![]; - let mut highest_snapshot_slot = 0; - - for rpc_peer in rpc_peers.iter() { - if let Some(snapshot_hash) = cluster_info - .read() - .unwrap() - .get_snapshot_hash_for_node(&rpc_peer.id) - { - let highest_snapshot_slot_for_node = - snapshot_hash.iter().fold(0, |highest_slot, snapshot_hash| { - if let Some(ref trusted_slots) = trusted_slots { - if !trusted_slots.contains(snapshot_hash) { - // Ignore all untrusted slots - return highest_slot; - } - } - highest_slot.max(snapshot_hash.0) - }); - - if highest_snapshot_slot_for_node > 0 { - if highest_snapshot_slot_for_node > highest_snapshot_slot { - // Found a higher snapshot, remove all rpc peers with a lower snapshot - eligible_rpc_peers.clear(); - highest_snapshot_slot = highest_snapshot_slot_for_node; + if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes { + if !trusted_snapshot_hashes.contains(snapshot_hash) { + // Ignore all untrusted snapshot hashes + continue; } + } - if highest_snapshot_slot_for_node == highest_snapshot_slot { - eligible_rpc_peers.push(rpc_peer.clone()); - } + if highest_snapshot_hash.is_none() + || snapshot_hash.0 > highest_snapshot_hash.unwrap().0 + { + // Found a higher snapshot, remove all nodes with a lower snapshot + eligible_rpc_peers.clear(); + highest_snapshot_hash = Some(*snapshot_hash) + } + + if Some(*snapshot_hash) == highest_snapshot_hash { + eligible_rpc_peers.push(rpc_peer.clone()); } } } + } - if highest_snapshot_slot == 0 { + match highest_snapshot_hash { + None => { assert!(eligible_rpc_peers.is_empty()); - info!("No snapshot available"); - } else { + info!("No snapshots available"); + } + Some(highest_snapshot_hash) => { info!( - "Highest available snapshot slot is {}", - highest_snapshot_slot + "Highest available snapshot slot is {}, available from {:?}", + highest_snapshot_hash.0, + eligible_rpc_peers + .iter() + .map(|contact_info| contact_info.id) + .collect::>() ); } - eligible_rpc_peers + } + eligible_rpc_peers + }; + + if !eligible_rpc_peers.is_empty() { + // Prefer the entrypoint's RPC service if present, otherwise pick one at random + let contact_info = if let Some(contact_info) = eligible_rpc_peers + .iter() + .find(|contact_info| contact_info.gossip == *entrypoint_gossip) + { + contact_info + } else { + &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())] }; - if !eligible_rpc_peers.is_empty() { - // Prefer the entrypoint's RPC service if present, otherwise pick one at random - let contact_info = if let Some(contact_info) = eligible_rpc_peers - .iter() - .find(|contact_info| contact_info.gossip == *entrypoint_gossip) - { - contact_info - } else { - &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())] - }; - - info!( - "Trying RPC service from node {}: {:?}", - contact_info.id, contact_info.rpc - ); - let rpc_client = RpcClient::new_socket(contact_info.rpc); - match rpc_client.get_version() { - Ok(rpc_version) => { - info!("RPC node version: {}", rpc_version.solana_core); - break (rpc_client, contact_info.rpc); - } - Err(err) => { - warn!("Failed to get RPC node's version: {}", err); - } + info!( + "Trying RPC service from node {}: {:?}", + contact_info.id, contact_info.rpc + ); + let rpc_client = RpcClient::new_socket(contact_info.rpc); + match rpc_client.get_version() { + Ok(rpc_version) => { + info!("RPC node version: {}", rpc_version.solana_core); + break (contact_info.clone(), rpc_client, highest_snapshot_hash); + } + Err(err) => { + warn!("Failed to get RPC node's version: {}", err); } } } - - sleep(Duration::from_secs(1)); }; gossip_exit_flag.store(true, Ordering::Relaxed); gossip_service.join().unwrap(); - (rpc_client, rpc_addr) + (rpc_contact_info, rpc_client, selected_snapshot_hash) } fn check_vote_account( @@ -409,11 +429,11 @@ fn check_vote_account( fn download_ledger( rpc_addr: &SocketAddr, ledger_path: &Path, - no_snapshot_fetch: bool, + snapshot_hash: Option<(Slot, Hash)>, ) -> Result<(), String> { download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?; - if !no_snapshot_fetch { + if snapshot_hash.is_some() { let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path); if snapshot_package.exists() { @@ -998,7 +1018,7 @@ pub fn main() { ); if !no_genesis_fetch { - let (rpc_client, rpc_addr) = get_rpc_addr( + let (rpc_contact_info, rpc_client, snapshot_hash) = get_rpc_node( &node, &identity_keypair, &cluster_entrypoint.gossip, @@ -1012,11 +1032,6 @@ pub fn main() { no_snapshot_fetch, ); - download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| { - error!("Failed to initialize ledger: {}", err); - exit(1); - }); - let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| { error!("Failed to get genesis hash: {}", err); exit(1); @@ -1045,6 +1060,13 @@ pub fn main() { exit(1); }); } + + download_ledger(&rpc_contact_info.rpc, &ledger_path, snapshot_hash).unwrap_or_else( + |err| { + error!("Failed to initialize ledger: {}", err); + exit(1); + }, + ); } }