Search for the validator with the highest snapshot

This commit is contained in:
Michael Vines 2020-02-20 12:39:53 -07:00
parent 01697a9f5c
commit cc7c6c960e
2 changed files with 88 additions and 33 deletions

View File

@ -504,6 +504,14 @@ impl ClusterInfo {
.collect()
}
/// Returns the `(Slot, Hash)` snapshot entries recorded in the gossip CRDS
/// table for the node identified by `pubkey`.
///
/// Yields `None` when the table holds no entry under the
/// `CrdsValueLabel::SnapshotHash` label for that node.
///
/// # Panics
///
/// Panics if an entry exists under that label but `snapshot_hash()` on its
/// value cannot be unwrapped. Presumably a value stored under this label
/// always carries snapshot-hash data, so this should not occur in practice
/// (NOTE(review): confirm against the CRDS insertion path).
pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> {
    let label = CrdsValueLabel::SnapshotHash(*pubkey);
    // Bail out with `None` if this node has nothing recorded under the label.
    let entry = self.gossip.crds.table.get(&label)?;
    Some(&entry.value.snapshot_hash().unwrap().hashes)
}
pub fn get_epoch_state_for_node(
&self,
pubkey: &Pubkey,

View File

@ -201,6 +201,7 @@ fn get_rpc_addr(
identity_keypair: &Arc<Keypair>,
entrypoint_gossip: &SocketAddr,
expected_shred_version: Option<u16>,
snapshot_not_required: bool,
) -> (RpcClient, SocketAddr) {
let mut cluster_info = ClusterInfo::new(
ClusterInfo::spy_contact_info(&identity_keypair.pubkey()),
@ -219,57 +220,102 @@ fn get_rpc_addr(
let (rpc_client, rpc_addr) = loop {
info!(
"Searching for RPC service, shred version={:?}...\n{}",
"Searching for an RPC service, shred version={:?}...\n{}",
expected_shred_version,
cluster_info.read().unwrap().contact_info_trace()
);
let mut rpc_peers = cluster_info.read().unwrap().all_rpc_peers();
let shred_version_required = !rpc_peers
.iter()
.all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version);
if let Some(expected_shred_version) = expected_shred_version {
// Filter out rpc peers that don't match the expected shred version
rpc_peers = rpc_peers
.into_iter()
.filter(|contact_info| contact_info.shred_version == expected_shred_version)
.collect::<Vec<_>>();
match expected_shred_version {
Some(expected_shred_version) => {
// Filter out rpc peers that don't match the expected shred version
rpc_peers = rpc_peers
.into_iter()
.filter(|contact_info| contact_info.shred_version == expected_shred_version)
.collect::<Vec<_>>();
}
None => {
if !rpc_peers
.iter()
.all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version)
{
eprintln!(
"Multiple shred versions observed in gossip. Restart with --expected-shred-version"
);
exit(1);
}
}
}
if !rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service if present, otherwise pick a node at random
let contact_info = if let Some(contact_info) = rpc_peers
.iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
Some(contact_info.clone())
} else if shred_version_required && expected_shred_version.is_none() {
// Require the user supply a shred version if there are conflicting shred version in
// gossip to reduce the chance of human error
warn!("Multiple shred versions in gossip. Restart with --expected-shred-version");
None
if rpc_peers.is_empty() {
info!("No RPC services found ");
} else {
let eligible_rpc_peers = if snapshot_not_required {
rpc_peers
} else {
// Pick a node at random
Some(rpc_peers[thread_rng().gen_range(0, rpc_peers.len())].clone())
let mut eligible_rpc_peers = vec![];
let mut highest_snapshot_slot = 0;
for rpc_peer in rpc_peers.iter() {
if let Some(snapshot_hash) = cluster_info
.read()
.unwrap()
.get_snapshot_hash_for_node(&rpc_peer.id)
{
let highest_snapshot_slot_for_node = snapshot_hash
.iter()
.fold(0, |a, (slot, _hash)| a.max(*slot));
if highest_snapshot_slot_for_node > highest_snapshot_slot {
// Found a higher snapshot, remove all rpc peers with a lower snapshot
eligible_rpc_peers.clear();
highest_snapshot_slot = highest_snapshot_slot_for_node;
}
if highest_snapshot_slot_for_node == highest_snapshot_slot {
eligible_rpc_peers.push(rpc_peer.clone());
}
}
}
if highest_snapshot_slot == 0 {
assert!(eligible_rpc_peers.is_empty());
info!("No snapshot available");
} else {
info!(
"Highest available snapshot slot is {}",
highest_snapshot_slot
);
}
eligible_rpc_peers
};
if let Some(ContactInfo { id, rpc, .. }) = contact_info {
info!("Contacting RPC port of node {}: {:?}", id, rpc);
let rpc_client = RpcClient::new_socket(rpc);
if !eligible_rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service if present, otherwise pick one at random
let contact_info = if let Some(contact_info) = eligible_rpc_peers
.iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
contact_info
} else {
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]
};
info!(
"Trying RPC service from node {}: {:?}",
contact_info.id, contact_info.rpc
);
let rpc_client = RpcClient::new_socket(contact_info.rpc);
match rpc_client.get_version() {
Ok(rpc_version) => {
info!("RPC node version: {}", rpc_version.solana_core);
break (rpc_client, rpc);
break (rpc_client, contact_info.rpc);
}
Err(err) => {
warn!("Failed to get RPC version: {}", err);
warn!("Failed to get RPC node's version: {}", err);
}
}
}
} else {
info!("No RPC service found");
}
sleep(Duration::from_secs(1));
@ -910,6 +956,7 @@ pub fn main() {
&identity_keypair,
&cluster_entrypoint.gossip,
validator_config.expected_shred_version,
no_snapshot_fetch,
);
download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {