Refactor out get_rpc_peers() (#20744)

This commit is contained in:
Brooks Prumo 2021-10-18 14:01:52 -05:00 committed by GitHub
parent 751b45df64
commit 33f4e79589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 84 additions and 65 deletions

View File

@ -101,6 +101,80 @@ fn start_gossip_node(
(cluster_info, gossip_exit_flag, gossip_service)
}
fn get_rpc_peers(
cluster_info: &ClusterInfo,
cluster_entrypoints: &[ContactInfo],
validator_config: &ValidatorConfig,
blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
blacklist_timeout: &Instant,
retry_reason: &mut Option<String>,
) -> Option<Vec<ContactInfo>> {
let shred_version = validator_config
.expected_shred_version
.unwrap_or_else(|| cluster_info.my_shred_version());
if shred_version == 0 {
let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
cluster_info
.lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip)
.map_or(false, |entrypoint| entrypoint.shred_version == 0)
});
if all_zero_shred_versions {
eprintln!("Entrypoint shred version is zero. Restart with --expected-shred-version");
exit(1);
}
info!("Waiting to adopt entrypoint shred version...");
return None;
}
info!(
"Searching for an RPC service with shred version {}{}...",
shred_version,
retry_reason
.as_ref()
.map(|s| format!(" (Retrying: {})", s))
.unwrap_or_default()
);
let rpc_peers = cluster_info
.all_rpc_peers()
.into_iter()
.filter(|contact_info| contact_info.shred_version == shred_version)
.collect::<Vec<_>>();
let rpc_peers_total = rpc_peers.len();
// Filter out blacklisted nodes
let rpc_peers: Vec<_> = rpc_peers
.into_iter()
.filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id))
.collect();
let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
let rpc_peers_trusted = rpc_peers
.iter()
.filter(|rpc_peer| is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators))
.count();
info!(
"Total {} RPC nodes found. {} known, {} blacklisted ",
rpc_peers_total, rpc_peers_trusted, rpc_peers_blacklisted
);
if rpc_peers_blacklisted == rpc_peers_total {
*retry_reason =
if !blacklisted_rpc_nodes.is_empty() && blacklist_timeout.elapsed().as_secs() > 60 {
// If all nodes are blacklisted and no additional nodes are discovered after 60 seconds,
// remove the blacklist and try them all again
blacklisted_rpc_nodes.clear();
Some("Blacklist timeout expired".to_owned())
} else {
Some("Wait for known rpc peers".to_owned())
};
return None;
}
Some(rpc_peers)
}
fn get_rpc_node(
cluster_info: &ClusterInfo,
cluster_entrypoints: &[ContactInfo],
@ -117,73 +191,18 @@ fn get_rpc_node(
sleep(Duration::from_secs(1));
info!("\n{}", cluster_info.rpc_info_trace());
let shred_version = validator_config
.expected_shred_version
.unwrap_or_else(|| cluster_info.my_shred_version());
if shred_version == 0 {
let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
cluster_info
.lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip)
.map_or(false, |entrypoint| entrypoint.shred_version == 0)
});
if all_zero_shred_versions {
eprintln!(
"Entrypoint shred version is zero. Restart with --expected-shred-version"
);
exit(1);
}
info!("Waiting to adopt entrypoint shred version...");
continue;
}
info!(
"Searching for an RPC service with shred version {}{}...",
shred_version,
retry_reason
.as_ref()
.map(|s| format!(" (Retrying: {})", s))
.unwrap_or_default()
);
let rpc_peers = cluster_info
.all_rpc_peers()
.into_iter()
.filter(|contact_info| contact_info.shred_version == shred_version)
.collect::<Vec<_>>();
let rpc_peers_total = rpc_peers.len();
// Filter out blacklisted nodes
let rpc_peers: Vec<_> = rpc_peers
.into_iter()
.filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id))
.collect();
let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
let rpc_peers_trusted = rpc_peers
.iter()
.filter(|rpc_peer| {
is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators)
})
.count();
info!(
"Total {} RPC nodes found. {} known, {} blacklisted ",
rpc_peers_total, rpc_peers_trusted, rpc_peers_blacklisted
);
if rpc_peers_blacklisted == rpc_peers_total {
retry_reason = if !blacklisted_rpc_nodes.is_empty()
&& blacklist_timeout.elapsed().as_secs() > 60
{
// If all nodes are blacklisted and no additional nodes are discovered after 60 seconds,
// remove the blacklist and try them all again
blacklisted_rpc_nodes.clear();
Some("Blacklist timeout expired".to_owned())
} else {
Some("Wait for known rpc peers".to_owned())
};
let rpc_peers = get_rpc_peers(
cluster_info,
cluster_entrypoints,
validator_config,
blacklisted_rpc_nodes,
&blacklist_timeout,
&mut retry_reason,
);
if rpc_peers.is_none() {
continue;
}
let rpc_peers = rpc_peers.unwrap();
blacklist_timeout = Instant::now();
let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir);