From cc7c6c960e27d6bede951cb9e24221e02d17aa1b Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 20 Feb 2020 12:39:53 -0700 Subject: [PATCH] Search for the validator with the highest snapshot --- core/src/cluster_info.rs | 8 +++ validator/src/main.rs | 113 +++++++++++++++++++++++++++------------ 2 files changed, 88 insertions(+), 33 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 57d3dfc42..e52bcee8d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -504,6 +504,14 @@ impl ClusterInfo { .collect() } + pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { + self.gossip + .crds + .table + .get(&CrdsValueLabel::SnapshotHash(*pubkey)) + .map(|x| &x.value.snapshot_hash().unwrap().hashes) + } + pub fn get_epoch_state_for_node( &self, pubkey: &Pubkey, diff --git a/validator/src/main.rs b/validator/src/main.rs index 649ff168f..c1ebaf2ac 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -201,6 +201,7 @@ fn get_rpc_addr( identity_keypair: &Arc, entrypoint_gossip: &SocketAddr, expected_shred_version: Option, + snapshot_not_required: bool, ) -> (RpcClient, SocketAddr) { let mut cluster_info = ClusterInfo::new( ClusterInfo::spy_contact_info(&identity_keypair.pubkey()), @@ -219,57 +220,102 @@ fn get_rpc_addr( let (rpc_client, rpc_addr) = loop { info!( - "Searching for RPC service, shred version={:?}...\n{}", + "Searching for an RPC service, shred version={:?}...\n{}", expected_shred_version, cluster_info.read().unwrap().contact_info_trace() ); let mut rpc_peers = cluster_info.read().unwrap().all_rpc_peers(); - - let shred_version_required = !rpc_peers - .iter() - .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version); - - if let Some(expected_shred_version) = expected_shred_version { - // Filter out rpc peers that don't match the expected shred version - rpc_peers = rpc_peers - .into_iter() - .filter(|contact_info| contact_info.shred_version == expected_shred_version) - .collect::>(); + match expected_shred_version { + Some(expected_shred_version) => { + // Filter out rpc peers that don't match the expected shred version + rpc_peers = rpc_peers + .into_iter() + .filter(|contact_info| contact_info.shred_version == expected_shred_version) + .collect::>(); + } + None => { + if !rpc_peers + .iter() + .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version) + { + eprintln!( + "Multiple shred versions observed in gossip. Restart with --expected-shred-version" + ); + exit(1); + } + } } - if !rpc_peers.is_empty() { - // Prefer the entrypoint's RPC service if present, otherwise pick a node at random - let contact_info = if let Some(contact_info) = rpc_peers - .iter() - .find(|contact_info| contact_info.gossip == *entrypoint_gossip) - { - Some(contact_info.clone()) - } else if shred_version_required && expected_shred_version.is_none() { - // Require the user supply a shred version if there are conflicting shred version in - // gossip to reduce the chance of human error - warn!("Multiple shred versions in gossip. Restart with --expected-shred-version"); - None + if rpc_peers.is_empty() { + info!("No RPC services found "); + } else { + let eligible_rpc_peers = if snapshot_not_required { + rpc_peers } else { - // Pick a node at random - Some(rpc_peers[thread_rng().gen_range(0, rpc_peers.len())].clone()) + let mut eligible_rpc_peers = vec![]; + let mut highest_snapshot_slot = 0; + + for rpc_peer in rpc_peers.iter() { + if let Some(snapshot_hash) = cluster_info + .read() + .unwrap() + .get_snapshot_hash_for_node(&rpc_peer.id) + { + let highest_snapshot_slot_for_node = snapshot_hash + .iter() + .fold(0, |a, (slot, _hash)| a.max(*slot)); + + if highest_snapshot_slot_for_node > highest_snapshot_slot { + // Found a higher snapshot, remove all rpc peers with a lower snapshot + eligible_rpc_peers.clear(); + highest_snapshot_slot = highest_snapshot_slot_for_node; + } + + if highest_snapshot_slot_for_node == highest_snapshot_slot { + eligible_rpc_peers.push(rpc_peer.clone()); + } + } + } + + if highest_snapshot_slot == 0 { + assert!(eligible_rpc_peers.is_empty()); + info!("No snapshot available"); + } else { + info!( + "Highest available snapshot slot is {}", + highest_snapshot_slot + ); + } + eligible_rpc_peers }; - if let Some(ContactInfo { id, rpc, .. }) = contact_info { - info!("Contacting RPC port of node {}: {:?}", id, rpc); - let rpc_client = RpcClient::new_socket(rpc); + if !eligible_rpc_peers.is_empty() { + // Prefer the entrypoint's RPC service if present, otherwise pick one at random + let contact_info = if let Some(contact_info) = eligible_rpc_peers + .iter() + .find(|contact_info| contact_info.gossip == *entrypoint_gossip) + { + contact_info + } else { + &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())] + }; + + info!( + "Trying RPC service from node {}: {:?}", + contact_info.id, contact_info.rpc + ); + let rpc_client = RpcClient::new_socket(contact_info.rpc); match rpc_client.get_version() { Ok(rpc_version) => { info!("RPC node version: {}", rpc_version.solana_core); - break (rpc_client, rpc); + break (rpc_client, contact_info.rpc); } Err(err) => { - warn!("Failed to get RPC version: {}", err); + warn!("Failed to get RPC node's version: {}", err); } } } - } else { - info!("No RPC service found"); } sleep(Duration::from_secs(1)); @@ -910,6 +956,7 @@ pub fn main() { &identity_keypair, &cluster_entrypoint.gossip, validator_config.expected_shred_version, + no_snapshot_fetch, ); download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {